#include "kafka_client.h" #include namespace KafkaClient { /** * @brief Message delivery report callback. * * This callback is called exactly once per message, indicating if * the message was succesfully delivered * (rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR) or permanently * failed delivery (rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR). * * The callback is triggered from rd_kafka_poll() and executes on * the application's thread. */ void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) { if (rkmessage->err) cbprintf( "%% Message delivery failed: %s\n", rd_kafka_err2str(rkmessage->err)); else cbprintf( "%% Message delivered (%zd bytes, " "partition %" PRId32 ")\n", rkmessage->len, rkmessage->partition); /* The rkmessage is destroyed automatically by librdkafka */ } kafka_producer::kafka_producer(const std::string & sbrokers,const std::string & stopic) :m_topic(stopic) ,m_brokers(sbrokers) { init(); } kafka_producer::~kafka_producer() { exit(); } bool kafka_producer::init() { if (rk) return true; rd_kafka_conf_t *conf; /* Temporary configuration object */ char errstr[512]; /* librdkafka API error reporting buffer */ const char *brokers = m_brokers.c_str(); /* Argument: broker list */ /* * Create Kafka client configuration place-holder */ conf = rd_kafka_conf_new(); /* Set bootstrap broker(s) as a comma-separated list of * host or host:port (default port 9092). * librdkafka will use the bootstrap brokers to acquire the full * set of brokers from the cluster. */ if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { cbprintf( "%s\n", errstr); rd_kafka_conf_destroy(conf); return false; } /* Set the delivery report callback. * This callback will be called once per message to inform * the application if delivery succeeded or failed. * See dr_msg_cb() above. * The callback is only triggered from rd_kafka_poll() and * rd_kafka_flush(). */ rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb); /* * Create producer instance. * * NOTE: rd_kafka_new() takes ownership of the conf object * and the application must not reference it again after * this call. */ rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); if (!rk) { cbprintf( "%% Failed to create new producer: %s\n", errstr); return false; } conf = NULL; /* Configuration object is now owned, and freed, * by the rd_kafka_t instance. */ return true; } bool kafka_producer::exit() { if (!rk) return true; /* Wait for final messages to be delivered or fail. * rd_kafka_flush() is an abstraction over rd_kafka_poll() which * waits for all messages to be delivered. */ cbprintf( "%% Flushing final messages..\n"); rd_kafka_flush(rk, 10 * 1000 /* wait for max 10 seconds */); /* If the output queue is still not empty there is an issue * with producing messages to the clusters. */ if (rd_kafka_outq_len(rk) > 0) cbprintf( "%% %d message(s) were not delivered\n", rd_kafka_outq_len(rk)); /* Destroy the producer instance */ rd_kafka_destroy(rk); rk = nullptr; if (!rk) return true; return true; } bool kafka_producer::write( const char * data, const int len, const int maxTry /*=10*/, const char * key /*= nullptr*/, const int keylen /*= 0*/) { const char *topic = m_topic.c_str(); /* Argument: topic to produce to */ rd_kafka_resp_err_t err; if (len == 0) { /* Empty line: only serve delivery reports */ rd_kafka_poll(rk, 0 /*non-blocking */); return true; } /* * Send/Produce message. * This is an asynchronous call, on success it will only * enqueue the message on the internal producer queue. * The actual delivery attempts to the broker are handled * by background threads. * The previously registered delivery report callback * (dr_msg_cb) is used to signal back to the application * when the message has been delivered (or failed). */ int totalTry = 0; bool succeed = false; while(!succeed && totalTry++ < maxTry) { err = rd_kafka_producev( /* Producer handle */ rk, /* Topic name */ RD_KAFKA_V_TOPIC(topic), /* Make a copy of the payload. */ RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), /* Message value and length */ RD_KAFKA_V_VALUE((void *)data, len), /* Per-Message opaque, provided in * delivery report callback as * msg_opaque. */ RD_KAFKA_V_KEY((void *)key,keylen), RD_KAFKA_V_OPAQUE(NULL), /* End sentinel */ RD_KAFKA_V_END); if (err) { /* * Failed to *enqueue* message for producing. */ cbprintf( "%% Failed to produce to topic %s: %s\n", topic, rd_kafka_err2str(err)); rd_kafka_poll(rk, 1000 /*block for max 1000ms*/); } else { cbprintf( "%% Enqueued message (%d bytes) " "for topic %s\n", (int)len, topic); succeed = true; rd_kafka_poll(rk, 0 /*non-blocking */); } } return succeed; } kafka_consumer::kafka_consumer( const std::string & brokers, const std::vector topics, const std::string & group) :m_stop(false) ,m_topics(topics) ,m_brokers(brokers) ,m_group(group) { init(); } kafka_consumer::~kafka_consumer() { exit(); } bool kafka_consumer::init() { char errstr[512]; /* librdkafka API error reporting buffer */ rd_kafka_resp_err_t err; /* librdkafka API error code */ /* * Create Kafka client configuration place-holder */ rd_kafka_conf_t *conf = rd_kafka_conf_new(); /* Set bootstrap broker(s) as a comma-separated list of * host or host:port (default port 9092). * librdkafka will use the bootstrap brokers to acquire the full * set of brokers from the cluster. */ if (rd_kafka_conf_set(conf, "bootstrap.servers", m_brokers.c_str() , errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { cbprintf( "%s\n", errstr); rd_kafka_conf_destroy(conf); return false; } /* Set the consumer group id. * All consumers sharing the same group id will join the same * group, and the subscribed topic' partitions will be assigned * according to the partition.assignment.strategy * (consumer config property) to the consumers in the group. */ if (rd_kafka_conf_set(conf, "group.id", m_group.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { cbprintf( "%s\n", errstr); rd_kafka_conf_destroy(conf); return false; } /* If there is no previously committed offset for a partition * the auto.offset.reset strategy will be used to decide where * in the partition to start fetching messages. * By setting this to earliest the consumer will read all messages * in the partition if there was no previously committed offset. */ if (rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { cbprintf( "%s\n", errstr); rd_kafka_conf_destroy(conf); return false; } /* * Create consumer instance. * * NOTE: rd_kafka_new() takes ownership of the conf object * and the application must not reference it again after * this call. */ rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)); if (!rk) { cbprintf( "%% Failed to create new consumer: %s\n", errstr); return false; } conf = NULL; /* Configuration object is now owned, and freed, * by the rd_kafka_t instance. */ /* Redirect all messages from per-partition queues to * the main queue so that messages can be consumed with one * call from all assigned partitions. * * The alternative is to poll the main queue (for events) * and each partition queue separately, which requires setting * up a rebalance callback and keeping track of the assignment: * but that is more complex and typically not recommended. */ rd_kafka_poll_set_consumer(rk); int topic_cnt = m_topics.size(); rd_kafka_topic_partition_list_t *subscription = rd_kafka_topic_partition_list_new(topic_cnt);; /* Subscribed topics */ /* Convert the list of topics to a format suitable for librdkafka */ for (int i = 0; i < topic_cnt; i++) rd_kafka_topic_partition_list_add(subscription, m_topics[i].c_str(), /* the partition is ignored * by subscribe() */ RD_KAFKA_PARTITION_UA); /* Subscribe to the list of topics */ err = rd_kafka_subscribe(rk, subscription); if (err) { cbprintf( "%% Failed to subscribe to %d topics: %s\n", subscription->cnt, rd_kafka_err2str(err)); rd_kafka_topic_partition_list_destroy(subscription); rd_kafka_destroy(rk); rk = 0; return false; } cbprintf( "%% Subscribed to %d topic(s), " "waiting for rebalance and messages...\n", subscription->cnt); rd_kafka_topic_partition_list_destroy(subscription); return true; } bool kafka_consumer::exit() { if (!rk) return true; /* Close the consumer: commit final offsets and leave the group. */ cbprintf( "%% Closing consumer\n"); rd_kafka_consumer_close(rk); /* Destroy the consumer */ rd_kafka_destroy(rk); rk = 0; return true; } bool kafka_consumer::run(std::function cb ) { m_stop = true; if (!rk) return false; m_stop = false; while (!m_stop) { rd_kafka_message_t *rkm; rkm = rd_kafka_consumer_poll(rk, 100); if (!rkm) continue; /* Timeout: no message within 100ms, * try again. This short timeout allows * checking for `run` at frequent intervals. */ /* consumer_poll() will return either a proper message * or a consumer error (rkm->err is set). */ if (rkm->err) { /* Consumer errors are generally to be considered * informational as the consumer will automatically * try to recover from all types of errors. */ cbprintf( "%% Consumer error: %s\n", rd_kafka_message_errstr(rkm)); rd_kafka_message_destroy(rkm); continue; } if (cb) cb(rkm); rd_kafka_message_destroy(rkm); } cbprintf( "%% Consumer stopped\n"); return true; } bool kafka_consumer::stop() { m_stop = true; return true; } void csprintf (const char * format,...) { va_list args; va_start(args, format); vfprintf(stderr,format, args); va_end(args); } void (*cbprintf) (const char *,...) = csprintf; }