Connecting the UNS with InfluxDB using Benthos

In this course you will learn how to connect InfluxDB with the UNS using Benthos.

Prerequisites

  • Access to your cluster via OpenLens

Steps

  1. Connect to your cluster using OpenLens.
  2. Open a Create Resource tab at the bottom left.
  3. Paste in the YAML below and replace the following placeholders in the url and headers fields under output:
       • <Endpoint>: your InfluxDB endpoint
       • <Organisation>: your InfluxDB organization
       • <Bucket>: your InfluxDB bucket
       • <Token>: your InfluxDB API token
  4. Configure the topics that Benthos should listen to. This example uses the topics of the IoT simulator.
  5. Depending on your topics, you might need to adjust the root assignment to match them. The expression meta("kafka_topic").string().split(".").<X> splits the topic into a list, e.g. ia.factoryinsight.x becomes [ia, factoryinsight, x]. Select the element you need by replacing <X> with its index, counting from 0.
  6. Finally, click Create and a new Benthos pod will spawn.
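
The topic splitting described in the steps above can be tried out in plain Python before editing the Bloblang mapping. This is only an illustrative sketch: the helper name split_topic and the sensor name Temperature are assumptions made for the example, not part of the stack.

```python
from typing import List


def split_topic(topic: str) -> List[str]:
    """Split a Kafka topic on '.', as split(".") does in the mapping."""
    return topic.split(".")


# Hypothetical topic matching the pattern ia.raw.development.ioTSensors.*
parts = split_topic("ia.raw.development.ioTSensors.Temperature")

# Print every element with its index so the right <X> can be picked.
for index, part in enumerate(parts):
    print(index, part)
```

Listing the elements with their indices makes it easier to see which number to put in place of <X> for a differently structured topic.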

💡
InfluxDB expects its own message format (line protocol), so make sure the root assignment produces the message you want to store.
apiVersion: v1
kind: ConfigMap
metadata:
  name: benthos-influx-config
  namespace: united-manufacturing-hub
  labels:
    app: benthos-influx
data:
  benthos.yaml: |-
    input:
      kafka_franz:
        seed_brokers:
          - united-manufacturing-hub-kafka:9092
        topics:
          - ia.raw.development.ioTSensors.*
        regexp_topics: true
        consumer_group: "benthos-1"
    pipeline:
      processors:
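        - bloblang: |
            # Parse the payload as a number, then render it as text.
            let value = content().number().string()
            # The example topic splits into five elements (indices 0 to 4):
            # index 2 is used as measurement, index 3 as sensor tag, index 4 as field key.
            root = meta("kafka_topic").string().split(".").2 + ",sensor=" + meta("kafka_topic").string().split(".").3 + " " + meta("kafka_topic").string().split(".").4 + "=" + $value + " "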
    output:
      type: http_client
      http_client:
        url: http://<Endpoint>:8086/api/v2/write?org=<Organisation>&bucket=<Bucket>&precision=ms
        verb: POST
        headers:
          Authorization: Token <Token>
          Content-Type: text/plain
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: benthos-influx-deployment
  namespace: united-manufacturing-hub
  labels:
    app: benthos-influx
spec:
  replicas: 1
  selector:
    matchLabels:
      app: benthos-influx
  template:
    metadata:
      labels:
        app: benthos-influx
    spec:
      containers:
        - name: benthos-influx
          image: ghcr.io/united-manufacturing-hub/benthos-umh:latest
          imagePullPolicy: IfNotPresent
          ports:
            - name: http
              containerPort: 4195
              protocol: TCP
          livenessProbe:
            httpGet:
              path: /ping
              port: http
          readinessProbe:
            initialDelaySeconds: 30
            httpGet:
              path: /ready
              port: http
          volumeMounts:
            - name: config
              mountPath: "/benthos.yaml"
              subPath: "benthos.yaml"
              readOnly: true
          env:
            - name: MACHINE
              value: "influx"
      volumes:
        - name: config
          configMap:
            name: benthos-influx-config
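
To check what the pipeline and output sections above would roughly send to InfluxDB, the mapping and the write URL can be approximated in Python. This is a sketch under assumptions, not the Benthos implementation: the function names, the sensor name Temperature, and the values influxdb.example, my-org and my-bucket are hypothetical, and number formatting may differ slightly from Bloblang.

```python
from urllib.parse import urlencode


def to_line_protocol(topic: str, payload: bytes) -> str:
    """Approximate the root assignment: measurement,sensor=<tag> <field>=<value>."""
    parts = topic.split(".")
    # Mirrors content().number().string(); raises ValueError for non-numeric payloads.
    value = str(float(payload.decode()))
    # The trailing space of the original mapping is left out here.
    return parts[2] + ",sensor=" + parts[3] + " " + parts[4] + "=" + value


def build_write_url(endpoint: str, org: str, bucket: str) -> str:
    """Build the write URL used by the http_client output."""
    query = urlencode({"org": org, "bucket": bucket, "precision": "ms"})
    return "http://" + endpoint + ":8086/api/v2/write?" + query


line = to_line_protocol("ia.raw.development.ioTSensors.Temperature", b"23.5")
print(line)
print(build_write_url("influxdb.example", "my-org", "my-bucket"))
```

Comparing the printed line with the measurement, tag and field you expect in your bucket is a quick way to confirm the indices before creating the resources.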
