Using Kafka Messages in Node-RED

This guide will show you how to use the node-red-contrib-kafkajs package to send and receive Kafka messages in Node-RED.

Instructions

  1. To start, you need to install the node-red-contrib-kafkajs package. Open the Node-RED browser interface by navigating to <IP of the installation device>:1880/nodered on the United Manufacturing Hub.
  2. Click on the three horizontal lines in the upper right corner to open the menu, then select "Manage palette."
  3. In the new window that opens, select the "Install" tab.
  4. In the search bar, type in "node-red-contrib-kafkajs," then select the package and click on "Install."
  5. You should now have two new nodes in your Node-RED library: kafkajs-consumer and kafkajs-producer.

The kafkajs-consumer node listens on Kafka topics and relays messages to linked nodes, while the kafkajs-producer node produces messages on the designated topic.

When using either node, you need to configure the kafkajs-client node and make sure it has the correct address of the Kafka broker from which you want to receive messages (replace 127.0.0.1 with united-manufacturing-hub-kafka):

⚠️
When switching from MQTT to Kafka in Node-RED, please note that you need to change the topic (replace / with .) as well as add a JSON node in front of the kafkajs-producer node. Otherwise, you will get a "Kafka Producer Error" message.
  1. To set up a simple example, you can use the blue injection node to send the timestamp to a function node, couple it with a JSON node (see also warning above) and then with the producer node, which will then publish them.
  2. Leave the inject node as it is.
  3. The function node could contain something like this:
msg.topic = "ia.factoryinsight.oeetestlocation.oeetestasset.state"
msg.payload = {
    "timestamp_ms": msg.payload,
    "state": msg.state
}
return msg;
  1. Ensure that you have "Allow Auto Topic Creation" enabled in the kafkajs-producer node. You might need to press on "advanced Options" for this
  2. . The kafkajs-consumer node works similarly. Set up a topic for it to listen to, then forward the message when it receives one. Please also take note of the warning above and adjust the topic and add a JSON node. You can then process the message as desired and either send it back out as an MQTT message with the mqtt-out node or as a Kafka message with a kafkajs-producer node.
⚠️
The return messages of kafkajs-consumer will also look differently from those of MQTT. The payload is not in msg.payload, but in msg.payload.value