Using Kafka Messages in Node-RED
This guide will show you how to use the node-red-contrib-kafkajs package to send and receive Kafka messages in Node-RED.
Instructions
- To start, you need to install the node-red-contrib-kafkajs package. Open the Node-RED browser interface by navigating to
<IP of the installation device>:1880/nodered
on the United Manufacturing Hub. - Click on the three horizontal lines in the upper right corner to open the menu, then select "Manage palette."
- In the new window that opens, select the "Install" tab.
- In the search bar, type in "node-red-contrib-kafkajs," then select the package and click on "Install."
- You should now have two new nodes in your Node-RED library: kafkajs-consumer and kafkajs-producer.
The kafkajs-consumer node listens on Kafka topics and relays messages to linked nodes, while the kafkajs-producer node produces messages on the designated topic.
When using either node, you need to configure the kafkajs-client node and make sure it has the correct address of the Kafka broker from which you want to receive messages (replace 127.0.0.1
with united-manufacturing-hub-kafka
):
⚠️
When switching from MQTT to Kafka in Node-RED, please note that you need to change the topic (replace
/
with .
) as well as add a JSON node in front of the kafkajs-producer node. Otherwise, you will get a "Kafka Producer Error" message.- To set up a simple example, you can use the blue injection node to send the timestamp to a function node, couple it with a JSON node (see also warning above) and then with the producer node, which will then publish them.
- Leave the inject node as it is.
- The function node could contain something like this:
msg.topic = "ia.factoryinsight.oeetestlocation.oeetestasset.state"
msg.payload = {
"timestamp_ms": msg.payload,
"state": msg.state
}
return msg;
- Ensure that you have "Allow Auto Topic Creation" enabled in the kafkajs-producer node. You might need to press on "advanced Options" for this
- . The kafkajs-consumer node works similarly. Set up a topic for it to listen to, then forward the message when it receives one. Please also take note of the warning above and adjust the topic and add a JSON node. You can then process the message as desired and either send it back out as an MQTT message with the mqtt-out node or as a Kafka message with a kafkajs-producer node.
⚠️
The return messages of kafkajs-consumer will also look differently from those of MQTT. The payload is not in
msg.payload
, but in msg.payload.value