hongbo-miao/hongbomiao.com

View on GitHub
hm-kafka/kafka-client/kafka-c/json-producer/src/main.c

Summary

Maintainability
Test Coverage
#include <cjson/cJSON.h>
#include <glib.h>
#include <librdkafka/rdkafka.h>
#include <signal.h>
#include <stdlib.h>

#include "../include/app.h"
#include "../include/config.h"
#include "../include/kafka.h"

int main(int argc, char **argv) {
  const char *topic = "production.iot.device.json";

  rd_kafka_t *producer;
  rd_kafka_conf_t *conf;
  char err_str[4096];

  if (argc != 2) {
    g_error("Usage: %s <config.ini>", argv[0]);
  }

  const char *config_file = argv[1];
  g_autoptr(GError) err = NULL;
  g_autoptr(GKeyFile) key_file = g_key_file_new();
  if (!g_key_file_load_from_file(key_file, config_file, G_KEY_FILE_NONE, &err)) {
    g_error("Error loading config file: %s", err->message);
  }

  conf = rd_kafka_conf_new();
  load_config_group(conf, key_file, "default");
  rd_kafka_conf_set_dr_msg_cb(conf, delivery_report);
  producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, err_str, sizeof(err_str));
  if (!producer) {
    g_error("Failed to create new producer: %s", err_str);
  }
  conf = NULL;

  signal(SIGINT, handle_signal);
  signal(SIGTERM, handle_signal);

  const char *device_ids[5] = {"device1", "device2", "device3", "device4", "device5"};
  const char *status_list[3] = {"online", "offline", "maintenance"};
  const char *locations[3] = {"locationA", "locationB", "locationC"};
  const char *types[3] = {"type1", "type2", "type3"};

  while (is_running) {
    const char *kafka_key = device_ids[random() % G_N_ELEMENTS(device_ids)];

    const char *status = status_list[random() % G_N_ELEMENTS(status_list)];
    const char *location = locations[random() % G_N_ELEMENTS(locations)];
    const char *type = types[random() % G_N_ELEMENTS(types)];
    double temperature = ((double)random() / RAND_MAX) * 100.0 - 50.0;  // [-50.0, 50.0]
    double humidity = (random() % 101) / 100.0;                         // [0.0, 1.0]
    int battery = random() % 101;                                       // [0, 100]
    int signal_strength = random() % 101;                               // [0, 100]
    const char *mode = (random() % 2) ? "manual" : "auto";
    bool active = (random() % 2) ? true : false;

    // Create JSON object
    cJSON *json_obj = cJSON_CreateObject();
    if (!json_obj) {
      g_error("Failed to create JSON object");
    }
    cJSON_AddStringToObject(json_obj, "status", status);
    cJSON_AddStringToObject(json_obj, "location", location);
    cJSON_AddStringToObject(json_obj, "type", type);
    cJSON_AddNumberToObject(json_obj, "temperature", temperature);
    cJSON_AddNumberToObject(json_obj, "humidity", humidity);
    cJSON_AddNumberToObject(json_obj, "battery", battery);
    cJSON_AddNumberToObject(json_obj, "signal_strength", signal_strength);
    cJSON_AddStringToObject(json_obj, "mode", mode);
    cJSON_AddBoolToObject(json_obj, "active", active);

    // Convert JSON object to string
    const char *json_str = cJSON_PrintUnformatted(json_obj);
    if (!json_str) {
      g_error("Failed to print JSON object\n");
    }

    size_t key_len = strlen(kafka_key);
    rd_kafka_resp_err_t rd_kafka_resp_err;
    rd_kafka_resp_err =
      rd_kafka_producev(producer, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
                        RD_KAFKA_V_KEY((void *)kafka_key, key_len),
                        RD_KAFKA_V_VALUE((void *)json_str, strlen(json_str)),
                        RD_KAFKA_V_OPAQUE(NULL), RD_KAFKA_V_END);

    if (rd_kafka_resp_err) {
      g_error("Failed to produce to topic %s: %s", topic, rd_kafka_err2str(rd_kafka_resp_err));
    }

    cJSON_Delete(json_obj);
    free((void *)json_str);

    rd_kafka_poll(producer, 0);
    g_usleep(50);  // μs
  }

  g_message("Flushing final messages ...");
  rd_kafka_flush(producer, 10 * 1000);

  if (rd_kafka_outq_len(producer) > 0) {
    g_error("%d message(s) were not delivered", rd_kafka_outq_len(producer));
  }

  g_message("Producer stopped.");
  rd_kafka_destroy(producer);
  exit(EXIT_SUCCESS);
}