Mastering Data Transformation with Benthos and Bloblang in 19 Examples

As the core of each Data Flow Component, such as the Protocol Converter, we use benthos-umh, which is a fork of benthos (nowadays called redpanda connect). It heavily makes use of ‘bloblang’, which is a small but very powerful scripting language.

With bloblang, most common use-cases of adjusting incoming “raw” data into a good looking Unified Namespace can be done. This article shows some examples of what you can do.

These examples are intended for inspiration and may require modification to suit your specific use case. Specifically, some examples are simple benthos bloblang snippets that can be used everywhere, some require a Protocol Converter, some require a Custom Data Flow Component.

1. Decoding Base64:

root.decoded_value = this.encoded_value.decode("base64")

Decodes a base64-encoded string in the field encoded_value.

2. Removing Specific Characters from a String:

root.cleaned_string = this.raw_string.replace_all("-", "").replace_all("_", "")

Removes hyphens and underscores from raw_string.

3. Replacing Dots in Topic Names (e.g., for UNS Compatibility):

root.topic_clean = this.topic.replace_all(".", "_")

Replaces dots with underscores in the topic field.

4. Handling Endianness for Data from Modbus Sources:

Assuming you have a hexadecimal string and need to swap byte order:

root.correct_endian = this.hex_value.decode("hex").reverse().encode("hex")

Reverses the byte order of a hex-encoded string.

5. Converting between JSON and XML:

root.xml_data = this.json_data.format_xml()

Converts a JSON object in json_data to XML format.

6. Bit Relations (Conditional Logic):

root.Error = this.State == "run" && this.speed == 0

Adds a boolean field Error set to true if State is "run" and speed is 0.

7. Simple Calculations (Multiplication):

root.result = this.value.number() * 10

Multiplies the numerical value in value by 10.

8. Random Number Generation (for Validation Use Cases):

root.random_number = random_int(0, 100)

Generates a random integer between 0 and 100.

9. Date Time Conversion (UTC to Local Time):

root.local_time = this.utc_time.ts_parse("2006-01-02T15:04:05Z").ts_format("2006-01-02 15:04:05", "Local")

Converts a UTC timestamp string to local time format.

10. Re-formatting Metadata from OPC UA

When dealing with OPC UA data in protocol converters, you might receive tag names and groups that include unnecessary subfolders or have inconsistent naming conventions. You can use Bloblang to clean up and standardize these fields.

Example (Protocol Converter): Simplifying OPC UA Tag Groups

pipeline:
  processors:
    - bloblang: |
        # Remove unnecessary subfolders from opcua_tag_group
        let cleaned_tag_group = meta("opcua_tag_group").replace_all("UnnecessaryFolder\\.", "")
        
        # Rename subfolders based on a regex pattern
        cleaned_tag_group = $cleaned_tag_group.re_replace_all("OldName(\\d+)", "NewName$1")
        
        # Adjust opcua_tag_name if needed
        let tagname = $cleaned_tag_group.lowercase()
        
        # the rest from here here on will be autogenerated, see also documentation

Explanation:

  • meta("opcua_tag_group") accesses the OPC UA tag group from metadata.
  • replace_all and re_replace_all are used to clean up and standardize the tag group.
  • tagname variable will be referenced in the autogenerated part of a protocol converter

11. Enriching Messages with Database Data

You can enrich incoming messages by fetching additional data from a database.

Example (Custom Data Flow Component): Enriching with SQL Data

pipeline:
  processors:
    - branch:
        processors:
          - sql_select:
              driver: postgres
              dsn: postgres://username:password@database-host:5432/database_name
              table: asset_data
              where: asset_id = this.asset_id.number()
              suffix: ORDER BY timestamp DESC LIMIT 1
              columns:
                - latest_value
        result_map: root.enrichment = this

Explanation:

  • sql_select queries the asset_data table for the latest latest_value where asset_id matches the asset_id in the message.
  • The result is stored in root.enrichment.

Input:

{
  "asset_id": 42,
  "measurement": 123.45
}

Output:

{
  "asset_id": 42,
  "measurement": 123.45,
  "enrichment": {
    "latest_value": 200.00
  }
}

12. Sorting Modbus Devices into Different Folders in UNS

pipeline:
  processors:
    - bloblang: |
        # Use meta("modbus_tag_slaveid") from metadata
        let workcell = match {
          meta("modbus_tag_slaveid") == "10" => "Rack_ID_10",
          meta("modbus_tag_slaveid") == "20" => "Rack_ID_20",
          _ => meta("modbus_tag_slaveid")
        }

        # Use meta("modbus_tag_name") for the tag name
        let tagname = meta("modbus_tag_name")

