Prerequisites
- Access to your cluster via OpenLens
Steps
- Connect to your cluster using OpenLens.
- Open a Create Resource tab located at the bottom left.
- Paste in the YAML below and adjust the following fields under output:
  - Endpoint: your InfluxDB endpoint
  - Organization: your InfluxDB organization (the <Organisation> placeholder in the URL)
  - Bucket: your InfluxDB bucket
  - Token: your InfluxDB API token
- Configure the topics that Benthos should listen to. This example uses the topics of the IoT simulator.
- Depending on your topics, you might need to adjust the root assignment so that it matches them. The expression meta("kafka_topic").string().split(".").<X> splits the topic into a list, e.g. ia.factoryinsight.x becomes [ia, factoryinsight, x]. Pick the element you need by replacing <X> with its index, counting from 0.
- Finally, click on create. A new Benthos pod will spawn.
💡 InfluxDB expects messages in its own format (line protocol), so make sure to adjust the root field so that it produces the message you want.
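To make the topic splitting and the root assignment easier to follow, here is a minimal Python sketch that mimics what the Bloblang mapping in the YAML does. It is only an illustration, not part of the deployment: the function name topic_to_line and the topic ending in Temperature are hypothetical, and str(float(...)) is only a rough stand-in for content().number().string().

```python
def topic_to_line(topic: str, payload: str) -> str:
    """Build one InfluxDB line-protocol record the way the root assignment does."""
    # "ia.raw.development.ioTSensors.Temperature"
    #   -> ["ia", "raw", "development", "ioTSensors", "Temperature"]
    parts = topic.split(".")
    # Rough stand-in for content().number().string() (assumption, not exact)
    value = str(float(payload))
    measurement = parts[2]  # index 2 becomes the measurement name
    sensor = parts[3]       # index 3 becomes the value of the "sensor" tag
    field = parts[4]        # index 4 becomes the field key
    # The trailing space mirrors the final + " " in the mapping
    return f"{measurement},sensor={sensor} {field}={value} "


# Hypothetical example topic and payload
line = topic_to_line("ia.raw.development.ioTSensors.Temperature", "23.5")
print(repr(line))
```

If your topics have a different number of segments, the indices 2, 3 and 4 have to change accordingly, which is exactly the adjustment of <X> described in the steps.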
apiVersion: v1
kind: ConfigMap
metadata:
  name: benthos-influx-config
  namespace: united-manufacturing-hub
  labels:
    app: benthos-influx
data:
  benthos.yaml: |-
    input:
      kafka_franz:
        seed_brokers:
          - united-manufacturing-hub-kafka:9092
        topics:
          - ia.raw.development.ioTSensors.*
        regexp_topics: true
        consumer_group: "benthos-1"
    pipeline:
      processors:
        - bloblang: |
            let value = content().number().string()
            root = meta("kafka_topic").string().split(".").2 + ",sensor=" + meta("kafka_topic").string().split(".").3 + " " + meta("kafka_topic").string().split(".").4 + "=" + $value + " "
    output:
      type: http_client
      http_client:
        url: http://<Endpoint>:8086/api/v2/write?org=<Organisation>&bucket=<Bucket>&precision=ms
        verb: POST
        headers:
          Authorization: Token <Token>
          Content-Type: text/plain
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: benthos-influx-deployment
  namespace: united-manufacturing-hub
  labels:
    app: benthos-influx
spec:
  replicas: 1
  selector:
    matchLabels:
      app: benthos-influx
  template:
    metadata:
      labels:
        app: benthos-influx
    spec:
      containers:
        - name: benthos-influx
          image: ghcr.io/united-manufacturing-hub/benthos-umh:latest
          imagePullPolicy: IfNotPresent
          ports:
            - name: http
              containerPort: 4195
              protocol: TCP
          livenessProbe:
            httpGet:
              path: /ping
              port: http
          readinessProbe:
            initialDelaySeconds: 30
            httpGet:
              path: /ready
              port: http
          volumeMounts:
            - name: config
              mountPath: "/benthos.yaml"
              subPath: "benthos.yaml"
              readOnly: true
          env:
            - name: MACHINE
              value: "influx"
      volumes:
        - name: config
          configMap:
            name: benthos-influx-config
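Before deploying, it can help to check that the Endpoint, Organization, Bucket and Token values fit together. The following Python sketch builds the same kind of request that the http_client output is configured to send. It uses only the standard library; the host name, organization, bucket and token shown here are hypothetical placeholders, and the function build_write_request is not part of Benthos or InfluxDB.

```python
from urllib.parse import urlencode
from urllib.request import Request


def build_write_request(endpoint: str, org: str, bucket: str, token: str, line: str) -> Request:
    """Assemble a POST to the write URL used in the http_client output."""
    # Same query parameters as in the YAML: org, bucket, precision=ms
    query = urlencode({"org": org, "bucket": bucket, "precision": "ms"})
    url = f"http://{endpoint}:8086/api/v2/write?{query}"
    return Request(
        url,
        data=line.encode("utf-8"),  # request body: one line-protocol record
        method="POST",
        headers={
            "Authorization": f"Token {token}",
            "Content-Type": "text/plain",
        },
    )


# Hypothetical placeholder values; replace them with your own
req = build_write_request(
    "influxdb.example.local",
    "my-org",
    "my-bucket",
    "my-token",
    "development,sensor=ioTSensors Temperature=23.5",
)
print(req.get_method(), req.full_url)
```

The sketch only constructs the request. To actually send it from a machine that can reach InfluxDB, pass req to urllib.request.urlopen; a successful write should be answered without a response body.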