Stock Flow

The stock flow facilitates the transfer of product stock information between ERP systems and Omnitron. This process ensures that inventory levels are accurately synchronized between these systems, providing up-to-date stock availability data. The flow can operate in two ways: as inbound or outbound. Inbound stock flow means that the flow triggers itself at specific intervals set by the user,such as once every minute, every 1 hour, every day at midnight, every 5th day of a month etc. to read data from the ERP system and write it to Omnitron, ensuring Omnitron has the latest inventory data. Outbound stock flow, on the other hand, involves data being pushed directly to the flow from an external source and then written to Omnitron. This dual capability ensures seamless communication and accurate stock management across the entire system.

Stock Flow Types​

  • Inbound Flow: Indicates that the flow will read data from the ERP system. A cron schedule is set up to ensure it runs, for example, every 5 minutes.

  • Outbound Flow: Signifies that stock data will be posted to the flow from the ERP system, skipping the step of reading data from the ERP system, with outbound flows the data is sent to outbound urls found in flow configuration using HTTP POST methods.

Flow Steps​

  • Login Step: Handles logging into Omnitron and (if configured) the ERP system. If an error occurs during this step, the details are logged.

  • Read Data From ERP Step: For inbound flows, stock queries are made from the ERP system. Outbound flows proceed directly to the next step.

  • Script Step (Optional): Transformation operations on the data are performed using Python, if required.

  • Mapping Step (Optional): Transformation operations on the data are performed using the Jolt Transform library. Further details can be found at the Jolt Transform website.

  • Write Data to Omnitron Step: The incoming data is divided into rows and written to Omnitron. Logs are created for each row. In case of an error, details of the error are logged (e.g., The relevant product is not found in Omnitron). Successful operations are also logged.

Detailed Flow Designer Settings​

Configuration Card​

  • Trigger Settings: Configures the key values to be used for single and date-based queries from ERP. sku and modified_date__gt are created by default and are required values. These settings are used in the trigger page to use specific parameters while creating the trigger url. modified_date__gt value is also used while creating automatic inbound request urls and sku value is used on “Fetch Missing” tasks for querying missing skus on ERP.

Adding a new query parameter:

Using newly added parameters:

Effects of the custom parameters:

  • Omnitron Stok List ID: Information about the stock list ID to be updated in Omnitron, this information should be fetched from Omnitron, go to Products and Catalogs, Stock List then open the stock list page flow is supposed to update last digits on the URL of the page will be stock list id, ex: https://demo.akinon.net/en/en-us/products-and-categories/stocks/stock-detail/34 in this example stock list id is 34. This value is also used with the “Fetch Missing” task to find missing skus in the stock list.

  • Domain Url: Omnitron's domain URL, example: https://demo.omnitron.akinon.net/.

  • Timezone Settings: Timezone used for date-based queries from ERP, this value is used with inbound task requests to modify modified_date__gt value for a specific timezone. The date time format is following: yyyy-MM-dd'T'HH:mm:ss.SSSSSS, ex: 2024-12-31T13:30:59.000001

  • Enabled: Fetch missing setting, when enabled, allows fetching products from ERP that have been newly productized but lack stock information. The 'fetch missing' functionality is a cron job operating every 4 hours for all enabled flows. It queries omnitron for products with missing stock values for the “Stock List” entered and triggers the flow with CSVs for the identified products. Since “outbound” flows do not have an endpoint to make a SKU query, this feature can not be used with outbound flows.

  • Omnitron Identifier Path for ERP: Path for reading products from Omnitron.

  • Fetch Stocks After Productization Date Limit: Determines how many days within which productized products will be queried from Omnitron.

Read Data From ERP Card​

  • Add Extra Headers as Dict Format: During the stock query from ERP, additional headers to be sent.

  • Stock List Key: Key used as a parameter during queries from ERP.

  • Extra Params: Additional parameters used when ERP request is made, it must be a valid dict if a GET request is made which will be used to send query parameters in the url or it can be used on a POST request in which the extra params value will be used to be sent in request body.

  • Endpoint URL: URL for reading data from the ERP system.

Dynamic URL Usage:​

It is possible to create a URL dynamically changing during runtime using Nifi Expression Language, it is possible to use attributes like http.query.string, http.query.param.sku, http.query.param.modified_date__gt and create different urls based on conditions.

For more information about Nifi Expression Language please check this page.

