hongbo-miao/hongbomiao.com

View on GitHub
kubernetes/manifests/hm-kafka/iot-kafka-connect/hm-motor-jdbc-sink-kafka-connector.yaml

Summary

Maintainability
Test Coverage
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: hm-motor-jdbc-sink-kafka-connector
  namespace: hm-kafka
  labels:
    strimzi.io/cluster: hm-kafka-iot-kafka-connect
spec:
  class: io.confluent.connect.jdbc.JdbcSinkConnector
  tasksMax: 8
  # https://docs.confluent.io/kafka-connectors/jdbc/current/sink-connector/sink_config_options.html
  config:
    connector.class: io.confluent.connect.jdbc.JdbcSinkConnector
    tasks.max: 8
    topics: hm.motor.avro
    connection.url: jdbc:postgresql://timescale.hm-timescale.svc:5432/production_hm_iot_db
    connection.user: "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_user}"
    connection.password: "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_password}"
    insert.mode: insert
    batch.size: 100000

    # table
    table.name.format: motor

    # primary key
    pk.mode: record_value
    pk.fields: timestamp

    # value
    # 1) JSON
    # value.converter: org.apache.kafka.connect.json.JsonConverter
    # value.converter.schemas.enable: true

    # 2) Avro
    # https://www.apicur.io/registry/docs/apicurio-registry/2.4.x/getting-started/assembly-configuring-kafka-client-serdes.html#registry-serdes-config-props_registry
    # value.converter: io.apicurio.registry.utils.converter.AvroConverter
    # value.converter.apicurio.registry.url: http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/registry/v2
    # value.converter.apicurio.registry.fallback.group-id: hm-group
    # value.converter.apicurio.registry.fallback.artifact-id: hm.motor.avro-value

    # 3) Confluent Avro
    value.converter: io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url: http://confluent-schema-registry.hm-confluent-schema-registry.svc:8081

    # timestamp
    transforms: convertTimestamp
    transforms.convertTimestamp.type: org.apache.kafka.connect.transforms.TimestampConverter$Value
    transforms.convertTimestamp.field: timestamp
    transforms.convertTimestamp.target.type: Timestamp

    # auto.create: true
    # auto.evolve: true