hongbo-miao/hongbomiao.com

data-processing/hm-spark/applications/ingest-from-s3-to-kafka/src/main/scala/com/hongbomiao/IngestFromS3ToKafka.scala
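
This Spark Structured Streaming job reads motor data from a Delta table in S3, serializes each row to Confluent Avro with ABRiS, and publishes the records to the hm.motor.avro Kafka topic.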

package com.hongbomiao

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.types.LongType
import za.co.absa.abris.avro.functions.to_avro
import za.co.absa.abris.config.{AbrisConfig, ToAvroConfig}

object IngestFromS3ToKafka {
  def main(args: Array[String]): Unit = {
    // Build the Spark session with Delta Lake support enabled
    val spark: SparkSession = SparkSession
      .builder()
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog"
      )
      .getOrCreate()

    // Fetch the latest Avro value schema for the topic from Confluent Schema
    // Registry, using the topic name strategy (subject "hm.motor.avro-value")
    val motorValueSchemaConfig: ToAvroConfig =
      AbrisConfig.toConfluentAvro.downloadSchemaByLatestVersion
        .andTopicNameStrategy("hm.motor.avro")
        .usingSchemaRegistry(
          "http://confluent-schema-registry.hm-confluent-schema-registry.svc:8081"
        )

    // Stream rows from the Delta table in S3, scale the timestamp by 1,000
    // (seconds to milliseconds) and cast it to long, then pack every column
    // into a single Avro-encoded "value" field for Kafka
    val df = spark.readStream
      .format("delta")
      .load("s3a://hm-production-bucket/delta-tables/motor_data")
      .withColumn("timestamp", (col("timestamp") * 1000).cast(LongType))
      .select(to_avro(struct("*"), motorValueSchemaConfig).as("value"))

    // Publish the Avro records to Kafka; the checkpoint directory tracks streaming
    // progress (note that /tmp is ephemeral, so offsets are lost on pod restart)
    val query = df.writeStream
      .format("kafka")
      .option(
        "kafka.bootstrap.servers",
        "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
      )
      .option("topic", "hm.motor.avro")
      .option("checkpointLocation", "/tmp/checkpoint")
      .start()

    query.awaitTermination()
  }
}
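
For reference, the consuming side can reverse this path with ABRiS's from_avro, downloading the reader schema from the same registry. The following is a minimal sketch, not part of the repository: the object name ReadFromKafka and the console sink are illustrative placeholders, while the Schema Registry URL, bootstrap servers, and topic name are taken from the producer code above.

package com.hongbomiao

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.config.{AbrisConfig, FromAvroConfig}

object ReadFromKafka {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().getOrCreate()

    // Download the latest reader schema for the topic from the same Schema Registry
    val motorValueSchemaConfig: FromAvroConfig =
      AbrisConfig.fromConfluentAvro.downloadReaderSchemaByLatestVersion
        .andTopicNameStrategy("hm.motor.avro")
        .usingSchemaRegistry(
          "http://confluent-schema-registry.hm-confluent-schema-registry.svc:8081"
        )

    // Deserialize the Kafka "value" bytes back into a struct column
    val df = spark.readStream
      .format("kafka")
      .option(
        "kafka.bootstrap.servers",
        "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
      )
      .option("subscribe", "hm.motor.avro")
      .load()
      .select(from_avro(col("value"), motorValueSchemaConfig).as("motor"))

    // Illustrative sink: print the decoded records to the console
    val query = df.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/read-checkpoint")
      .start()

    query.awaitTermination()
  }
}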