# kubernetes/manifests/hm-kafka/iot-kafka-connect/hm-motor-jdbc-sink-kafka-connector.yaml
---
# Strimzi KafkaConnector: Confluent JDBC sink that writes records from the
# hm.motor.avro topic into the "motor" table of the TimescaleDB (PostgreSQL)
# database production_hm_iot_db.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: hm-motor-jdbc-sink-kafka-connector
  namespace: hm-kafka
  labels:
    # Name of the KafkaConnect cluster that should run this connector.
    strimzi.io/cluster: hm-kafka-iot-kafka-connect
spec:
  class: io.confluent.connect.jdbc.JdbcSinkConnector
  tasksMax: 8
  # https://docs.confluent.io/kafka-connectors/jdbc/current/sink-connector/sink_config_options.html
  config:
    # connector.class and tasks.max are deliberately not repeated here:
    # they are already set by spec.class and spec.tasksMax above, and the
    # Strimzi KafkaConnector schema lists them as properties that cannot be
    # set inside config.
    topics: hm.motor.avro
    connection.url: jdbc:postgresql://timescale.hm-timescale.svc:5432/production_hm_iot_db
    # Credentials are ${file:<path>:<key>} placeholders, not literal values.
    # NOTE(review): presumably resolved by a file config provider enabled on
    # the Kafka Connect cluster, with the properties file mounted at this
    # path - confirm against the KafkaConnect resource.
    connection.user: "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_user}"
    connection.password: "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_password}"
    # NOTE(review): plain inserts are not idempotent if records are
    # redelivered - confirm this is acceptable for the target table.
    insert.mode: insert
    # NOTE(review): the effective batch is presumably capped by how many
    # records a single consumer poll returns (max.poll.records) - verify
    # that the consumer is tuned accordingly if batches this large are
    # expected.
    batch.size: 100000

    # Table
    table.name.format: motor

    # Primary key: taken from the "timestamp" field of the record value.
    pk.mode: record_value
    pk.fields: timestamp

    # Value converter (exactly one of the options below should be active)
    # 1) JSON
    # value.converter: org.apache.kafka.connect.json.JsonConverter
    # value.converter.schemas.enable: true
    # 2) Avro (Apicurio Registry)
    # https://www.apicur.io/registry/docs/apicurio-registry/2.4.x/getting-started/assembly-configuring-kafka-client-serdes.html#registry-serdes-config-props_registry
    # value.converter: io.apicurio.registry.utils.converter.AvroConverter
    # value.converter.apicurio.registry.url: http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/registry/v2
    # value.converter.apicurio.registry.fallback.group-id: hm-group
    # value.converter.apicurio.registry.fallback.artifact-id: hm.motor.avro-value
    # 3) Confluent Avro (active)
    value.converter: io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url: http://confluent-schema-registry.hm-confluent-schema-registry.svc:8081

    # Timestamp: convert the "timestamp" field of the record value to the
    # Connect Timestamp type before it is written.
    # NOTE(review): no format is configured, so the incoming field is
    # presumably numeric epoch time - confirm against the Avro schema.
    transforms: convertTimestamp
    transforms.convertTimestamp.type: org.apache.kafka.connect.transforms.TimestampConverter$Value
    transforms.convertTimestamp.field: timestamp
    transforms.convertTimestamp.target.type: Timestamp

    # Automatic table creation / schema evolution are left commented out, so
    # the "motor" table must already exist with a matching schema.
    # auto.create: true
    # auto.evolve: true