Explanation:

  • We access the slave ID and tag name from metadata using meta("...").
  • Constructs the message with the appropriate asset hierarchy.

13. Communicating with a Scale (Network Device)

You might need to interact with network devices like scales, where you send a command and process the response via TCP/IP.

Example: Reading Data via TCP/IP

In this example, a "METTLER TOLEDO MT-SICS XP/XS/XA Balances device" is connected via TCP/IP. Every 1 second a "SI" command is sent to the scale, which then response with the weight and the unit of the measurement.

input:
  generate:
    interval: "@every 1s"
    mapping: |
      root = "%s\r\n".format("SI")
      meta command = "SI"
pipeline:
  processors:
    - command:
        name: nc
        args_mapping: '["10.117.216.80", "8000"]'
    - bloblang: |
        # Check if the command response starts with "SS"
        if content().string().starts_with("SS") {
          # Extract the weight value
          let weight = content()
            .replace_all(" ", "")
            .trim_prefix("SS")
            .trim_suffix("g\r\n")
            .number()
          # Extract the unit (e.g., "g" for grams)
          let unit = content()
            .string()
            .re_replace_all("[^a-zA-Z]", "")
          # Set the root object
          root = {
            "value": $weight,
            "timestamp_ms": (timestamp_unix_nano() / 1000000).floor(),
            "meta": {
              "unit": $unit,
              "serial_number": "0037609172"
            }
          }
        } else {
          root = deleted()
        }

Explanation:

  • The command processor runs nc (netcat) to communicate with the scale.
  • The Bloblang mapping processes the response:
    • Verifies that the response starts with "SS".
    • Extracts the weight and unit.
    • Constructs a structured message with a timestamp and metadata.

Input (Scale Response):

SS    123.45g\r\n

Output:

{
  "value": 123.45,
  "timestamp_ms": 1633072800000,
  "meta": {
    "unit": "g",
    "serial_number": "0037609172"
  }
}

14. Aggregating Classification Results from Multiple Topics

Combine messages from different Kafka topics and aggregate the classification results.

Example: Aggregating Classifications

input:
  kafka:
    addresses:
      - kafka-broker:9092
    topics:
      - ia.raw.audio.spectrogram.1.classification
      - ia.raw.audio.spectrogram.2.classification
    consumer_group: "benthos-classification-aggregator"

pipeline:
  processors:
    - branch:
        request_map: |
          # Extract classification based on topic
          let topic = meta("kafka_topic")
          let classification = this.classification_result.class_label
          root = {
            "timestamp_ms": this.timestamp_ms,
            "classification": $classification
          }
        processors:
          - cache:
              resource: memorycache
              operator: set
              key: meta("kafka_topic")
              value: this.classification
    - branch:
        request_map: ""
        processors:
          - cache:
              resource: memorycache
              operator: get
              key: "ia.raw.audio.spectrogram.1.classification"
        result_map: root.classification_1 = content().string()
    - branch:
        request_map: ""
        processors:
          - cache:
              resource: memorycache
              operator: get
              key: "ia.raw.audio.spectrogram.2.classification"
        result_map: root.classification_2 = content().string()
    - bloblang: |
        # Determine final classification based on both results
        let final_classification = match {
          this.classification_1 == "Machine-off" || this.classification_2 == "Machine-off" => "Machine-off",
          this.classification_1 == "Machine-on" && this.classification_2 == "good" => "good",
          this.classification_1 == "Machine-on" && this.classification_2 == "medium" => "medium",
          this.classification_1 == "Machine-on" && this.classification_2 == "bad" => "bad",
          _ => "Unknown"
        }

        root = {
          "timestamp_ms": this.timestamp_ms,
          "classification": $final_classification
        }

output:
  kafka:
    addresses:
      - kafka-broker:9092
    topic: ia.factoryinsight.classification.result
    client_id: benthos-classification-aggregator

cache_resources:
  - label: memorycache
    memory:
      default_ttl: 5m

Explanation:

  • Messages from two Kafka topics are consumed.
  • Each classification result is stored in an in-memory cache.
  • After both classifications are available, they are retrieved and combined to determine a final classification.
  • The final result is published to a new Kafka topic.

15. Measuring Average Message Processing Time

