Mastering Data Transformation with Benthos and Bloblang in 19 Examples
At the core of each Data Flow Component, such as the Protocol Converter, we use benthos-umh, a fork of benthos (nowadays called Redpanda Connect). It makes heavy use of 'bloblang', a small but very powerful scripting language.
With Bloblang, you can handle most common use cases for transforming incoming "raw" data into a clean Unified Namespace. This article shows some examples of what you can do.
These examples are intended for inspiration and may require modification to suit your specific use case. Some are plain Bloblang snippets that can be used anywhere, some require a Protocol Converter, and some require a Custom Data Flow Component.
1. Decoding Base64:
root.decoded_value = this.encoded_value.decode("base64")
Decodes a base64-encoded string in the field encoded_value.
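For illustration, here is a minimal input/output pair (the encoded value is just a sample):
Input:
{
  "encoded_value": "aGVsbG8="
}
Output:
{
  "decoded_value": "hello"
}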
2. Removing Specific Characters from a String:
root.cleaned_string = this.raw_string.replace_all("-", "").replace_all("_", "")
Removes hyphens and underscores from raw_string.
3. Replacing Dots in Topic Names (e.g., for UNS Compatibility):
root.topic_clean = this.topic.replace_all(".", "_")
Replaces dots with underscores in the topic field.
4. Handling Endianness for Data from Modbus Sources:
Assuming you have a hexadecimal string and need to swap byte order:
root.correct_endian = this.hex_value.decode("hex").reverse().encode("hex")
Reverses the byte order of a hex-encoded string.
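As a quick sketch of the effect, assuming reverse() operates on the decoded bytes (the hex value is a sample):
Input:
{
  "hex_value": "0a0b0c0d"
}
Output:
{
  "correct_endian": "0d0c0b0a"
}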
5. Converting between JSON and XML:
root.xml_data = this.json_data.format_xml()
Converts a JSON object in json_data to XML format.
6. Bit Relations (Conditional Logic):
root.Error = this.State == "run" && this.speed == 0
Adds a boolean field Error set to true if State is "run" and speed is 0.
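For example (note that only fields assigned to root appear in the output):
Input:
{
  "State": "run",
  "speed": 0
}
Output:
{
  "Error": true
}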
7. Simple Calculations (Multiplication):
root.result = this.value.number() * 10
Multiplies the numerical value in value by 10.
8. Random Number Generation (for Validation Use Cases):
root.random_number = random_int(min: 0, max: 100)
Generates a random integer between 0 and 100.
9. Date Time Conversion (UTC to Local Time):
root.local_time = this.utc_time.ts_parse("2006-01-02T15:04:05Z").ts_format("2006-01-02 15:04:05", "Local")
Converts a UTC timestamp string to local time format.
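For illustration, assuming the instance timezone is Europe/Berlin (UTC+2 during daylight saving time):
Input:
{
  "utc_time": "2021-10-01T12:00:00Z"
}
Output:
{
  "local_time": "2021-10-01 14:00:00"
}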
10. Re-formatting Metadata from OPC UA
When dealing with OPC UA data in protocol converters, you might receive tag names and groups that include unnecessary subfolders or have inconsistent naming conventions. You can use Bloblang to clean up and standardize these fields.
Example (Protocol Converter): Simplifying OPC UA Tag Groups
pipeline:
  processors:
    - bloblang: |
        # Remove unnecessary subfolders from opcua_tag_group
        # (replace_all matches the literal string, no regex escaping needed)
        let cleaned_tag_group = meta("opcua_tag_group").replace_all("UnnecessaryFolder.", "")
        # Rename subfolders based on a regex pattern
        let cleaned_tag_group = $cleaned_tag_group.re_replace_all("OldName(\\d+)", "NewName$1")
        # Adjust opcua_tag_name if needed
        let tagname = $cleaned_tag_group.lowercase()
        # the rest from here on will be autogenerated, see also the documentation
Explanation:
- meta("opcua_tag_group") accesses the OPC UA tag group from the message metadata.
- replace_all and re_replace_all are used to clean up and standardize the tag group.
- The tagname variable will be referenced in the autogenerated part of the Protocol Converter.
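As a hypothetical walkthrough (the folder names are assumed):
# meta("opcua_tag_group") = "Plant.UnnecessaryFolder.OldName1"
# after replace_all:        "Plant.OldName1"
# after re_replace_all:     "Plant.NewName1"
# after lowercase:          "plant.newname1"  (used as tagname)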
11. Enriching Messages with Database Data
You can enrich incoming messages by fetching additional data from a database.
Example (Custom Data Flow Component): Enriching with SQL Data
pipeline:
  processors:
    - branch:
        processors:
          - sql_select:
              driver: postgres
              dsn: postgres://username:password@database-host:5432/database_name
              table: asset_data
              columns:
                - latest_value
              where: asset_id = ?
              args_mapping: root = [ this.asset_id ]
              suffix: ORDER BY timestamp DESC LIMIT 1
        result_map: root.enrichment = this.index(0)
Explanation:
- sql_select queries the asset_data table for the latest latest_value where asset_id matches the asset_id in the message.
- The result is stored in root.enrichment.
Input:
{
  "asset_id": 42,
  "measurement": 123.45
}
Output:
{
  "asset_id": 42,
  "measurement": 123.45,
  "enrichment": {
    "latest_value": 200.00
  }
}
12. Sorting Modbus Devices into Different Folders in UNS
pipeline:
  processors:
    - bloblang: |
        # Use meta("modbus_tag_slaveid") from metadata
        let workcell = match {
          meta("modbus_tag_slaveid") == "10" => "Rack_ID_10",
          meta("modbus_tag_slaveid") == "20" => "Rack_ID_20",
          _ => meta("modbus_tag_slaveid")
        }
        # Use meta("modbus_tag_name") for the tag name
        let tagname = meta("modbus_tag_name")
Explanation:
- We access the slave ID and tag name from metadata using meta("...").
- The workcell variable sorts each device into the appropriate asset hierarchy; the rest of the message is constructed in the autogenerated part.
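As a hypothetical walkthrough (metadata values assumed):
# meta("modbus_tag_slaveid") = "10", meta("modbus_tag_name") = "temperature"
# => $workcell = "Rack_ID_10", $tagname = "temperature"
# The autogenerated part of the Protocol Converter then publishes the tag
# under the Rack_ID_10 folder in the UNS.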
13. Communicating with a Scale (Network Device)
You might need to interact with network devices like scales, where you send a command and process the response via TCP/IP.
Example: Reading Data via TCP/IP
In this example, a METTLER TOLEDO MT-SICS XP/XS/XA balance is connected via TCP/IP. Every second, an "SI" command is sent to the scale, which then responds with the weight and the unit of the measurement.
input:
  generate:
    interval: "@every 1s"
    mapping: |
      root = "%s\r\n".format("SI")
      meta command = "SI"
pipeline:
  processors:
    - command:
        name: nc
        args_mapping: '["10.117.216.80", "8000"]'
    - bloblang: |
        # Check if the command response starts with "SS"
        if content().string().starts_with("SS") {
          # Extract the weight value
          let weight = content()
            .string()
            .replace_all(" ", "")
            .trim_prefix("SS")
            .trim_suffix("g\r\n")
            .number()
          # Extract the unit (e.g., "g" for grams), stripping the "SS" prefix first
          let unit = content()
            .string()
            .trim_prefix("SS")
            .re_replace_all("[^a-zA-Z]", "")
          # Set the root object
          root = {
            "value": $weight,
            "timestamp_ms": (timestamp_unix_nano() / 1000000).floor(),
            "meta": {
              "unit": $unit,
              "serial_number": "0037609172"
            }
          }
        } else {
          root = deleted()
        }
Explanation:
- The command processor runs nc (netcat) to communicate with the scale.
- The Bloblang mapping processes the response:
  - Verifies that the response starts with "SS".
  - Extracts the weight and unit.
  - Constructs a structured message with a timestamp and metadata.
Input (Scale Response):
SS 123.45g\r\n
Output:
{
  "value": 123.45,
  "timestamp_ms": 1633072800000,
  "meta": {
    "unit": "g",
    "serial_number": "0037609172"
  }
}
14. Aggregating Classification Results from Multiple Topics
Combine messages from different Kafka topics and aggregate the classification results.
Example: Aggregating Classifications
input:
  kafka:
    addresses:
      - kafka-broker:9092
    topics:
      - ia.raw.audio.spectrogram.1.classification
      - ia.raw.audio.spectrogram.2.classification
    consumer_group: "benthos-classification-aggregator"
pipeline:
  processors:
    - branch:
        request_map: |
          # Extract classification based on topic
          let topic = meta("kafka_topic")
          let classification = this.classification_result.class_label
          root = {
            "timestamp_ms": this.timestamp_ms,
            "classification": $classification
          }
        processors:
          - cache:
              resource: memorycache
              operator: set
              key: '${! meta("kafka_topic") }'
              value: '${! this.classification }'
    - branch:
        request_map: 'root = ""'
        processors:
          - cache:
              resource: memorycache
              operator: get
              key: "ia.raw.audio.spectrogram.1.classification"
        result_map: root.classification_1 = content().string()
    - branch:
        request_map: 'root = ""'
        processors:
          - cache:
              resource: memorycache
              operator: get
              key: "ia.raw.audio.spectrogram.2.classification"
        result_map: root.classification_2 = content().string()
    - bloblang: |
        # Determine final classification based on both results
        let final_classification = match {
          this.classification_1 == "Machine-off" || this.classification_2 == "Machine-off" => "Machine-off",
          this.classification_1 == "Machine-on" && this.classification_2 == "good" => "good",
          this.classification_1 == "Machine-on" && this.classification_2 == "medium" => "medium",
          this.classification_1 == "Machine-on" && this.classification_2 == "bad" => "bad",
          _ => "Unknown"
        }
        root = {
          "timestamp_ms": this.timestamp_ms,
          "classification": $final_classification
        }
output:
  kafka:
    addresses:
      - kafka-broker:9092
    topic: ia.factoryinsight.classification.result
    client_id: benthos-classification-aggregator
cache_resources:
  - label: memorycache
    memory:
      default_ttl: 5m
Explanation:
- Messages from two Kafka topics are consumed.
- Each classification result is stored in an in-memory cache.
- After both classifications are available, they are retrieved and combined to determine a final classification.
- The final result is published to a new Kafka topic.
15. Measuring Average Message Processing Time
Track the latency between message creation and processing time by calculating the difference in timestamps.
Example: Measuring Latency
pipeline:
  processors:
    - bloblang: |
        # Ignore messages without a timestamp
        root = if !this.exists("timestamp_ms") { deleted() }
    - bloblang: |
        let current_timestamp = (timestamp_unix_nano() / 1000000).floor()
        let original_timestamp = this.timestamp_ms.number()
        let latency = $current_timestamp - $original_timestamp
        let process_value_name = meta("kafka_topic").replace_all(".", "_")
        root = {
          "timestamp_ms": $current_timestamp,
          $process_value_name: $latency
        }
output:
  kafka:
    addresses:
      - kafka-broker:9092
    topic: ia.performance.latency.processValue
    client_id: benthos-latency-measurement
Explanation:
- Calculates the latency by subtracting the original timestamp from the current time.
- Constructs a process value name based on the topic.
- Publishes the latency measurement to a Kafka topic.
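For illustration, assuming the message was consumed from a topic named ia.raw.example (timestamps are sample values):
Input:
{
  "timestamp_ms": 1633072800000
}
Output:
{
  "timestamp_ms": 1633072800123,
  "ia_raw_example": 123
}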
16. Fetching Images from a Thermal Camera and Pushing to Kafka
Capture images from a network camera and publish them to a Kafka topic in base64-encoded format.
Example: Capturing and Sending Images
input:
  http_client:
    url: http://camera-ip-address/snapshot.jpg
    verb: GET
    basic_auth:
      enabled: true
      username: admin
      password: admin
    rate_limit: webcam_frequency
    timeout: 5s
    retries: 3
pipeline:
  processors:
    - bloblang: |
        let image_base64 = content().encode("base64").string()
        let timestamp = (timestamp_unix_nano() / 1000000).floor()
        root = {
          "timestamp_ms": $timestamp,
          "image_base64": $image_base64
        }
output:
  kafka:
    addresses:
      - kafka-broker:9092
    topic: ia.raw.camera.images
    client_id: benthos-camera-client
rate_limit_resources:
  - label: webcam_frequency
    local:
      count: 1
      interval: 1s
Explanation:
- Uses the http_client input to fetch images from the camera.
- Encodes the image data to base64.
- Adds a timestamp and constructs the message.
- Publishes to a Kafka topic.
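On the consuming side, the image can be restored from the payload; a minimal sketch of such a consumer mapping (not part of the original pipeline):
pipeline:
  processors:
    - bloblang: |
        # Restore the raw JPEG bytes from the base64-encoded field
        root = this.image_base64.decode("base64")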
17. Combining OPC UA Messages into a Single JSON Payload
Example (Custom Data Flow Component): Aggregating OPC UA Messages
pipeline:
  processors:
    - bloblang: |
        # Map OPC UA node IDs to desired field names
        let field_name = match {
          meta("opcua_attr_nodeid") == "ns=4;i=3" => "machineNumber",
          meta("opcua_attr_nodeid") == "ns=4;i=4" => "dataSetNumber",
          _ => meta("opcua_tag_name")
        }
        root = {
          $field_name: this.value
        }
    - archive:
        format: json_array
    - bloblang: |
        # Add a common timestamp and merge all messages
        root = this
          .append({"timestamp_ms": (timestamp_unix_nano() / 1000000).floor()})
          .squash()
Explanation:
- We use meta("opcua_attr_nodeid") and meta("opcua_tag_name") from metadata to name each field.
- The archive processor turns the batch into a JSON array, which is then merged into a single object with a shared timestamp.
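For illustration, a batch of two messages (values assumed) would be combined like this:
Input (batch of two messages):
{"value": 1001} with opcua_attr_nodeid "ns=4;i=3"
{"value": 7} with opcua_attr_nodeid "ns=4;i=4"
Output:
{
  "machineNumber": 1001,
  "dataSetNumber": 7,
  "timestamp_ms": 1633072800000
}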
18. Calculating the Average of Numerical Values in an Array
Example: Calculating an Average
pipeline:
  processors:
    - bloblang: |
        let values = this.measurements
        let total = $values.sum()
        let count = $values.length()
        let average = $total / $count
        root = {
          "timestamp_ms": (timestamp_unix_nano() / 1000000).floor(),
          "average_measurement": $average
        }
Explanation:
- Sums up all values in the measurements array.
- Calculates the average.
- Constructs a new message with the average value.
Input:
{
  "measurements": [10, 20, 30, 40, 50]
}
Output:
{
  "timestamp_ms": 1633072800000,
  "average_measurement": 30
}
19. Conditional Logic Based on Bitmask
Perform actions based on specific bits set in a value (e.g., status codes).
Example: Checking Status Bits
pipeline:
  processors:
    - bloblang: |
        root = {}

        # --------------------------------------------------------
        # Approach 1: Binary String Slicing
        # --------------------------------------------------------

        # Convert the 'status_code' integer to an 8-bit binary string.
        # The '%08b' format specifier ensures the binary string is zero-padded to 8 characters.
        let status_code = "%08b".format(this.number().round().int64())
        root.status_code = $status_code # Store the binary string for reference

        # Extract individual bits by slicing the binary string.
        # - The `slice` method extracts a substring from the binary string.
        # - Negative indices are used to count from the end of the string.
        #
        # Bit positions (from right to left):
        # - -1: First bit (Least Significant Bit)
        # - -2: Second bit
        # - -3: Third bit
        #
        # Compare the sliced bit to "1" to determine if the bit is set (true) or not (false).

        # Check if the first bit (Least Significant Bit) is set
        root.machine_running = $status_code.slice(-1) == "1" # First bit
        # Check if the second bit is set
        root.error_detected = $status_code.slice(-2, -1) == "1" # Second bit
        # Check if the third bit is set
        root.maintenance_required = $status_code.slice(-3, -2) == "1" # Third bit

        # --------------------------------------------------------
        # Approach 2: Arithmetic Operations
        # --------------------------------------------------------

        # Directly use arithmetic to isolate specific bits without converting to a string.
        let status_code_2 = this.number().round().int64()

        # Extract the first bit (Least Significant Bit) using modulo 2.
        # If the remainder is 1, the bit is set.
        root.machine_running_2 = ($status_code_2 % 2) == 1 # First bit
        # Extract the second bit by dividing by 2 (shifting right by 1), flooring,
        # and then applying modulo 2. This isolates the second bit from the right.
        root.error_detected_2 = (($status_code_2 / 2).floor() % 2) == 1 # Second bit
        # Extract the third bit by dividing by 4 (shifting right by 2), flooring,
        # and applying modulo 2. This isolates the third bit from the right.
        root.maintenance_required_2 = (($status_code_2 / 4).floor() % 2) == 1 # Third bit
Explanation:
- See code comments
Input:
17
Output:
{
  "error_detected": false,
  "error_detected_2": false,
  "machine_running": true,
  "machine_running_2": true,
  "maintenance_required": false,
  "maintenance_required_2": false,
  "status_code": "00010001"
}
Bonus Samples (added after the original post release)
These are samples that we have added after the original blog post went live.
Matching an incoming Weihenstephaner Standard machine state from a Krones machine to the UMH data model
pipeline:
  processors:
    - bloblang: |
        if meta("kafka_key") == "WHT.Krones_L1.123._historian" && this.exists("Modulfill_cHMI_Modulfill_cHMI1_WS_Cur_State_0_UDIntValue") {
          root = {
            "start_time_unix_ms": this.timestamp_ms,
            "state": match this.Modulfill_cHMI_Modulfill_cHMI1_WS_Cur_State_0_UDIntValue {
              0 => 30000,
              1 => 40000,
              2 => 20000,
              4 => 40000,
              8 => 60000,
              16 => 70000,
              32 => 80000,
              64 => 80000,
              128 => 10000,
              256 => 20000,
              512 => 20000,
              1024 => 180000,
              2048 => 190000,
              4096 => 40000,
              8192 => 20000,
              16384 => 40000,
              32768 => 40000,
              _ => 0
            }
          }
        } else {
          root = deleted()
        }
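For illustration (timestamp assumed), a matching message such as
Input:
{
  "timestamp_ms": 1633072800000,
  "Modulfill_cHMI_Modulfill_cHMI1_WS_Cur_State_0_UDIntValue": 8
}
is translated into the UMH state model:
Output:
{
  "start_time_unix_ms": 1633072800000,
  "state": 60000
}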
"Importing" an external MQTT broker
Set up an MQTT connection, then add an MQTT protocol converter on top of it.
For the input, use the following (adjust <topic> and <autogenerated> accordingly; for <autogenerated>, use whatever the Management Console generates for you):
mqtt:
  urls:
    - tcp://IP:1883
  topics:
    - <topic>/#
  client_id: <autogenerated>
For processing, choose (and replace <topic> with the topic you chose in the input):
# these variables are autogenerated from your instance location
let enterprise = "enterprise-of-kings"
let site = "jeremy-vm"
let schema = "_local" # this should be set to something other than _historian, unless you comply with the historian schema

let mqtt_topic_appendix = meta("mqtt_topic").replace_all("<topic>", "")
let mqtt_topic_appendix_sanitized = $mqtt_topic_appendix.replace_all("/", ".")

let invalid_payload = this.catch(true)
let payload = match {
  $invalid_payload == true => deleted(), # Delete the message if the payload is invalid
  _ => this # Keep the message otherwise
}

let tagname = $mqtt_topic_appendix_sanitized
This will bridge your MQTT broker into a custom data contract. If you want, you can also adjust the Protocol Converter to convert this to the _historian data contract as well.
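As a hypothetical walkthrough (topic names assumed): if <topic> is factory and a message arrives on factory/line1/temperature, the mapping resolves as follows:
# meta("mqtt_topic")             = "factory/line1/temperature"
# $mqtt_topic_appendix           = "/line1/temperature"
# $mqtt_topic_appendix_sanitized = ".line1.temperature"
# $tagname                       = ".line1.temperature"
If you want to avoid the leading dot, strip the leading slash first, e.g. with .trim_prefix("/"), before sanitizing.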
Connecting via SSL/TLS to an MQTT broker with a self-signed certificate
The benthos documentation is not clear on this, but you need to set the urls and tls settings like this:
mqtt:
  urls:
    - ssl://<IP>:8883
  topics:
    - <topic>/#
  client_id: <client_id>
  user: <username>
  password: <password>
  tls:
    enabled: true
    skip_cert_verify: true
Note the ssl instead of tcp in the URL and the changed port, as well as the skip_cert_verify in the tls settings.