Whippy Stock Flow

The Whippy Stock flow facilitates the transfer of location-based product stock information between ERP systems and Whippy. This process ensures that stock levels are accurately synchronized between these systems, providing up-to-date data. The flow can operate in two ways: inbound or outbound.

The inbound Whippy Stock flow runs automatically at intervals set by the user, such as every minute, every hour, daily at midnight, or on the 5th day of each month. It reads data from the ERP system and writes it to Whippy, ensuring that Whippy always has the latest stock information.

The outbound Whippy Stock flow, on the other hand, involves data being pushed directly to the flow from an external source and then written to Whippy.

This dual capability ensures seamless communication and accurate stock management across the entire system.

Flow Types​

  • Inbound Flow: Indicates that the flow will read data from the ERP system. A cron schedule is set up to ensure it runs, for example, every 5 minutes.

  • Outbound Flow: Signifies that stock data will be posted to the flow from the ERP system, skipping the step of reading data from the ERP system. With outbound flows, the data is sent to the outbound URLs found in the flow configuration using HTTP POST requests; more details can be found in the Outbound section.
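
The schedules mentioned for inbound flows map to cron expressions roughly as follows. These are shown in standard 5-field cron (minute, hour, day-of-month, month, day-of-week); the exact format accepted by the scheduler may differ:

```python
# Illustrative mapping from the schedule descriptions above to cron strings.
schedules = {
    "every minute":           "* * * * *",
    "every 5 minutes":        "*/5 * * * *",
    "every hour":             "0 * * * *",
    "daily at midnight":      "0 0 * * *",
    "every 5th of the month": "0 0 5 * *",
}
```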

Flow Steps​

  • Login Step: Handles logging into Whippy and (if configured) the ERP system. If an error occurs during this step, the details are logged.

  • Read Data From ERP Step: For inbound flows, stock queries are made from the ERP system. Outbound flows proceed directly to the next step.

  • Script Step (Optional): Transformation operations on the data are performed using Python, if required.

  • Mapping Step (Optional): Transformation operations on the data are performed using the Jolt Transform library. Further details can be found at the JoltTransform website.

  • Write Data to Omnitron Step: The incoming data is divided into rows and written to Whippy. Logs are created for each row. In case of an error, details of the error are logged (e.g., the relevant product is not found in Omnitron). Successful operations are also logged.

Detailed Flow Designer Settings​

Configuration Card​

  • Trigger Settings: Configures the key values to be used for single and date-based queries from ERP. sku and modified_date__gt are created by default and are required. These keys appear as parameters on the trigger page when a trigger URL is created. The modified_date__gt value is also used when automatic inbound request URLs are created, and the sku value is used by “Fetch Missing” tasks to query missing SKUs on the ERP.

Adding a New Query Parameter:

Using Newly Added Parameters:
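
As an illustration of how the configured keys end up in a trigger URL: the host, path, and the warehouse parameter below are hypothetical, while sku and modified_date__gt are the defaults described above.

```python
from urllib.parse import urlencode

# Hypothetical trigger endpoint; the real URL comes from the flow's trigger page.
base_url = "https://integrator.example.com/flows/whippy-stock/trigger"

# "sku" and "modified_date__gt" are the default trigger settings; "warehouse"
# stands in for a newly added custom query parameter.
params = {
    "sku": "example0",
    "modified_date__gt": "2025-07-22T10:30:27",
    "warehouse": "loc1",
}

trigger_url = base_url + "?" + urlencode(params)
```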

Read Data From ERP Card​

  • Add Extra Headers as Dict Format: Additional headers, in dict format, to be sent with the stock query to ERP.

  • Extra Params: Additional parameters used when the ERP request is made. The value must be a valid dict: for a GET request it is sent as query parameters in the URL, while for a POST request it is sent in the request body.

  • Endpoint URL: URL for reading data from the ERP system.
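
A minimal sketch of how the Extra Params dict could be applied, assuming a hypothetical endpoint and parameter names:

```python
import json
from urllib.parse import urlencode

# Hypothetical "Extra Params" value; the key names depend on the ERP API.
extra_params = {"company": "akinon", "depot": "main"}
endpoint = "https://erp.example.com/api/stocks"

# GET request: the dict is merged into the query string of the Endpoint URL.
query = {"modified_date__gt": "2025-07-22T10:30:27"}
query.update(extra_params)
get_url = endpoint + "?" + urlencode(query)

# POST request: the same dict is sent as the request body instead.
post_body = json.dumps(extra_params)
```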

Dynamic URL Usage:​

Dynamic URLs are used when the URL structure or path changes based on different conditions, or when the URL does not fit a standard REST API structure. In the example below, the URL changes depending on whether the request is a “date”-based or “sku”-based query, and the API also requires the ERP token to be embedded in the URL itself. Using the NiFi Expression Language, the request URL can be changed at run time based on conditions and expressions written in the URL field.

${http.query.string:isEmpty():not():
    ifElse(
        ${http.query.param.sku:isEmpty():not()
            :ifElse(
                ${literal("http://127.0.0.1:12345/(S({TOKEN}))/service/run?{'name': 'Akinon', 'Parameters': [{'Name': 'Barcodes', 'Value': '{SKU}'}]}")
                    :replace("{SKU}", ${http.query.param.sku})                             
                    :replace("{TOKEN}",${token.erp})   
                },
                ${literal("http://127.0.0.1:12345/(S({TOKEN}))/service/run?{'name': 'Akinon', 'Parameters': [{'Name': 'Date', 'Value': '{MODIFIED_DATE}'}]}")                             
                    :replace("{MODIFIED_DATE}", ${http.query.param.modified_date__gt:replaceAll("(\.\d*)?Z?$","")})
                    :replace("{TOKEN}",${token.erp})
                }            
            )},
        ${literal("http://127.0.0.1:12345/(S({TOKEN}))/service/run?{'name': 'Akinon', 'Parameters': [{'Name': 'Date', 'Value': '{MODIFIED_DATE}'}]}")
        :replace("{MODIFIED_DATE}", ${last.request:replaceAll("(\.\d*)?Z?$","")})
        :replace("{TOKEN}",${token.erp})
        }         
    )
}
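
The expression above can be paraphrased in plain Python to make the branching easier to follow. The URLs and the regex mirror the example; the function itself is only an illustration, not part of the product:

```python
import re

SKU_URL = "http://127.0.0.1:12345/(S({TOKEN}))/service/run?{'name': 'Akinon', 'Parameters': [{'Name': 'Barcodes', 'Value': '{SKU}'}]}"
DATE_URL = "http://127.0.0.1:12345/(S({TOKEN}))/service/run?{'name': 'Akinon', 'Parameters': [{'Name': 'Date', 'Value': '{MODIFIED_DATE}'}]}"

def strip_fraction(ts):
    # Same normalization as replaceAll("(\.\d*)?Z?$", ""): drop fractional
    # seconds and a trailing "Z" from the timestamp.
    return re.sub(r"(\.\d*)?Z?$", "", ts)

def build_url(token, sku=None, modified_date__gt=None, last_request=None):
    # Mirrors the ifElse chain: prefer a sku-based query, then an explicit
    # date-based query, and fall back to the stored "last request" date.
    if sku:
        return SKU_URL.replace("{SKU}", sku).replace("{TOKEN}", token)
    date = modified_date__gt if modified_date__gt else last_request
    return DATE_URL.replace("{MODIFIED_DATE}", strip_fraction(date)).replace("{TOKEN}", token)
```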

HTTP Method: GET or POST

Pagination: Pagination in Integrator works by creating a new execution for each page, using the same initial parameters and continuing with the configured pagination method. The process continues until the response no longer contains valid data — either the response body is empty, or the status code is not in the 2xx range.

This approach causes Integrator to make one extra request per task to ensure all possible data has been retrieved.

A new task will not begin until the currently running task has fetched all available pages.

The "last request date" parameter — typically used as modified_date__gt — is only updated after Integrator has successfully fetched all pages and the final response returns empty data. At that point, a new scheduled task can start using an updated "last request date" value, which corresponds to the timestamp just before the first request of the previous successful task.

The "Status codes to be assumed as empty page for pagination" setting can be configured to make Integrator treat certain status codes as an "empty" response, resulting in an overall successful execution. This is useful if the API endpoint responds with a non-2xx HTTP status code when an empty page is queried.

  • Offset pagination is a method where a fixed number of items are retrieved from a data source in each request, starting from a specified offset. This offset indicates the position from which to begin fetching data. This pagination type is useful when pages are constructed by skipping an amount of data, for example:

    • First page: /api/data/?modified_date__gt=2025-07-22T10:30:27.000000&limit=50&skip=0

    • Second page: /api/data/?modified_date__gt=2025-07-22T10:30:27.000000&limit=50&skip=50

    • Third page: /api/data/?modified_date__gt=2025-07-22T10:30:27.000000&limit=50&skip=100

  • Seek pagination is a method where the app fetches pages sequentially by incrementing a numeric page parameter with each request. Each page contains a fixed number of items. This is often simpler than offset pagination, as it directly references a page number rather than calculating offsets. This pagination type is useful when the API expects explicit page numbers, for example:

    • First page: /api/data/?modified_date__gt=2025-07-22T10:30:27.000000&page=1&limit=50

    • Second page: /api/data/?modified_date__gt=2025-07-22T10:30:27.000000&page=2&limit=50

    • Third page: /api/data/?modified_date__gt=2025-07-22T10:30:27.000000&page=3&limit=50

  • Next field pagination is a method where the API response itself includes a URL or token pointing to the next page of data. The app follows this URL to retrieve the next set of results, continuing until there are no more pages. This is useful when the server controls how to continue pagination, for example:

    {
        "results": [ /* items */ ],
        "next": "/api/data/?cursor=abc123"
    }

The app will automatically use the next field to make the subsequent request:

  • First request: /api/data/?modified_date__gt=2025-07-22T10:30:27.000000

  • Follow-up request: /api/data/?modified_date__gt=2025-07-22T10:30:27.000000&cursor=abc123

  • And so on, until next is null or not present.
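
The pagination loop can be sketched as follows. The responses dict stands in for real HTTP calls, and the URLs are the hypothetical ones from the examples above; offset and seek pagination follow the same loop, but increment a skip or page parameter instead of following a next field (in practice, Integrator also carries the original query parameters into the follow-up request):

```python
# Simulated responses keyed by URL; in practice each would be an HTTP GET.
responses = {
    "/api/data/?modified_date__gt=2025-07-22T10:30:27.000000": {
        "results": [{"sku": "example0", "stock": 21}],
        "next": "/api/data/?cursor=abc123",
    },
    "/api/data/?cursor=abc123": {
        "results": [{"sku": "example1", "stock": 33}],
        "next": None,
    },
}

def fetch_all_next_field(first_url):
    rows, url = [], first_url
    while url:
        body = responses[url]      # placeholder for the real request
        rows.extend(body["results"])
        url = body.get("next")     # follow until next is null or absent
    return rows

rows = fetch_all_next_field("/api/data/?modified_date__gt=2025-07-22T10:30:27.000000")
```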

Script Card (Optional)​

Below is a basic template script; its processing steps can be modified to implement the logic required for this flow.

Scripts can read incoming data from the inputStream, where the variable input_text contains a JSON string. This string should be parsed using Python's json library. Any outgoing data is written using the outputStream.write() method after converting the relevant Python object back into a JSON string.

Additionally, the script allows for the use of attributes, which provide supplementary information tied to the current execution process. These attributes can be freely accessed or modified throughout the script. For example, the get_attribute() function is used to read attribute values, while the session.putAttribute() method is used to write new string attribute values. Each attribute consists of a key-value pair, where the key uniquely identifies the attribute, and the value can be referenced in subsequent steps.

Attributes with keys starting with the prefix log. will be automatically logged at the end of the execution if a log file is generated, ensuring that important information is captured and available for later review.

Example Script:

import json
import traceback
from java.nio.charset import StandardCharsets
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import StreamCallback

def get_attribute(flow_file, attr_name):
    all_var = flow_file.getAttribute("allVar")
    if all_var:
        all_attributes = json.loads(all_var)
        return all_attributes.get(attr_name, None)
    return flow_file.getAttribute(attr_name)

class TransformCallback(StreamCallback):
    def __init__(self, flowFile):
        self.flowFile = flowFile
        self.omnitronToken = get_attribute(flowFile, "token.omnitron")
        self.erpToken = get_attribute(flowFile, "token.erp")

    def process(self, inputStream, outputStream):
        input_text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        input_obj = json.loads(input_text)

        # Transform content
        output_obj = self.transform(input_obj)

        # Write output content
        outputStream.write(bytearray(json.dumps(output_obj, indent=4).encode('utf-8')))

    def transform(self, input_obj):
        # Transform content as needed; the returned object is written downstream
        return input_obj

flowFile = session.get()
if flowFile is not None:
    try:
        flowFile = session.write(flowFile, TransformCallback(flowFile))
        # Finish by transferring the FlowFile to an output relationship
        session.transfer(flowFile, REL_SUCCESS)
    except:
        error_details = traceback.format_exc()
        # putAttribute returns a new FlowFile reference; keep it
        flowFile = session.putAttribute(flowFile, 'log.error.script_error', str(error_details))
        session.transfer(flowFile, REL_FAILURE)
    session.commit()

Script Testing:

The response from ERP is placed in the input field, and the result of the script is viewed in the result field.
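
For illustration, a transform of this kind can be exercised outside NiFi with plain Python. The ERP field names here (Items, Barcode, Qty, Depot) are hypothetical; only the output structure matches what the Write step expects:

```python
import json

# Hypothetical ERP response; real field names depend on the ERP system.
erp_payload = json.dumps({
    "Items": [
        {"Barcode": "example0", "Qty": 21, "Depot": "loc1"},
        {"Barcode": "example1", "Qty": 33, "Depot": "loc1"},
    ]
})

def transform(input_obj):
    # Reshape ERP rows into the stock structure the Write step expects.
    return [
        {
            "sku": item["Barcode"],
            "stock": item["Qty"],
            "location": item["Depot"],
            "provider": "erp",
        }
        for item in input_obj["Items"]
    ]

output = transform(json.loads(erp_payload))
```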

Mapping Card (Optional)​

Details can be found at the Jolt Transform website. Because the data is split into individual stock rows after this step, the flow expects the output of this step to be an array of stock objects.

Example Jolt Specification:

[
  {
    "operation": "shift",
    "spec": {
    "*": {
        "sku": "[&1].sku",
        "stock": "[&1].stock",
        "location": "[&1].location",
        "provider": "[&1].provider"
    }
    }
  }
]

Expected Output:

[ {
  "sku" : "example0",
  "stock" : 21,
  "location" : "loc1",
  "provider" : "erp"
}, {
  "sku" : "example0",
  "stock" : 3,
  "location" : "loc2",
  "provider" : "erp"
}, {
  "sku" : "example1",
  "stock" : 33,
  "location" : "loc1",
  "provider" : "erp"
}]
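
The shift spec above can be paraphrased in Python: for each element of the input array, only the four named fields are copied into the same position of the output array. The extra warehouse_code field below is a hypothetical example of data the spec drops:

```python
def shift(rows):
    # Python paraphrase of the Jolt shift spec: keep only the four named
    # fields from each row, preserving array positions.
    keys = ("sku", "stock", "location", "provider")
    return [{k: row[k] for k in keys if k in row} for row in rows]

rows = [
    {"sku": "example0", "stock": 21, "location": "loc1",
     "provider": "erp", "warehouse_code": "W1"},
]
out = shift(rows)
```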

Mapping Testing: The response from the script is placed in the input field, and the result of the mapping is viewed in the result field.

POST Write Data to Omnitron​

Path: /api/v1/stocks/insert_or_update_stock/

Example POST Request From Integrator to Omnitron:

[ {
  "sku" : "example0",
  "stock" : 21,
  "location" : "loc1",
  "provider" : "erp"
}, {
  "sku" : "example0",
  "stock" : 3,
  "location" : "loc2",
  "provider" : "erp"
}, {
  "sku" : "example1",
  "stock" : 33,
  "location" : "loc1",
  "provider" : "erp"
}]

Outbound Flow​

In outbound flows, triggers will be configured to send a POST request to the URL specified in the "Outbound Request URL" setting under the Configuration card. The content of the POST request will include JSON data containing details such as SKU, stock, location id and provider info for each product.

Expected Payload:

It is still possible to use the Script and Mapping steps to transform the data, but at the end of those steps the payload must have the following structure:

[ {
  "sku" : "example0",
  "stock" : 21,
  "location" : "loc1",
  "provider" : "erp"
}, {
  "sku" : "example0",
  "stock" : 3,
  "location" : "loc2",
  "provider" : "erp"
}, {
  "sku" : "example1",
  "stock" : 33,
  "location" : "loc1",
  "provider" : "erp"
}]

The authorization token will be obtained by logging in with the Integrator user and must be included in the request headers for authentication purposes.
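
A sketch of how the outbound POST might be constructed; the URL and the "Token" authorization scheme are placeholder assumptions, while the payload structure matches the one above:

```python
import json
import urllib.request

# Hypothetical value of the "Outbound Request URL" configuration setting.
outbound_url = "https://erp.example.com/integrator/stock-webhook"

payload = [
    {"sku": "example0", "stock": 21, "location": "loc1", "provider": "erp"},
]

# Token obtained by logging in with the Integrator user (placeholder value).
token = "INTEGRATOR_TOKEN"

request = urllib.request.Request(
    outbound_url,
    data=json.dumps(payload).encode("utf-8"),
    headers={
        "Content-Type": "application/json",
        "Authorization": "Token " + token,
    },
    method="POST",
)
# urllib.request.urlopen(request) would actually send it; omitted here.
```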