Track the latency between message creation and processing time by calculating the difference in timestamps.

Example: Measuring Latency

pipeline:
  processors:
    - bloblang: |
        # Ignore messages without a timestamp
        root = if !this.exists("timestamp_ms") { deleted() }
    - bloblang: |
        let current_timestamp = (timestamp_unix_nano() / 1000000).floor()
        let original_timestamp = this.timestamp_ms.number()
        let latency = $current_timestamp - $original_timestamp
        let process_value_name = meta("kafka_topic").replace_all(".", "_")

        root = {
          "timestamp_ms": $current_timestamp,
          $process_value_name: $latency
        }

output:
  kafka:
    addresses:
      - kafka-broker:9092
    topic: ia.performance.latency.processValue
    client_id: benthos-latency-measurement

Explanation:

  • Calculates the latency by subtracting the original timestamp from the current time.
  • Constructs a process value name based on the topic.
  • Publishes the latency measurement to a Kafka topic.

16. Fetching Images from a Thermal Camera and Pushing to Kafka

Capture images from a network camera and publish them to a Kafka topic in base64-encoded format.

Example: Capturing and Sending Images

input:
  http_client:
    url: http://camera-ip-address/snapshot.jpg
    verb: GET
    basic_auth:
      enabled: true
      username: admin
      password: admin
    rate_limit: webcam_frequency
    timeout: 5s
    retries: 3

pipeline:
  processors:
    - bloblang: |
        let image_base64 = content().encode("base64").string()
        let timestamp = (timestamp_unix_nano() / 1000000).floor()
        root = {
          "timestamp_ms": $timestamp,
          "image_base64": $image_base64
        }

output:
  kafka:
    addresses:
      - kafka-broker:9092
    topic: ia.raw.camera.images
    client_id: benthos-camera-client

rate_limit_resources:
  - label: webcam_frequency
    local:
      count: 1
      interval: 1s

Explanation:

  • Uses the http_client input to fetch images from the camera.
  • Encodes the image data to base64.
  • Adds a timestamp and constructs the message.
  • Publishes to a Kafka topic.

17. Combining OPC UA Messages into a Single JSON Payload

Example (Custom Data Flow Component): Aggregating OPC UA Messages

pipeline:
  processors:
    - bloblang: |
        # Map OPC UA node IDs to desired field names
        let field_name = match {
          meta("opcua_attr_nodeid") == "ns=4;i=3" => "machineNumber",
          meta("opcua_attr_nodeid") == "ns=4;i=4" => "dataSetNumber",
          _ => meta("opcua_tag_name")
        }
        root = {
          $field_name: this.value
        }
    - archive:
        format: json_array
    - bloblang: |
        # Add a common timestamp and merge all messages
        root = this
          .append({"timestamp_ms": (timestamp_unix_nano() / 1000000).floor()})
          .squash()

Explanation:

  • We use meta("opcua_attr_nodeid") and meta("opcua_tag_name") from metadata.

18. Calculate the average of numerical values within an array.

Example: Calculating an Average

pipeline:
  processors:
    - bloblang: |
        let values = this.measurements
        let total = $values.sum()
        let count = $values.length()
        let average = $total / $count

        root = {
          "timestamp_ms": (timestamp_unix_nano() / 1000000).floor(),
          "average_measurement": $average
        }

Explanation:

  • Sums up all values in the measurements array.
  • Calculates the average.
  • Constructs a new message with the average value.

Input:

{
  "measurements": [10, 20, 30, 40, 50]
}

Output:

{
  "timestamp_ms": 1633072800000,
  "average_measurement": 30
}

19. Conditional Logic Based on Bitmask

Perform actions based on specific bits set in a value (e.g., status codes).

Example: Checking Status Bits

