Scripting in Flows

Every flow within Integrator includes at least one scripting area that enables input data transformation, preparation for subsequent steps, additional logical operations, and dynamic adjustment of flow settings (e.g., modifying the request URL at runtime). Scripts can also make extra requests if needed. Scripts in Integrator are allotted a maximum of 300 seconds for execution; exceeding this time limit results in a failure. This document provides detailed examples and explanations of Integrator's scripting capabilities.

Integrator scripting is powered by Python, providing access to execution data via special objects like session and flowFile. Each script follows a basic structure with specific classes, methods, and imports, as shown in the base script template in Example 1.

Script Concepts

TransformCallback Class

A required class that extends StreamCallback. It provides the environment needed to read the flowFile's content and attributes and, when needed, to write or change the content.

  • __init__ method: Runs when the TransformCallback object is initialized. It has no required parameters, but by convention the object is initialized with the flowFile object, which is used to read and write attributes when necessary. Other parameters can be added or removed.

  • process(self, inputStream, outputStream) method: This required method reads and writes data within its scope. The method reads input text from inputStream using IOUtils and writes modified or new content to outputStream via the write method. Although other methods or functions can be called within this method’s scope, no additional methods are required within the TransformCallback class.
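Because process only needs text in and text out, the transformation logic can live in a plain function and be exercised locally without the Integrator runtime. The following is a minimal sketch under that assumption; transform_text and the "code", "qty", "sku", and "stock" field names are hypothetical and not part of Integrator.

```python
import json


def transform_text(text):
    """Parse the input JSON text, reshape each item, and return JSON text."""
    items = json.loads(text)
    # Hypothetical mapping: rename "code" to "sku" and "qty" to "stock"
    output = [{"sku": item["code"], "stock": item["qty"]} for item in items]
    return json.dumps(output, indent=2)


sample = '[{"code": "A1", "qty": 3}]'
print(transform_text(sample))
```

Inside process, a function like this would sit between reading the text from inputStream and writing the result to outputStream.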

Session Object

The Session object is provided globally, allowing it to be used without importing. It provides methods essential for orchestrating the current script session/execution.

  • get(): Returns a flowFile object for the current execution. As scripts read or update the flowFile content, this method typically serves as the script’s entry point.

  • write(flowFile, TransformCallback): Takes flowFile and TransformCallback objects, prepares the flowFile for read/write operations, runs the process method in TransformCallback, and returns the updated flowFile.

  • transfer(flowFile, <RESULT STATUS>): Marks the current flowFile as successful using REL_SUCCESS or failed using REL_FAILURE. Each flowFile must be transferred to one of these statuses to complete execution.

  • Note that REL_SUCCESS and REL_FAILURE are provided in the global context automatically; they are not imported or generated. They are the only values the transfer method accepts.

  • commit(): Saves changes to the flowFile object and marks the final step of execution, disallowing further modifications.

  • putAttribute(flowFile, <attribute name>, <attribute value>): Creates or modifies attributes in the current flowFile. Both the “attribute name” and “attribute value” must be strings; otherwise, an exception will occur, ending the execution with failure.

  • putAllAttributes(flowFile, <attribute dict>): Writes multiple attribute values to the flowFile. Accepts a dictionary of string key-value pairs.
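Since attribute names and values must be strings, it can help to normalize a dictionary before handing it to the session. The helper below is a sketch, not an Integrator API: to_string_attributes is a hypothetical name, and serializing non-string values as JSON text is one possible convention.

```python
import json

try:
    string_types = basestring  # Python 2 / Jython: covers str and unicode
except NameError:
    string_types = str  # Python 3


def to_string_attributes(values):
    """Return a copy of `values` in which every key and value is a string."""
    result = {}
    for name, value in values.items():
        if isinstance(value, string_types):
            result[str(name)] = value
        else:
            # Serialize numbers, lists, dicts, and None as JSON text
            result[str(name)] = json.dumps(value)
    return result


attrs = to_string_attributes({"retry_count": 3, "log.custom.log": "done", "skus": ["a", "b"]})
print(attrs)
```

The resulting dictionary contains only string pairs, which is the shape the attribute-writing methods above expect.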

Integrator Attributes

Attributes in flowFiles store runtime data about the current execution. Integrator-provided attributes vary based on the flow type and are used to save information for later steps. Attribute values set in one step are accessible in subsequent steps, allowing for the retention of relevant execution data. Attributes prefixed with "log." can log information about successful or failed executions.

Below is a list of some attributes provided by the Integrator during execution and accessible during script steps in various flows. Most of these values depend on the flow settings configured by the user.

  • log.script.error attribute: A recommended logging attribute for scripts across all flow types.

  • force_step attribute: Used in GenericProxy flows. Possible values are “200response” and “login”. These values should be set during script execution. When “200response” is set upon successful execution of a Preprocessing Script, the flow halts further execution, does not send a request, skips the Postprocessing Script, and returns an HTTP 200 response. When “login” is set upon the failure of any script step in GenericProxy, the flow retries its Login step and reruns the script once.

  • token.omnitron attribute: The token used by the Integrator to access Omnitron. This token can be used in headers for additional Omnitron requests during script steps. This attribute is not present in GenericProxy flows. Example: Token <token>.

  • token.erp attribute: The token used to access ERP systems, provided only if a Login flow is specified in the Flow Settings of the current flow. Example: Bearer <token>.

  • omnitron.api.domain attribute: Contains the Omnitron URL for the current flow. This value is present only in flows where an “Omnitron API URL” is provided in the settings.

  • omnitron.param.stock.id attribute: Contains the stock list ID used for ERP requests in stock flows.

  • omnitron.param.price.id attribute: Contains the price list ID used for ERP requests in price flows.

  • http.query.string attribute: This attribute is available only in executions where a user has triggered the flow manually from the Integrator UI. It will contain values in query string format as provided when triggering the flow. Examples: sku=foobar&custom1=1 or modified_date__gt=2024-10-01T00:00:00.000Z&foo=1.

  • language attribute: Available for Product flows, reflecting the language setting of the flow. It will contain one of the language values provided in the settings, indicating the language for which the requested product data is intended.

  • erp.api.url attribute: Available in “Order” and “Order Cancel” flows, containing the URL specified in the flow settings for writing order/cancel data to the ERP. This attribute can be overwritten within the Script steps of the relevant flows to change the request URL at runtime and make a request to an alternate URL if needed.
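As an illustration of working with these attributes, the http.query.string value can be parsed into a dictionary with the standard library. This is a sketch: parse_query_attribute is a hypothetical helper, and a plain dict stands in for the attributes read from the flowFile.

```python
try:
    from urllib.parse import parse_qsl  # Python 3
except ImportError:
    from urlparse import parse_qsl  # Python 2 / Jython


def parse_query_attribute(attributes):
    """Return the manual-trigger parameters as a dict, or {} when absent."""
    raw = attributes.get("http.query.string") or ""
    # keep_blank_values keeps parameters such as "foo=" instead of dropping them
    return dict(parse_qsl(raw, keep_blank_values=True))


params = parse_query_attribute({"http.query.string": "sku=foobar&custom1=1"})
print(params)
```

Values stay strings after parsing, and if a key is repeated in the query string only its last value is kept by the dict conversion.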

Python Libraries

Integrator provides access to additional libraries, beyond standard Python libraries, for use during Script steps.

Requests Library

A widely-used HTTP library for Python, commonly used to make extra HTTP requests within scripts. For more details, see the documentation at requests.readthedocs.io.

An example of using the requests library in Integrator can be found in Example 4. Since Integrator scripts have a maximum runtime of 300 seconds, it is recommended to include a “timeout” parameter in requests to ensure efficient execution.
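One way to choose the timeout value is to derive it from the time left in the 300-second budget. The sketch below assumes a hypothetical helper named request_timeout and a hypothetical 20-second safety margin; neither is part of Integrator or of the requests library.

```python
import time

SCRIPT_BUDGET_SECONDS = 300  # maximum script runtime stated above
SAFETY_MARGIN_SECONDS = 20   # hypothetical reserve for writing output and logging


def request_timeout(started_at, now=None, per_request_cap=30):
    """Return a timeout in seconds that fits the remaining script budget."""
    if now is None:
        now = time.time()
    remaining = SCRIPT_BUDGET_SECONDS - SAFETY_MARGIN_SECONDS - (now - started_at)
    if remaining <= 0:
        raise RuntimeError("No time left in the script budget for another request")
    return min(per_request_cap, remaining)


started_at = time.time()  # record once, at the top of the script
print(request_timeout(started_at))
```

The returned value would be passed as timeout=request_timeout(started_at) to requests.get or requests.post. Note that the requests timeout bounds the connect and read waits rather than the total duration of a request, so it limits stalls but is not a strict deadline.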

When using the requests library, Integrator runs scripts on specialized “Script Servers”. Within this environment, attributes added using the session’s putAttribute method cannot exceed 500 characters. If longer attributes are needed, you can write them in the output content under the “attributes” key. Integrator will handle these attributes accordingly and remove them from the flow data after adding them to the flowFile attributes. For detailed usage, refer to Example 5.
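The 500-character rule can be applied mechanically by splitting a dictionary of attributes by value length. This is a sketch only: split_attributes is a hypothetical helper, and the sample names and values are made up for illustration.

```python
MAX_ATTRIBUTE_LENGTH = 500  # limit stated above for putAttribute on Script Servers


def split_attributes(attributes, limit=MAX_ATTRIBUTE_LENGTH):
    """Split string attributes into (short, long) dicts by value length."""
    short, long_ = {}, {}
    for name, value in attributes.items():
        target = short if len(value) <= limit else long_
        target[name] = value
    return short, long_


short_attrs, long_attrs = split_attributes({
    "custom_attribute": "Another attribute",
    "log.custom.log": "x" * 800,  # too long for putAttribute
})
# Long values go into the output content under the "attributes" key
body = {"attributes": long_attrs}
print(sorted(short_attrs), sorted(body["attributes"]))
```

Here short_attrs could still be written with the session methods, while long_attrs travels in the output content, as Example 5 does for all of its attributes.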

Some APIs, such as the ones in Omnitron, require the header “Accept-Encoding”: “deflate” to function correctly.
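A small helper can build these headers from the flow attributes and fail with a clear message when the token is missing, for example in GenericProxy flows. This is a sketch: build_omnitron_headers is a hypothetical name, and a plain dict stands in for the flowFile attributes.

```python
def build_omnitron_headers(attributes):
    """Return request headers for Omnitron built from flow attributes."""
    token = attributes.get("token.omnitron")
    if not token:
        # token.omnitron is not present in every flow type
        raise KeyError("token.omnitron is not available in this flow")
    return {
        "Authorization": token,        # already in the "Token <token>" form
        "Accept-Encoding": "deflate",  # required by Omnitron APIs, as noted above
    }


headers = build_omnitron_headers({"token.omnitron": "Token abc123"})
print(headers)
```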

Xmltodict Library

Script inputs and outputs do not have to be in JSON format. For ERP services that communicate using XML, script inputs and outputs should be prepared accordingly. The Xmltodict library allows conversion between dictionary data and XML format within scripts. For more information, refer to xmltodict on GitHub.

JSON Library

A standard Python library that provides methods to convert JSON strings into Python dictionaries or list objects, and vice versa. For more information, see the Python documentation.
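The round trip used throughout the examples looks roughly like this; the record contents are made up for illustration.

```python
import json

raw = '{"sku": "foobar", "stock": 5}'
record = json.loads(raw)  # JSON text -> Python dict
record["stock"] += 1      # work with it as ordinary Python data

# dict -> JSON text -> UTF-8 bytes, the form written to the output stream
encoded = json.dumps(record, indent=2).encode("utf-8")
print(encoded.decode("utf-8"))
```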

Examples

Examples 1–5 demonstrate different use cases: the base script template, transforming data, accessing and modifying attributes, making extra requests, and handling long attributes on Script Servers. Each example follows the recommended practices for scripting within Integrator flows.

Example 1

import json
import traceback
from org.apache.nifi.processor.io import StreamCallback
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets


# Read attributes from the flowFile; if the "allVar" attribute is present, its JSON content is used instead
def get_attributes(flow_file):
    all_attributes = flow_file.getAttributes()
    all_var = flow_file.getAttribute("allVar")
    if all_var:
        return json.loads(all_var)
    return all_attributes


class TransformCallback(StreamCallback):
    def __init__(self, flowFile):
        self.flowFile = flowFile

    def process(self, inputStream, outputStream):
        # Read input content
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        obj = json.loads(text)

        # process/transform obj
        output_obj = self.transform(obj)

        # Write output content
        outputStream.write(bytearray(json.dumps(output_obj, indent=2).encode("utf-8")))

    def transform(self, obj):
        # Placeholder: the base template passes the input through unchanged
        return obj


# The session object provides the "flowFile" object, which contains the data for the current execution
flowFile = session.get()
if flowFile is not None:
    try:
        # session.write takes the flowFile and a TransformCallback object, prepares the flowFile for reading and writing, runs the callback's "process" method, and returns the updated flowFile
        flowFile = session.write(flowFile, TransformCallback(flowFile))
        # session.transfer marks the flowFile as successful, allowing the execution to continue and process the output data
        session.transfer(flowFile, REL_SUCCESS)
    except:
        # In case of an exception, get traceback info for logging
        var = traceback.format_exc()
        # Write the exception message to the flowFile as an attribute; attributes starting with "log." are printed in the final log files created by Integrator
        session.putAttribute(flowFile, "log.script.error", var)
        # Mark the flowFile as failed, finishing the execution after the log files are created by Integrator
        session.transfer(flowFile, REL_FAILURE)
    # Commit the changes made to the flowFile during the script
    session.commit()

Example 2

A simpler script that reads input and applies transformations.

import json
import traceback
from org.apache.nifi.processor.io import StreamCallback
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets


def get_attributes(flow_file):
    all_attributes = flow_file.getAttributes()
    all_var = flow_file.getAttribute("allVar")
    if all_var:
        return json.loads(all_var)
    return all_attributes


class TransformCallback(StreamCallback):
    def __init__(self, flowFile):
        self.flowFile = flowFile
        self.attributes = get_attributes(flowFile)

    def process(self, inputStream, outputStream):
        # Read input content
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        obj = json.loads(text)

        # process/transform obj
        output_obj = self.transform(obj)

        # Write output content
        outputStream.write(bytearray(json.dumps(output_obj, indent=2).encode("utf-8")))

    def transform(self, obj):
        # Transform and process your data here based on your input data schema and desired output data schema
        output_obj = obj  # placeholder: replace with the real transformation
        return output_obj


flowFile = session.get()
if flowFile is not None:
    try:
        flowFile = session.write(flowFile, TransformCallback(flowFile))
        session.transfer(flowFile, REL_SUCCESS)
    except:
        var = traceback.format_exc()
        session.putAttribute(flowFile, 'log.script.error', var)
        session.transfer(flowFile, REL_FAILURE)
    session.commit()

Example 3

Script with access to attributes. There are many attributes accessible within the script. You can add new attributes to use in subsequent steps, or you can edit existing ones. When you save an attribute in the "Script" step of an Order Flow, you can retrieve it in the "Write Data to Omnitron" step. This allows you to maintain context across different steps in the flow. Every attribute whose name begins with the prefix “log.” will be logged in the final log file.

Not every attribute is accessible in every flow type.

Attributes are always string values. If you attempt to write an attribute that is not a string, you will encounter errors.

In this example, we retrieve the "language" attribute used in "Product Flows" and create a new attribute for logging.

import json
import traceback
from org.apache.nifi.processor.io import StreamCallback
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets


def get_attributes(flow_file):
    all_attributes = flow_file.getAttributes()
    all_var = flow_file.getAttribute("allVar")
    if all_var:
        return json.loads(all_var)
    return all_attributes


class TransformCallback(StreamCallback):
    def __init__(self, flowFile):
        self.flowFile = flowFile
        self.attributes = get_attributes(flowFile)
        self.script_attributes = {}

    def process(self, inputStream, outputStream):
        #Read input content
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        #Load input json
        obj = json.loads(text)

        # process/transform obj 
        output_obj = self.transform(obj)            

        #Write output content
        outputStream.write(bytearray(json.dumps(output_obj, indent=2).encode("utf-8")))

    def transform(self, obj):
        # Copy the flow's language setting onto every product in the input list
        language = self.attributes["language"]
        for product in obj:
            product["language_related_value"] = language
        self.script_attributes["log.custom.log"] = "Log message here."
        self.script_attributes["custom_attribute"] = "Another attribute"

        return obj


flowFile = session.get()
if flowFile is not None:
    try:
        transform = TransformCallback(flowFile)
        flowFile = session.write(flowFile, transform)
        session.putAllAttributes(flowFile, transform.script_attributes)
        session.transfer(flowFile, REL_SUCCESS)
    except:
        var = traceback.format_exc()
        session.putAttribute(flowFile, 'log.script.error', var)
        session.transfer(flowFile, REL_FAILURE)
    session.commit()

Example 4

Script with extra requests. There may be times when you need additional data that isn't available in your input, or you may need to trigger another job. For these cases, you can use Python’s requests library within the script.

Scripts must be completed within 300 seconds.

The method for saving attributes (i.e., writing all the attributes into the "attributes" key after json.dumps()) only works if you are using the Script Servers.

In this example, we are sending a request to Omnitron to fetch additional data.

import json
import traceback
from org.apache.nifi.processor.io import StreamCallback
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets

import requests

def get_attributes(flow_file):
    all_attributes = flow_file.getAttributes()
    all_var = flow_file.getAttribute("allVar")
    if all_var:
        return json.loads(all_var)
    return all_attributes

class TransformCallback(StreamCallback):
    def __init__(self, flowFile):
        self.flowFile = flowFile
        self.attributes = get_attributes(flowFile)
        self.script_attributes = {}
        self.headers = {
            "Authorization": self.attributes["token.omnitron"],
            "Accept-Encoding": "deflate",   # This is a must
        }

        # https://demo.omnitron.akinon.net/
        self.omnitron_url = self.attributes["omnitron.api.domain"]  # URL Address written in flow edit screen

    def process(self, inputStream, outputStream):
        #Read input content
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        #Load input json
        obj = json.loads(text)

        # process/transform obj 
        output_obj = self.transform(obj)            

        #Write output content
        outputStream.write(bytearray(json.dumps(output_obj, indent=2).encode("utf-8")))

    def transform(self, obj):
        results = []
        for var_product in obj:
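            response = requests.get("{}api/v1/products/".format(self.omnitron_url),
                                    params={"base_code__exact": var_product["basecode"]},
                                    headers=self.headers, timeout=30)
            # Fail early on HTTP error statuses instead of parsing an error body
            response.raise_for_status()
            # Assumption: the body is either a JSON list of products or a paginated
            # object that holds them under "results"; adjust to the actual response shape
            data = response.json()
            products = data.get("results", []) if isinstance(data, dict) else data

            for product in products: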
                results.append({"product_sku": product["sku"], 
                                "price": var_product["price"], 
                                "retail_price": var_product["retail_price"],
                                "currency_type": var_product["currency_type"],
                                "tax_rate": var_product["tax_rate"],
                                "price_list": self.attributes["omnitron.param.price.id"]})

        self.script_attributes["log.custom.log"] = "Log message here."
        self.script_attributes["custom_attribute"] = "Another attribute"
        return results


flowFile = session.get()
if flowFile is not None:
    try:
        transform = TransformCallback(flowFile)
        flowFile = session.write(flowFile, transform)

        # saving attributes is different when the code runs on Script Servers
        session.putAttribute(flowFile, "attributes", json.dumps(transform.script_attributes))

        session.transfer(flowFile, REL_SUCCESS)
    except:
        var = traceback.format_exc()
        session.putAttribute(flowFile, 'log.script.error', var)
        session.transfer(flowFile, REL_FAILURE)
    session.commit()

Example 5

Attributes cannot exceed approximately 500 characters. If longer attributes are required, you can include them in the body instead.

The input and output contents do not have to be in JSON format. You can send and receive any text body. However, most flows—such as Stock, Price, and ERP responses—will be automatically converted to JSON before entering the script steps.

import json
import traceback
from org.apache.nifi.processor.io import StreamCallback
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets

import requests
import xmltodict


def get_attributes(flow_file):
    all_attributes = flow_file.getAttributes()
    all_var = flow_file.getAttribute("allVar")
    if all_var:
        return json.loads(all_var)
    return all_attributes


class TransformCallback(StreamCallback):
    def __init__(self, flowFile):
        self.flowFile = flowFile
        self.attributes = get_attributes(flowFile)
        self.script_attributes = {}
        self.headers = {
            "Authorization": self.attributes["token.omnitron"],
            "Accept-Encoding": "deflate",   # This is a must
        }

        # https://demo.omnitron.akinon.net/
        self.omnitron_url = self.attributes["omnitron.api.domain"]  # URL Address written in flow edit screen

    def process(self, inputStream, outputStream):
        #Read input content
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        #Load input json
        obj = json.loads(text)

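        # process/transform obj
        result = self.transform(obj)
        output_obj = {
            # xmltodict.unparse requires exactly one root element, so the result
            # is wrapped under a single key ("root" is an illustrative name)
            "result_data": xmltodict.unparse({"root": result}),
            "attributes": self.script_attributes
        }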

        #Write output content
        outputStream.write(bytearray(json.dumps(output_obj, indent=2).encode("utf-8")))

    def transform(self, obj):
        more_data = requests.get("{}api/v1/more_data/".format(self.omnitron_url),
                                params={"number": obj["number"]}, headers=self.headers, timeout=30)
        result = {
            "data": obj,
            "more_data": more_data.json()
        }
        self.script_attributes["log.custom.log"] = "Log message here."
        self.script_attributes["custom_attribute"] = "Another attribute"
        return result


flowFile = session.get()
if flowFile is not None:
    try:
        transform = TransformCallback(flowFile)
        flowFile = session.write(flowFile, transform)

        session.transfer(flowFile, REL_SUCCESS)
    except:
        var = traceback.format_exc()
        session.putAttribute(flowFile, 'log.script.error', var)
        session.transfer(flowFile, REL_FAILURE)
    session.commit()