${http.query.string:isEmpty():not():
    ifElse(
        ${http.query.param.sku:isEmpty():not()
            :ifElse(
                ${literal("http://127.0.0.1:12345/(S({TOKEN}))/service/run?{'name': 'Akinon', 'Parameters': [{'Name': 'Barcodes', 'Value': '{SKU}'}]}")
                    :replace("{SKU}", ${http.query.param.sku})                             
                    :replace("{TOKEN}",${token.erp})   
                },
                ${literal("http://127.0.0.1:12345/(S({TOKEN}))/service/run?{'name': 'Akinon', 'Parameters': [{'Name': 'Date', 'Value': '{MODIFIED_DATE}'}]}")                             
                    :replace("{MODIFIED_DATE}", ${http.query.param.modified_date__gt:replaceAll("(\.\d*)?Z?$","")})
                    :replace("{TOKEN}",${token.erp})
                }            
            )},
        ${literal("http://127.0.0.1:12345/(S({TOKEN}))/service/run?{'name': 'Akinon', 'Parameters': [{'Name': 'Date', 'Value': '{MODIFIED_DATE}'}]}")
        :replace("{MODIFIED_DATE}", ${last.request:replaceAll("(\.\d*)?Z?$","")})
        :replace("{TOKEN}",${token.erp})
        }         
    )
}

HTTP Method: GET or POST

Pagination:

  • Offset pagination is a method where a fixed number of items are retrieved from a data source in each request, starting from a specified offset. This offset indicates the position from which to begin fetching data.

  • Seek pagination relies on a unique identifier or a specific value to retrieve subsequent sets of data. Instead of using offsets, it uses a marker or token indicating where the previous set of data ended.

  • Next field pagination includes a "next" field or token in each server response that points to the next page of data.

Script Card (Optional)​

This is a base script. The process step within this script can be updated to enable the use of the script step.

import json
import traceback
from java.nio.charset import StandardCharsets
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import StreamCallback

class TransformCallback(StreamCallback):
   def __init__(self, flowFile):
       self.flowFile = flowFile
       self.omnitron_token = flowFile.getAttribute("token.omnitron")
       self.erp_token = flowFile.getAttribute("token.erp")


   def process(self, inputStream, outputStream):
       input_text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
       input_obj = json.loads(input_text)

       # Transform content
       output_obj = self.transform(input_obj)

       # Write output content
       outputStream.write(bytearray(json.dumps(output_obj, indent=4).encode('utf-8')))
      
   def transform(self, output_obj):
       # Transform content
       return output_obj

flowFile = session.get()
if flowFile != None:
   try:
       flowFile = session.write(flowFile, TransformCallback(flowFile))
       # Finish by transferring the FlowFile to an output relationship
       session.transfer(flowFile, REL_SUCCESS)
   except:
       var = traceback.format_exc()
       session.putAttribute(flowFile, 'log.error.script_error', str(var))
       session.transfer(flowFile, REL_FAILURE)
   session.commit()

Script Testing: The response from ERP is placed in the input field, and the result of the script is viewed in the result field.

Mapping Card (Optional)​

Details can be found at the Jolt Transform website.

Example Mapping:

[
    {
        "operation": "shift",
        "spec": {
            "*": {
                "sku": "[&1].product_sku",
                "stock": "[&1].stock",
                "stock_list": "[&1].stock_list",
                "unit_type": "[&1].unit_type",
                "updated_date": "[&1].updated_date"
            }
        }
    }
]

Expected Output:

[ {
  "stock_list" : "1",
  "unit_type" : "qty",
  "product_sku" : "12345",
  "stock" : 123
}, {
  "stock_list" : "1",
  "unit_type" : "qty",
  "product_sku" : "12346",
  "stock" : 0
}, {
  "stock_list" : "1",
  "unit_type" : "qty",
  "product_sku" : "12347",
  "stock" : 9
}]

Mapping Testing: The response from the script is placed in the input field, and the result of the mapping is viewed in the result field.

POST Write Data to Omnitron​

Path: /api/i1/product_stock/bulk_upsert/

Example POST request from Integrator to Omnitron:

[ {
  "stock_list" : "1",
  "unit_type" : "qty",
  "product_sku" : "1234",
  "stock" : 0,
  "batch_id" : "123a-123a-123a-123a-123123a"
}, {
  "stock_list" : "1",
  "unit_type" : "qty",
  "product_sku" : "12345",
  "stock" : 1,
  "batch_id" : "123a-123a-123a-123a-123123a"
}, {
  "stock_list" : "1",
  "unit_type" : "qty",
  "product_sku" : "12344",
  "stock" : 26,
  "batch_id" : "123a-123a-123a-123a-123123a"
}]

Outbound Flow​

In outbound flows, triggers will be configured to send a POST request to the URL specified in the "Outbound Request URL" setting under the Configuration card. The content of the POST request will include JSON data containing details such as SKU, stock quantity, stock list ID, and unit type for each product.

There can not be more than 5 active executions in a flow at the same time, if flow has 5 logs “processing” new outbound executions will not be accepted. Our suggested batch size is 5000 lines per execution.

Expected Payload:

[ {
  "stock_list" : "1",
  "unit_type" : "qty",
  "product_sku" : "12345",
  "stock" : 123
}, {
  "stock_list" : "1",
  "unit_type" : "qty",
  "product_sku" : "12346",
  "stock" : 0
}, {
  "stock_list" : "1",
  "unit_type" : "qty",
  "product_sku" : "12347",
  "stock" : 9
}]

The authorization token will be obtained by logging in with the Integrator user and must be included in the request headers for authentication purposes.

Last updated

Was this helpful?