In this article, we’ll introduce two key enhancements to the open-source benthos-umh and UMH: the nodered_js processor and the new tag_processor.
Inspired by Node-RED’s simplicity and powered by familiar JavaScript, nodered_js makes message transformations more approachable while retaining the performance and scalability you need. On top of that, the tag_processor brings order and flexibility to how you shape and label your data, without burdening you with boilerplate code or annoying user interfaces.
Fire up a ChatGPT prompt, take on that repetitive contextualization task, and see just how effortless IIoT data ops can become.
The Challenges of Bloblang and our current Protocol Converters
At the UMH, we've committed ourselves to leveraging best-in-class open-source tools grounded in IT/OT best practices. For our data streaming and contextualization tasks, we went all-in on Benthos / Redpanda Connect and created our own fork of it ("benthos-umh"), which maintains full upstream compatibility and extends it with manufacturing-specific plugins such as connectivity for OPC UA, Siemens S7, and many more.
One core part of Benthos is Bloblang—a powerful, but sometimes unwieldy, domain-specific language for data transformation. At first, we liked it. However, coming from environments like Node-RED, we quickly found it frustrating:
- Lack of Logging and Debugging: There are no logging outputs within Bloblang. You cannot simply do a printf or console.log to output debug data, nor can you use debuggers.
- Cryptic Error Messages: When you get an error, you only receive a cryptic message without any information about the message that caused it.
- Steep Complexity Curve: It's suitable for simple transformations, but as soon as you try to do something more advanced, you might spend hours wrestling with "simple things."
- Limited AI Assistance: Additionally, tools like ChatGPT struggle to help with Bloblang; they often hallucinate solutions that don't work.
Despite these challenges, Benthos offered better scalability and maintainability compared to Node-RED for our use cases (see also this video here).
Our current protocol converters, built upon Benthos and Bloblang, also struggle in certain areas:
- Complex Topic Management: Because Kafka is perceived to struggle with a lot of topics (a myth we'll debunk later), we needed to split incoming MQTT topics into a Kafka topic and key. This resulted in a lot of boilerplate code, complicating life for users.
- Confusing Variable Naming: We attempted to simplify this by exposing variables like tagName, tagGroup, enterprise, site, area, etc., which we'd then use to create the correct topic and key. However, the naming of those variables was suboptimal and confused many people.
As a "normal" user of the Management Console, you wouldn't need to worry about this. You could easily just "press continue" and leverage the automatically generated Benthos templates. But power users—those who needed to modify configurations—found themselves bogged down.
We decided to break this cycle and make life easier for those power users without compromising simplicity for everyone else.
Node-RED Within Benthos
Where Bloblang can be hard to debug and reason about, Node-RED’s JavaScript approach feels simpler. In Node-RED, you can easily write a small JavaScript snippet, hit deploy, and watch the debug panel for immediate feedback. We wanted that same simplicity inside Benthos, without sacrificing the power and flexibility that advanced users require.
So we created nodered_js.
nodered_js processor
The nodered_js processor lets you handle messages in Benthos using Node-RED’s familiar msg.payload and msg.meta style. This means you can use straightforward JavaScript to transform data, set metadata, filter messages, and even log what’s going on, all from within Benthos.
Configuration Example
pipeline:
  processors:
    - nodered_js:
        code: |
          // Example: Double a numeric value
          msg.payload = msg.payload * 2;
          return msg;
How It Works:
- Message Structure: Your message enters the processor as msg, with:
  - msg.payload: The message content.
  - msg.meta: Associated metadata like the Modbus address or the OPC UA BrowseName.
- Standard JavaScript: You modify msg using standard JavaScript, then return msg; to send it forward.
- Control Flow: To drop a message, return null. To create multiple messages, return an array of messages.
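The three possible outcomes described above (one message, no message, several messages) can be tried out in plain JavaScript. The sketch below is not the processor's implementation: runProcessor is a hypothetical helper that only imitates how a return value could be interpreted, so you can test snippet logic outside Benthos.

```javascript
// Hypothetical harness, NOT the benthos-umh implementation: it imitates how the
// return value of a nodered_js-style snippet could be interpreted.
function runProcessor(userFn, msg) {
  const result = userFn(msg);
  if (result === null || result === undefined) return []; // message dropped
  return Array.isArray(result) ? result : [result];       // one or many messages
}

// Pass the message on with a modified payload.
const doubled = runProcessor((msg) => {
  msg.payload = msg.payload * 2;
  return msg;
}, { payload: 21, meta: {} });

// Drop the message.
const dropped = runProcessor(() => null, { payload: 1, meta: {} });

// Fan one message out into two.
const split = runProcessor((msg) => [
  { payload: msg.payload.a, meta: msg.meta },
  { payload: msg.payload.b, meta: msg.meta },
], { payload: { a: 1, b: 2 }, meta: {} });

console.log(doubled.length, dropped.length, split.length); // 1 0 2
```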
Logging and Debugging
With console.log, you can print messages right from within your JavaScript code. This makes it far easier to understand what’s happening when something goes wrong, especially compared to Bloblang’s limited error messages. This aligns with our vision of providing tools that both new and power users can grasp quickly, grounded in familiar practices.
Performance Considerations
While JavaScript is slower than Bloblang, it’s still fast enough for most industrial workloads. For example, processing 1000 incoming tags (without any batching!) only takes around 20 milliseconds, which is acceptable in typical manufacturing scenarios where data does not need microsecond-level processing.
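To put the quoted figure into perspective, the arithmetic below derives the average cost per tag. The throughput number is only an extrapolation that assumes the cost scales linearly, which the measurement above does not claim.

```javascript
// Figures quoted above: 1000 tags processed in roughly 20 ms.
const tags = 1000;
const totalMs = 20;

// Average processing cost per tag, in microseconds.
const microsPerTag = (totalMs * 1000) / tags;

// Extrapolated throughput, ASSUMING the cost scales linearly with the tag count.
const tagsPerSecond = (tags * 1000) / totalMs;

console.log(`${microsPerTag} µs per tag, about ${tagsPerSecond} tags per second`);
```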
Example: Filtering Messages
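A minimal sketch of a filter, written as a plain function so it can be run outside Benthos. The function body is what would go into the code field of the processor. The threshold of 50 and the assumption that the payload is a number are purely illustrative.

```javascript
// Body of a nodered_js-style snippet, wrapped in a function for local testing.
function filterHighValues(msg) {
  // Illustrative rule: forward only numeric payloads above 50, drop everything else.
  if (typeof msg.payload !== "number" || msg.payload <= 50) {
    console.log("dropping message with payload:", msg.payload);
    return null; // returning null drops the message
  }
  return msg;
}

const kept = filterHighValues({ payload: 72.4, meta: {} });
const removed = filterHighValues({ payload: 12, meta: {} });
console.log(kept !== null, removed === null);
```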
Example: Metadata Manipulation
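A minimal sketch of metadata handling in the same style. The metadata keys used here (source, unit, raw_address) are hypothetical names chosen for illustration; the keys actually available depend on the input plugin you use.

```javascript
// Body of a nodered_js-style snippet, wrapped in a function for local testing.
function annotate(msg) {
  // Add new metadata fields (hypothetical keys).
  msg.meta.source = "plc42";
  msg.meta.unit = "celsius";
  // Remove a field that downstream consumers do not need (hypothetical key).
  delete msg.meta.raw_address;
  return msg;
}

const annotated = annotate({ payload: 23.5, meta: { raw_address: "40001" } });
console.log(annotated.meta);
```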
A Real-World Impact
Believe me, this makes a big impact! A colleague of mine was stuck for over 8 hours on some advanced Benthos Bloblang processing. When I asked ChatGPT to solve this problem using the above nodered_js and an example, it did so within 15 seconds (thank you o1-mini).
Go ahead and try it yourself! Open up a ChatGPT prompt, paste in this article, and throw your annoying problem at it!
But that's not all!
Same Topic Experience
Most Kafka best practices advise limiting the number of topics to avoid performance overhead. Conventional wisdom says that once you hit a few thousand, you’re asking for trouble. But what if this barrier isn’t so fixed?
By testing on a small edge device and tweaking just four parameters in Redpanda, we’ve successfully run over 30,000 topics—sustainably, for months. This defies the standard recommendations and suggests that with the right configuration and tooling, you can treat Kafka topics more like MQTT topics: one per sensor, one per data stream, without the usual complexity or performance hits.
We’re rolling out these configurations as an experimental, opt-in feature to give you fine-grained control, letting you subscribe directly to the single tag of interest, just as you would in MQTT. While this may not align with older Kafka usage guidelines, it opens the door to simpler workflows, less boilerplate, and more precise data flows—all without the feared downsides of "too many topics."
This approach underscores our commitment to starting with IT/OT best practices, challenging outdated assumptions, and focusing on delivering tangible benefits to our users.
But it gets even better.
Bringing it together: the new protocol converter and tag_processor
The tag_processor builds on the Node-RED style simplicity, taking it a step further. Its goal: automatically shape your data into a consistent format and replace the current protocol converter, without compromising the simplicity for first-time users or the flexibility for power users.
How It Works: An Opinionated, Yet Flexible Approach
pipeline:
  processors:
    - tag_processor:
        defaults: |
          // Set default location hierarchy and datacontract
          msg.meta.location_path = "enterprise.plant1.machiningArea.cnc-line.cnc5.plc123";
          msg.meta.data_contract = "_historian";
          return msg;
        conditions:
          - if: msg.meta.opcua_node_id === "ns=1;i=2245"
            then: |
              // Set path hierarchy and tag name for specific OPC UA node
              msg.meta.virtual_path = "axis.x.position";
              msg.meta.tag_name = "actual";
              return msg;
        advancedProcessing: |
          // Optional advanced message processing
          // Example: double numeric values
          msg.payload = parseFloat(msg.payload) * 2;
          return msg;
Stages:
- Defaults: Set baseline metadata (like location_path or data_contract) before processing each message.
- Conditions: Tailor behavior based on specific attributes (like matching a particular OPC UA node), adjusting metadata or transforming the message as needed.
- Advanced Processing: Perform final transformations, filter messages, or create multiple outputs from a single input.
Each stage returns a msg object (or null to drop it), making the flow straightforward and Node-RED-like.
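The order of the three stages can be sketched in plain JavaScript. This is an illustration of the described flow under stated assumptions, not the tag_processor's actual code: runStages is a hypothetical helper, and the stages are passed as functions rather than as the code strings used in the YAML configuration.

```javascript
// Hypothetical stage runner, NOT the benthos-umh implementation.
// It applies defaults, then every matching condition, then advanced processing.
function runStages(config, msg) {
  let current = config.defaults(msg);
  if (current === null) return null; // dropped in the defaults stage

  for (const cond of config.conditions || []) {
    if (cond.if(current)) {
      current = cond.then(current);
      if (current === null) return null; // dropped by a condition
    }
  }

  return config.advancedProcessing ? config.advancedProcessing(current) : current;
}

const config = {
  defaults: (msg) => {
    msg.meta.location_path = "enterprise.plant1";
    msg.meta.data_contract = "_historian";
    return msg;
  },
  conditions: [{
    if: (msg) => msg.meta.opcua_node_id === "ns=1;i=2245",
    then: (msg) => {
      msg.meta.virtual_path = "axis.x.position";
      msg.meta.tag_name = "actual";
      return msg;
    },
  }],
  advancedProcessing: (msg) => {
    msg.payload = parseFloat(msg.payload) * 2;
    return msg;
  },
};

const result = runStages(config, { payload: "23.5", meta: { opcua_node_id: "ns=1;i=2245" } });
console.log(result.payload, result.meta.tag_name); // 47 actual
```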
Example: Basic Configuration
pipeline:
  processors:
    - tag_processor:
        defaults: |
          msg.meta.location_path = "enterprise.plant1.lineA.machine1.plc42";
          msg.meta.data_contract = "_historian";
          msg.meta.tag_name = "temperature";
          return msg;
Result:
A simple numeric input like 23.5 is transformed into:
{
  "temperature": 23.5,
  "timestamp_ms": 1733903611000
}
With a topic automatically generated from the metadata:
umh.v1.enterprise.plant1.lineA.machine1.plc42._historian.temperature
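The output shape shown above can be reproduced with a few lines of JavaScript. wrapValue is a hypothetical helper inferred from the example output, not a function exposed by the tag_processor. It assumes the value is stored under the tag name next to a millisecond timestamp.

```javascript
// Hypothetical helper: wraps a raw value into the shape shown in the example above.
function wrapValue(tagName, value, timestampMs = Date.now()) {
  return { [tagName]: value, timestamp_ms: timestampMs };
}

// Using the timestamp from the example so the output is reproducible.
const wrapped = wrapValue("temperature", 23.5, 1733903611000);
console.log(JSON.stringify(wrapped));
// {"temperature":23.5,"timestamp_ms":1733903611000}
```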
Example: Conditional and Advanced Processing
pipeline:
  processors:
    - tag_processor:
        defaults: |
          msg.meta.location_path = "enterprise.plant1";
          msg.meta.data_contract = "_historian";
          return msg;
        conditions:
          - if: msg.meta.opcua_attr_nodeid === "ns=1;i=2245"
            then: |
              msg.meta.virtual_path = "axis.x.position";
              msg.meta.tag_name = "actual";
              return msg;
        advancedProcessing: |
          // Double numeric values just as a demo
          if (typeof msg.payload === "number") {
            msg.payload = msg.payload * 2;
          }
          return msg;
What Happens:
- Defaults set a baseline location and contract.
- A condition checks for a specific OPC UA node ID and sets virtual_path and tag_name, organizing it into a different “folder”/“topic” in the Unified Namespace.
- Advanced processing then doubles the numeric payload.
If the input is 23.5 and opcua_attr_nodeid matches, the result might be:
{
  "actual": 47.0,
  "timestamp_ms": 1733903611000
}
With a topic:
umh.v1.enterprise.plant1._historian.axis.x.position.actual
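Judging from the two topics shown in this article, the topic appears to be the umh.v1 prefix followed by location_path, data_contract, an optional virtual_path, and tag_name, joined with dots. The sketch below encodes that inferred pattern. buildTopic is a hypothetical helper, and the real processor may apply additional validation or rules.

```javascript
// Hypothetical helper: derives a topic from metadata, following the pattern
// inferred from the examples in this article (not taken from the implementation).
function buildTopic(meta) {
  return ["umh.v1", meta.location_path, meta.data_contract, meta.virtual_path, meta.tag_name]
    .filter((part) => part !== undefined && part !== "") // virtual_path is optional
    .join(".");
}

const basicTopic = buildTopic({
  location_path: "enterprise.plant1.lineA.machine1.plc42",
  data_contract: "_historian",
  tag_name: "temperature",
});

const conditionalTopic = buildTopic({
  location_path: "enterprise.plant1",
  data_contract: "_historian",
  virtual_path: "axis.x.position",
  tag_name: "actual",
});

console.log(basicTopic);
console.log(conditionalTopic);
```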
Why Use tag_processor?
By combining flexible defaults, dynamic conditions, and easy JavaScript transformations, tag_processor saves you from scattered boilerplate code. Instead of writing complex Bloblang expressions or frustratingly clicking through some UI, you declare your intent once and let tag_processor (together with ChatGPT) handle the details.
Summary
Starting from benthos-umh v0.5.0 or in the next nightly release of the UMH, you will find both processors available as experimental features. You can find more information in the README.
The configuration of tag_processor might still change based on your feedback. Being open source, we believe in the power of community engagement to refine and improve our tools continuously. We can't guarantee backwards compatibility yet, but we are eagerly awaiting your input!