How to push messages into MongoDB using Benthos

Create a Benthos deployment in UMHLens/OpenLens and configure it to push kafka messages into MongoDB.

How to push messages into MongoDB using Benthos


Setting up a Benthos deployment is a quick way to get data into MongoDB. Just follow the steps provided and easily push data into MongoDB.

Instructions

  1. Open UMHLens and create a Create resource by clicking on the + icon at the bottom.

UMHLensCreateResource
2. Paste in the YAML file (below)
3. Adjust the topic to where benthos should listen to
yamlBenthosTopic
4. Adjust the database and collection, to where you want to save the messages
yamlBenthosCollectionDatabase-1
5. Copy the password located in UMHLens under Config -> Secrets -> mongodb and replace CHANGE_ME with it.
yamlBenthosPassword
6. Additionally you can modify the pipeline. For now it only takes the incoming message and adds a timestamp to it.
7. When you are done click create to get a benthos-deployment.
8. If you want to make a new deployment make sure, to delete the previous deployment and config-map.
9. The deployment is located under Workloads -> Deployments
umhLensdeletdeployments
10. The config-map is located under Config -> ConfigMaps
UmhLensdeletConfigmaps

Message format

  1. For consitency reasons use the following message structure:
{
	“orderID”: “<orderID>”,
	“parents”: [“<parent-order-id-1>”, …],
	“enterprise”: <enterprise>,
	“site”:....
	“area”..
	“line”..
	“workCell”..
    “timestamp_begin” (only necessary for processValue)
    “timestamp_end” (only necessary for processValue)
	“data”: {
		<whatever>
	}
}

Benthos YAML

apiVersion: v1
kind: ConfigMap
metadata:
  name: benthos-1-config
  namespace: united-manufacturing-hub
  labels:
    app: benthos-1
data:
  benthos.yaml: |-
    input:
      kafka:
        addresses:
          - united-manufacturing-hub-kafka:9092
        topics:
          - ia.raw.<asset>.mongodb.productTagv2Alpha
        consumer_group: "benthos-mongodb"
    pipeline:
      processors:
        - bloblang: |
            root = {
              "message": this,
              "timestamp_unix": timestamp_unix()
            }
    output:
      mongodb:
        url: mongodb://mongodb:27017
        database: Test
        username: root
        password: CHANGE_ME
        operation: insert-one
        collection: Test
        write_concern:
          w: majority
          j: true
          w_timeout: ""
        document_map:  |-
          root.message = this.message
          root.timestamp_unix = this.timestamp_unix
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: benthos-1-deployment
  namespace: united-manufacturing-hub
  labels:
    app: benthos-1
spec:
  replicas: 1
  selector:
    matchLabels:
      app: benthos-1
  template:
    metadata:
      labels:
        app: benthos-1
    spec:
      containers:
        - name: benthos-1
          image: "ghcr.io/united-manufacturing-hub/benthos-umh:latest"
          imagePullPolicy: IfNotPresent
          ports:
            - name: http
              containerPort: 4195
              protocol: TCP
          livenessProbe:
            httpGet:
              path: /ping
              port: http
          readinessProbe:
            initialDelaySeconds: 30
            httpGet:
              path: /ready
              port: http
          volumeMounts:
            - name: config
              mountPath: "/benthos.yaml"
              subPath: "benthos.yaml"
              readOnly: true
      volumes:
        - name: config
          configMap:
            name: benthos-1-config

Read next

Share, Engage, and Contribute!

Discover how you can share your ideas, contribute to our blog, and connect with us on other platforms.