Image Flow
The image flow transfers image information, such as image URLs and image orders, from ERP systems to Omnitron. Each execution fetches new or modified image information from the configured API endpoint, matches it to products using the filename template rules, and updates the downloadable images on Omnitron for the matched products.
Image Flow Types
Inbound Flow: Indicates that the flow will read data from the ERP system. A cron schedule is set up to ensure it runs, for example, every 5 minutes.
Outbound Flow: Signifies that image data will be posted to the flow by the ERP system, skipping the step of reading data from the ERP system.
Flow Steps
Login Step: Handles logging into Omnitron and (if configured) the ERP system. If an error occurs during this step, the details are logged.
Read Data From ERP Step: For inbound flows, image queries are made from the ERP system. Outbound flows proceed directly to the next step.
Script Step (Optional): Transformation operations on the data are performed using Python, if required.
Mapping Step (Optional): Transformation operations on the data are performed using the Jolt Transform library. Further details can be found in the Jolt Transform documentation.
Write Data to Omnitron Step: The incoming data is divided into rows and written to Omnitron. Logs are created for each row. In case of an error, details of the error are logged.
Detailed Flow Designer Settings
Configuration Card
ERP Query Settings: Configures the key values to be used for single and date-based queries to the ERP. Example:
{
  "sku": {
    "key": "sku",
    "type": "string"
  },
  "modified_date__gt": {
    "key": "modified_date__gt",
    "type": "datetime"
  },
  "custom": {
    "key": "status",
    "type": "string"
  }
}
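One way to read this configuration is that each entry maps a logical query name to the parameter key the ERP expects, together with the type of its value. The sketch below only illustrates that reading. `build_query_params` is a hypothetical helper, not part of Integrator, and the ISO 8601 formatting of datetime values is an assumption.

```python
from datetime import datetime

# Same structure as the ERP Query Settings example above.
ERP_QUERY_SETTINGS = {
    "sku": {"key": "sku", "type": "string"},
    "modified_date__gt": {"key": "modified_date__gt", "type": "datetime"},
    "custom": {"key": "status", "type": "string"},
}


def build_query_params(settings, values):
    """Translate logical names into ERP parameter keys (hypothetical helper)."""
    params = {}
    for name, value in values.items():
        rule = settings[name]
        if rule["type"] == "datetime":
            # Assumption: the ERP accepts ISO 8601 timestamps.
            value = value.isoformat()
        else:
            value = str(value)
        params[rule["key"]] = value
    return params


# Single-product query.
single = build_query_params(ERP_QUERY_SETTINGS, {"sku": "1234"})

# Date-based query with the "custom" entry mapped to the ERP key "status".
dated = build_query_params(
    ERP_QUERY_SETTINGS,
    {"modified_date__gt": datetime(2025, 7, 22, 10, 30, 27), "custom": "active"},
)
```

Note how the `custom` entry is sent under the ERP key `status`, which is the purpose of the `key` field in each rule.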
Domain URL: Omnitron's domain URL.
Timezone Settings: Timezone used for date-based queries from ERP.
Filename Template: Defines which product attribute is used to assign images to products. For example, if images are assigned by SKU, the value should be {sku}; if they are assigned by base code, the value should be {base_code}.
Enabled: Turns on the Fetch Missing setting. When enabled, products that have been newly productized but lack image information are fetched from the ERP. The Fetch Missing functionality is a cron job that runs every 4 hours for all enabled flows. It queries products that are missing images and triggers the flow with CSVs for the identified products.
Omnitron Identifier Path for ERP: Path for reading products from Omnitron.
Fetch Images After Productization Date Limit: Defines the number of days (fixed to 2) during which newly approved SKUs on Omnitron are considered for automatic fetching by the Fetch Missing Task. This setting is informational only and cannot be modified.
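To make the Filename Template setting more concrete, the following sketch fills a template such as {sku} from a product's attributes and checks whether an image URL belongs to that product. The matching rule used here (the filename starts with the rendered template followed by "-") is an assumption inferred from example URLs like test_template-1.jpg. It is not a description of Integrator's actual matching logic, and both function names are hypothetical.

```python
import posixpath
from urllib.parse import urlparse


def render_template(template, product):
    """Fill a filename template such as "{sku}" from product attributes."""
    return template.format(**product)


def image_matches(url, template, product):
    """Assumed rule: the filename starts with the rendered template plus "-"."""
    filename = posixpath.basename(urlparse(url).path)
    return filename.startswith(render_template(template, product) + "-")


product = {"sku": "test_template", "base_code": "test"}
url = "https://test.project.com.tr/test_template-1.jpg"

matches_by_sku = image_matches(url, "{sku}", product)
matches_by_base_code = image_matches(url, "{base_code}", product)
```

With these sample values the image matches when the template is {sku} but not when it is {base_code}, because the filename begins with "test_template-" rather than "test-".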
Read Data From ERP Card
Add Extra Headers as Dict Format: Additional headers to be sent with the image query to the ERP, given as a dict.
Modified Date Key: The name of the date parameter sent to retrieve the changes made since the last successful request (e.g. modified_date__gt, modified_date, modified).
Extra Params: Additional parameters sent with the ERP request. The value must be a valid dict. For a GET request, it is sent as query parameters in the URL; for a POST request, it is sent in the request body.
Endpoint URL: URL for reading data from the ERP system.
Dynamic URL Usage:
${http.query.string:isEmpty():not():
  ifElse(
    ${http.query.param.sku:isEmpty():not()
      :ifElse(
        ${literal("http://127.0.0.1:12345/(S({TOKEN}))/service/run?{'name': 'Akinon', 'Parameters': [{'Name': 'Barcodes', 'Value': '{SKU}'}]}")
          :replace("{SKU}", ${http.query.param.sku})
          :replace("{TOKEN}",${token.erp})
        },
        ${literal("http://127.0.0.1:12345/(S({TOKEN}))/service/run?{'name': 'Akinon', 'Parameters': [{'Name': 'Date', 'Value': '{MODIFIED_DATE}'}]}")
          :replace("{MODIFIED_DATE}", ${http.query.param.modified_date__gt:replaceAll("(\.\d*)?Z?$","")})
          :replace("{TOKEN}",${token.erp})
        }
      )},
    ${literal("http://127.0.0.1:12345/(S({TOKEN}))/service/run?{'name': 'Akinon', 'Parameters': [{'Name': 'Date', 'Value': '{MODIFIED_DATE}'}]}")
      :replace("{MODIFIED_DATE}", ${last.request:replaceAll("(\.\d*)?Z?$","")})
      :replace("{TOKEN}",${token.erp})
    }
  )
}
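The branching in the expression above can be hard to follow, so here is the same decision logic as a plain Python sketch. It is an illustration only: the real evaluation happens in the expression itself, and `build_url` and `trim_timestamp` are hypothetical names. The three cases are a request that carries a sku parameter, a request that carries only a date parameter, and a scheduled run that falls back to the last request date.

```python
import re

SKU_URL = ("http://127.0.0.1:12345/(S({TOKEN}))/service/run?"
           "{'name': 'Akinon', 'Parameters': [{'Name': 'Barcodes', 'Value': '{SKU}'}]}")
DATE_URL = ("http://127.0.0.1:12345/(S({TOKEN}))/service/run?"
            "{'name': 'Akinon', 'Parameters': [{'Name': 'Date', 'Value': '{MODIFIED_DATE}'}]}")


def trim_timestamp(value):
    # Same pattern as in the expression: drop fractional seconds and a trailing "Z".
    return re.sub(r"(\.\d*)?Z?$", "", value)


def build_url(query_params, erp_token, last_request):
    if query_params:  # http.query.string is not empty
        sku = query_params.get("sku")
        if sku:  # single-SKU query
            url = SKU_URL.replace("{SKU}", sku)
        else:  # date-based query taken from the incoming request
            date = query_params.get("modified_date__gt", "")
            url = DATE_URL.replace("{MODIFIED_DATE}", trim_timestamp(date))
    else:  # no query string: use the last request date
        url = DATE_URL.replace("{MODIFIED_DATE}", trim_timestamp(last_request))
    return url.replace("{TOKEN}", erp_token)


by_sku = build_url({"sku": "1234"}, "tok", "2025-07-22T10:30:27.000000Z")
scheduled = build_url({}, "tok", "2025-07-22T10:30:27.000000Z")
```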
HTTP Method: GET or POST.
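The interaction between the HTTP Method and Extra Params settings can be sketched as follows. The code only builds a request object with the standard library and does not send it. `build_erp_request` and the example endpoint are hypothetical, and encoding the POST body as JSON is an assumption, since the text above only states that the extra params are sent in the request body.

```python
import json
from urllib.parse import urlencode
from urllib.request import Request


def build_erp_request(endpoint, method, extra_params, extra_headers=None):
    """Build (but do not send) an ERP request from the card settings."""
    headers = dict(extra_headers or {})
    if method == "GET":
        url = endpoint
        if extra_params:
            # GET: extra params become query parameters in the URL.
            separator = "&" if "?" in endpoint else "?"
            url = endpoint + separator + urlencode(extra_params)
        return Request(url, headers=headers, method="GET")
    # POST: extra params are sent in the request body (JSON is assumed here).
    headers.setdefault("Content-Type", "application/json")
    body = json.dumps(extra_params).encode("utf-8")
    return Request(endpoint, data=body, headers=headers, method="POST")


get_request = build_erp_request(
    "https://erp.example.com/images", "GET", {"status": "active"}
)
post_request = build_erp_request(
    "https://erp.example.com/images", "POST", {"status": "active"}
)
```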
Pagination: Pagination in Integrator works by creating a new execution for each page, using the same initial parameters and continuing with the configured pagination method. The process continues until the response no longer contains valid data, that is, until the response body is empty or the status code is not in the 2xx range.
This approach causes Integrator to make one extra request per task to ensure all possible data has been retrieved.
A new task will not begin until the currently running task has fetched all available pages.
The "last request date" parameter (typically used as modified_date__gt) is only updated after Integrator has successfully fetched all pages and the final response returns empty data. At that point, a new scheduled task can start using an updated "last request date" value, which corresponds to the timestamp taken just before the first request of the previous successful task.
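The stopping rule and the handling of the last request date can be summarized in a small simulation. This is a simplified sketch: Integrator creates a separate execution per page, whereas the loop below runs in one function, and `run_paginated_task` and `fake_erp` are hypothetical names. Leaving the date unchanged after a non-2xx response is this sketch's reading of the rule that the date is only updated after a successful run.

```python
from datetime import datetime, timezone


def run_paginated_task(fetch_page, last_request_date,
                       now=lambda: datetime.now(timezone.utc)):
    """fetch_page(page_index) returns (status_code, rows)."""
    started_at = now()  # taken just before the first request
    items, request_count, page = [], 0, 0
    while True:
        status, rows = fetch_page(page)
        request_count += 1
        if not 200 <= status < 300:
            # Failed run: keep the previous last request date.
            return items, request_count, last_request_date
        if not rows:
            # Empty page confirms that all data was read: update the date.
            return items, request_count, started_at
        items.extend(rows)
        page += 1


PAGES = [["a", "b"], ["c", "d"], ["e"]]


def fake_erp(page):
    # Three pages of data, then empty responses.
    return (200, PAGES[page]) if page < len(PAGES) else (200, [])


items, request_count, new_date = run_paginated_task(fake_erp, None)
```

Three pages of data lead to four requests here, which is the one extra request per task mentioned above.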
Offset pagination is a method where a fixed number of items are retrieved from a data source in each request, starting from a specified offset. This offset indicates the position from which to begin fetching data. This pagination type is useful when pages are constructed by skipping a number of records, for example:
First page:
/api/data/?modified_date__gt=2025-07-22T10:30:27.000000&limit=50&skip=0
Second page:
/api/data/?modified_date__gt=2025-07-22T10:30:27.000000&limit=50&skip=50
Third page:
/api/data/?modified_date__gt=2025-07-22T10:30:27.000000&limit=50&skip=100
Seek pagination is a method where the app fetches pages sequentially by incrementing a numeric page parameter with each request. Each page contains a fixed number of items. This is often simpler than offset pagination, as it directly references a page number rather than calculating offsets. This pagination type is useful when the API expects explicit page numbers, for example:
First page:
/api/data/?modified_date__gt=2025-07-22T10:30:27.000000&page=1&limit=50
Second page:
/api/data/?modified_date__gt=2025-07-22T10:30:27.000000&page=2&limit=50
Third page:
/api/data/?modified_date__gt=2025-07-22T10:30:27.000000&page=3&limit=50
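Offset and seek pagination differ only in how the paging parameter is derived from the page index. The sketch below reproduces the example URLs for both methods. `page_url` is a hypothetical helper, and the parameter names skip, page and limit are taken from the examples, so a real ERP may use different ones.

```python
from urllib.parse import urlencode


def page_url(base, modified_date, limit, page_index, method):
    """Build the URL for a zero-based page index."""
    params = {"modified_date__gt": modified_date}
    if method == "offset":
        # Offset pagination: skip everything returned by earlier pages.
        params.update({"limit": limit, "skip": page_index * limit})
    elif method == "seek":
        # Seek pagination: page numbers start at 1 and increase by one.
        params.update({"page": page_index + 1, "limit": limit})
    else:
        raise ValueError("unknown pagination method: %s" % method)
    # Keep ":" unescaped so the timestamp reads as in the examples.
    return base + "?" + urlencode(params, safe=":")


second_offset_page = page_url(
    "/api/data/", "2025-07-22T10:30:27.000000", 50, 1, "offset"
)
third_seek_page = page_url(
    "/api/data/", "2025-07-22T10:30:27.000000", 50, 2, "seek"
)
```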

Next field pagination is a method where the API response itself includes a URL or token pointing to the next page of data. The app follows this URL to retrieve the next set of results, continuing until there are no more pages. This is useful when the server controls how to continue pagination, for example:
{
  "results": [ /* items */ ],
  "next": "/api/data/?cursor=abc123"
}
The app will automatically use the next field to make the subsequent request:
First request:
/api/data/?modified_date__gt=2025-07-22T10:30:27.000000
Follow-up request:
/api/data/?modified_date__gt=2025-07-22T10:30:27.000000&cursor=abc123
And so on, until next is null or not present.
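Following the next field amounts to a loop that stops when the field is null or absent. In this minimal sketch, `fetch` is a hypothetical callable that stands in for the HTTP call and returns the parsed response body. The sketch requests the next value exactly as the server returned it and does not reproduce how Integrator combines it with the original query parameters.

```python
def fetch_all(first_url, fetch):
    """Collect results from every page by following the "next" field."""
    results, url = [], first_url
    while url:
        body = fetch(url)
        results.extend(body.get("results", []))
        url = body.get("next")  # None or a missing key ends the loop
    return results


# In-memory stand-in for the API: URL -> response body.
FAKE_API = {
    "/api/data/?modified_date__gt=2025-07-22T10:30:27.000000": {
        "results": [1, 2],
        "next": "/api/data/?cursor=abc123",
    },
    "/api/data/?cursor=abc123": {"results": [3], "next": None},
}

all_items = fetch_all(
    "/api/data/?modified_date__gt=2025-07-22T10:30:27.000000",
    FAKE_API.__getitem__,
)
```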

Script Card (Optional)
This is a base script. To use the script step, update the process step within this script. Use a script when Mapping cannot correctly transform the response data into the target structure. If it is not needed, leave the script input empty.
import json
import traceback
from java.nio.charset import StandardCharsets
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import StreamCallback


def get_attribute(flow_file, attr_name):
    # Prefer the JSON-encoded "allVar" attribute when it is present,
    # otherwise fall back to the plain FlowFile attribute.
    all_var = flow_file.getAttribute("allVar")
    if all_var:
        all_attributes = json.loads(all_var)
        return all_attributes.get(attr_name, None)
    return flow_file.getAttribute(attr_name)


class TransformCallback(StreamCallback):
    def __init__(self, flowFile):
        self.flowFile = flowFile
        self.omnitronToken = get_attribute(flowFile, "token.omnitron")
        self.erpToken = get_attribute(flowFile, "token.erp")

    def process(self, inputStream, outputStream):
        input_text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        input_obj = json.loads(input_text)
        # Transform content
        output_obj = self.transform(input_obj)
        # Write output content
        outputStream.write(bytearray(json.dumps(output_obj, indent=4).encode('utf-8')))

    def transform(self, output_obj):
        # Transform content
        return output_obj


flowFile = session.get()
if flowFile is not None:
    try:
        flowFile = session.write(flowFile, TransformCallback(flowFile))
        # Finish by transferring the FlowFile to an output relationship
        session.transfer(flowFile, REL_SUCCESS)
    except:
        # Bare except is kept so that Java exceptions are caught as well.
        var = traceback.format_exc()
        # putAttribute returns the updated FlowFile, so keep the new reference.
        flowFile = session.putAttribute(flowFile, 'log.error.script_error', str(var))
        session.transfer(flowFile, REL_FAILURE)
    session.commit()
Script Testing: The response from ERP is placed in the input field, and the result of the script is viewed in the result field.
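As an illustration of what the transform method might contain, the sketch below groups a flat ERP response into one entry per product with ordered images. The input shape and the field names Barcode and ImageUrl are hypothetical, so a real ERP response will differ. The code avoids Python 3-only syntax because the base script imports Java classes and therefore runs under Jython.

```python
def transform(erp_rows):
    """Group flat ERP rows into one entry per product with ordered images."""
    grouped = {}
    sku_order = []  # remember first-seen order explicitly
    for row in erp_rows:
        sku = row["Barcode"]  # hypothetical ERP field name
        if sku not in grouped:
            grouped[sku] = []
            sku_order.append(sku)
        grouped[sku].append(row["ImageUrl"])  # hypothetical ERP field name
    output = []
    for sku in sku_order:
        images = [
            {"url": url, "order": order}
            for order, url in enumerate(grouped[sku], 1)
        ]
        output.append({"product_sku": sku, "images": images})
    return output


sample_input = [
    {"Barcode": "1234", "ImageUrl": "https://test.project.com.tr/test_template-1.jpg"},
    {"Barcode": "1234", "ImageUrl": "https://test.project.com.tr/test_template-2.jpg"},
]
sample_output = transform(sample_input)
```

Inside the base script, the same body would go into the transform method of TransformCallback, with self as the first parameter.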
Mapping Card (Optional)
Expected Output:
[ {
  "product_sku" : "test_template",
  "images" : [ {
    "url" : "https://test.project.com.tr/test_template-1.jpg",
    "order" : 1
  }, {
    "url" : "https://test.project.com.tr/test_template-2.jpg",
    "order" : 2
  }, {
    "url" : "https://test.project.com.tr/test_template-3.jpg",
    "order" : 3
  } ]
} ]
Mapping Testing: The response from the script is placed in the input field, and the result of the mapping is viewed in the result field.
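When testing a mapping, it can help to check the result against the expected output structure before running the flow. The validator below is a hypothetical helper, not part of Integrator. It only checks the fields shown in the expected output (product_sku, images, url, order) and assumes that order is an integer and url is a string.

```python
def validate_rows(rows):
    """Return a list of problems found in a mapping result (empty if none)."""
    if not isinstance(rows, list):
        return ["top-level value must be a list"]
    errors = []
    for i, row in enumerate(rows):
        if not isinstance(row, dict):
            errors.append("row %d: must be an object" % i)
            continue
        sku = row.get("product_sku")
        if not isinstance(sku, str) or not sku:
            errors.append("row %d: product_sku must be a non-empty string" % i)
        images = row.get("images")
        if not isinstance(images, list) or not images:
            errors.append("row %d: images must be a non-empty list" % i)
            continue
        for j, image in enumerate(images):
            if not isinstance(image, dict):
                errors.append("row %d image %d: must be an object" % (i, j))
                continue
            if not isinstance(image.get("url"), str):
                errors.append("row %d image %d: url must be a string" % (i, j))
            if not isinstance(image.get("order"), int):
                errors.append("row %d image %d: order must be an integer" % (i, j))
    return errors


good = [{
    "product_sku": "test_template",
    "images": [{"url": "https://test.project.com.tr/test_template-1.jpg", "order": 1}],
}]
bad = [{"product_sku": "", "images": [{"url": None, "order": "1"}]}]

good_errors = validate_rows(good)
bad_errors = validate_rows(bad)
```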
Write Data to Omnitron
Example POST request from Integrator to Omnitron:
POST Path: /api/v1/downloadable-image/
{
  "product_sku" : "1234",
  "images" : [ {
    "url" : "https://test.project.com.tr/test_template-1.jpg",
    "order" : 1
  }, {
    "url" : "https://test.project.com.tr/test_template-2.jpg",
    "order" : 2
  }, {
    "url" : "https://test.project.com.tr/test_template-3.jpg",
    "order" : 3
  } ]
}
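The row-by-row behavior of this step, in which the incoming data is divided into rows and a log is created for each one, can be sketched as below. `write_rows` and `post_row` are hypothetical names. `post_row` stands in for the POST to /api/v1/downloadable-image/, and the log fields are illustrative rather than Integrator's actual log format.

```python
def write_rows(rows, post_row):
    """Send each row separately and record one log entry per row."""
    logs = []
    for row in rows:
        sku = row.get("product_sku")
        try:
            post_row(row)
            logs.append({"product_sku": sku, "status": "success"})
        except Exception as error:
            # A failing row is logged with its error and does not stop the rest.
            logs.append({"product_sku": sku, "status": "failed", "detail": str(error)})
    return logs


def fake_post(row):
    # Stand-in for the HTTP call: reject rows without images.
    if not row.get("images"):
        raise ValueError("images is required")


logs = write_rows(
    [
        {"product_sku": "1234", "images": [{"url": "https://test.project.com.tr/test_template-1.jpg", "order": 1}]},
        {"product_sku": "12345", "images": []},
    ],
    fake_post,
)
```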
Outbound Flow
In outbound flows, triggers will be configured to send a POST request to the URL specified in the "Outbound Request URL" setting under the Configuration card. The content of the POST request will include JSON data containing details such as SKU and Images for each product.
Expected Payload:
[
  {
    "product_sku": "1234",
    "images": [
      {
        "url": "https://test.project.com.tr/test_template-1.jpg",
        "order": 1
      },
      {
        "url": "https://test.project.com.tr/test_template-2.jpg",
        "order": 2
      },
      {
        "url": "https://test.project.com.tr/test_template-3.jpg",
        "order": 3
      }
    ]
  },
  {
    "product_sku": "12345",
    "images": [
      {
        "url": "https://test.project.com.tr/test_template2-1.jpg",
        "order": 1
      },
      {
        "url": "https://test.project.com.tr/test_template2-2.jpg",
        "order": 2
      },
      {
        "url": "https://test.project.com.tr/test_template2-3.jpg",
        "order": 3
      }
    ]
  },
  {
    "product_sku": "123456",
    "images": [
      {
        "url": "https://test.project.com.tr/test_template3-1.jpg",
        "order": 1
      },
      {
        "url": "https://test.project.com.tr/test_template3-2.jpg",
        "order": 2
      },
      {
        "url": "https://test.project.com.tr/test_template3-3.jpg",
        "order": 3
      }
    ]
  }
]
The authorization token will be obtained by logging in with the Integrator user and must be included in the request headers for authentication purposes.
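A client on the ERP side could assemble the outbound request roughly as follows. The code builds the request with the standard library without sending it. The function name, the example URL and the `Token` prefix in the Authorization header are assumptions, so the exact header format should be confirmed against the Integrator login response.

```python
import json
from urllib.request import Request


def build_outbound_request(outbound_url, token, images_by_sku):
    """Build (but do not send) the POST request for an outbound image flow."""
    payload = [
        {
            "product_sku": sku,
            "images": [
                {"url": url, "order": order}
                for order, url in enumerate(urls, start=1)
            ],
        }
        for sku, urls in images_by_sku.items()
    ]
    headers = {
        "Content-Type": "application/json",
        # Assumption: the token is sent with a "Token" prefix.
        "Authorization": "Token " + token,
    }
    body = json.dumps(payload).encode("utf-8")
    return Request(outbound_url, data=body, headers=headers, method="POST")


request = build_outbound_request(
    "https://integrator.example.com/outbound-url",  # value of "Outbound Request URL"
    "abc",
    {"1234": ["https://test.project.com.tr/test_template-1.jpg",
              "https://test.project.com.tr/test_template-2.jpg"]},
)
```

Sending the request, for example with `urllib.request.urlopen(request)`, is left out so that the sketch has no network dependency.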