pipeline:
  processors:
    - bloblang: |
        root = {}
        
        # --------------------------------------------------------
        # Approach 1: Binary String Slicing
        # --------------------------------------------------------
        # Convert the 'status_code' integer to an 8-bit binary string.
        # The '%08b' format specifier ensures the binary string is zero-padded to 8 characters.
        let status_code = "%08b".format(this.number().round().int64()) 
        root.status_code = $status_code  # Store the binary string for reference
        
        # Extract individual bits by slicing the binary string.
        # - The `slice` method extracts a substring from the binary string.
        # - Negative indices are used to count from the end of the string.
        #
        # Bit Positions (from right to left):
        # - -1: First bit (Least Significant Bit)
        # - -2: Second bit
        # - -3: Third bit
        #
        # Compare the sliced bit to "1" to determine if the bit is set (true) or not (false).
        
        # Check if the first bit (Least Significant Bit) is set
        root.machine_running = $status_code.slice(-1) == "1"               # First bit
        
        # Check if the second bit is set
        root.error_detected = $status_code.slice(-2, -1) == "1"           # Second bit
        
        # Check if the third bit is set
        root.maintenance_required = $status_code.slice(-3, -2) == "1"    # Third bit
        
        # --------------------------------------------------------
        # Approach 2: Arithmetic Operations
        # --------------------------------------------------------
        # Directly use arithmetic to isolate specific bits without converting to a string.
        let status_code_2 = this.number().round().int64()
        
        # Extract the first bit (Least Significant Bit) using modulo 2.
        # If the remainder is 1, the bit is set.
        root.machine_running_2 = ($status_code_2 % 2) == 1               # First bit
        
        # Extract the second bit by first dividing by 2 (shifting right by 1) and then applying modulo 2.
        # This isolates the second bit from the right.
        root.error_detected_2 = (($status_code_2 / 2) % 2) == 1          # Second bit
        
        # Extract the third bit by dividing by 4 (shifting right by 2) and applying modulo 2.
        # This isolates the third bit from the right.
        root.maintenance_required_2 = (($status_code_2 / 4) % 2) == 1  # Third bit

Explanation:

  • See code comments

Input:

17

Output:

{
	error_detected: "0",
	error_detected_2: false,
	machine_running: "1",
	machine_running_2: true,
	maintenance_required: "0",
	maintenance_required_2: false,
	status_code: "00010"
}

Bonus Samples (added after the original post release)

These are samples that we have added after the original blog post went live.

Matching incoming Weihenstephaner Standard machine state from a Krones machine to UMH datamodel

pipeline:
  processors:
    - bloblang: |
        if meta("kafka_key") == "WHT.Krones_L1.123._historian" && this.exists("Modulfill_cHMI_Modulfill_cHMI1_WS_Cur_State_0_UDIntValue") {
          root = {
            "start_time_unix_ms": this.timestamp_ms,
            "state": match this.Modulfill_cHMI_Modulfill_cHMI1_WS_Cur_State_0_UDIntValue{
              0 => 30000,
              1 => 40000,
              2 => 20000,
              4 => 40000,
              8 => 60000,
              16 => 70000,
              32 => 80000,
              64 => 80000,
              128 => 10000,
              256 => 20000,
              512 => 20000,
              1024 => 180000,
              2048 => 190000,
              4096 => 40000,
              8192 => 20000,
              16384 => 40000,
              32768 => 40000,
              _ => 0
            }
          }
        } else {
          root = deleted() 
        }

"Importing" an external MQTT broker

Setup a MQTT connection, then add a MQTT protocol converter on top of it.

For the input use (adjust <topic> and <autogenerated> accordingly, for autogenerated use whatever the Management Console is generating for you):

mqtt:
  urls:
    - tcp://IP:1883
  topics:
    - <topic>/#
  client_id: <autogenerated>

For processing, choose (and replace <topic> with the topic you choose in the input):

# these variables are autogenerated from your instance location
let enterprise = "enterprise-of-kings"
let site = "jeremy-vm"
let schema = "_local" # this should be set to something different than _historian, except if you comply with the historian schema

let mqtt_topic_appendix = meta("mqtt_topic").replace_all("<topic>", "")
let mqtt_topic_appendix_sanitized = $mqtt_topic_appendix.replace_all("/", ".")

let invalid_payload = this.catch(true)
let payload = match {
    $invalid_payload == true => deleted(),   # Delete if empty payload
    _ => this                             # Keep the message if empty payload
}

let tagname = $mqtt_topic_appendix_sanitized

This will bridge your MQTT broker into a custom data-contract. If you want, you can also adjust the protocol converter to convert this to the _historian data contract as well.

Connecting via SSL/TLS to an MQTT broker with a self-signed certificate

The benthos documentation is not clear on this, but you need to set the urls and tls settings like this:

mqtt:
  urls:
    - ssl://<IP>:8883
  topics:
    - <topic>#
  client_id: <client_id>
  user: <username>
  password:  <password>
  tls:
    enabled: true
    skip_cert_verify: true

Note the ssl instead of tcp in the URL and the changed port. As well as the skip_cert_verify in the tls settings

Further Resources: