diff --git a/kafka/rdkafka_qt/.gitignore b/kafka/rdkafka_qt/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..fab7372d796ea95c80d02df6caa7eb2b411a7ac1 --- /dev/null +++ b/kafka/rdkafka_qt/.gitignore @@ -0,0 +1,73 @@ +# This file is used to ignore files which are generated +# ---------------------------------------------------------------------------- + +*~ +*.autosave +*.a +*.core +*.moc +*.o +*.obj +*.orig +*.rej +*.so +*.so.* +*_pch.h.cpp +*_resource.rc +*.qm +.#* +*.*# +core +!core/ +tags +.DS_Store +.directory +*.debug +Makefile* +*.prl +*.app +moc_*.cpp +ui_*.h +qrc_*.cpp +Thumbs.db +*.res +*.rc +/.qmake.cache +/.qmake.stash + +# qtcreator generated files +*.pro.user* + +# xemacs temporary files +*.flc + +# Vim temporary files +.*.swp + +# Visual Studio generated files +*.ib_pdb_index +*.idb +*.ilk +*.pdb +*.sln +*.suo +*.vcproj +*vcproj.*.*.user +*.ncb +*.sdf +*.opensdf +*.vcxproj +*vcxproj.* + +# MinGW generated files +*.Debug +*.Release + +# Python byte code +*.pyc + +# Binaries +# -------- +*.dll +*.exe + diff --git a/kafka/rdkafka_qt/dialogkafka.cpp b/kafka/rdkafka_qt/dialogkafka.cpp new file mode 100644 index 0000000000000000000000000000000000000000..e8046850892532158b882178ed732e70e516cd38 --- /dev/null +++ b/kafka/rdkafka_qt/dialogkafka.cpp @@ -0,0 +1,134 @@ +#include "dialogkafka.h" +#include "ui_dialogkafka.h" +#include +DialogKafka * instance = nullptr; + +void uiprintf (const char * format,...); + +DialogKafka::DialogKafka(QWidget *parent) + : QDialog(parent) + , ui(new Ui::DialogKafka) + , m_pMsgMod(new QStandardItemModel(this)) +{ + ui->setupUi(this); + instance = this; + KafkaClient::cbprintf = uiprintf; + ui->listView_msg->setModel(m_pMsgMod); + connect(this,&DialogKafka::sig_msg,this,&DialogKafka::slot_msg,Qt::QueuedConnection); +} + +DialogKafka::~DialogKafka() +{ + delete ui; + KafkaClient::cbprintf = csprintf; + instance = nullptr; +} + + +void DialogKafka::on_pushButton_start_clicked() +{ + if (consumer) + { + killTimer(m_nTimerID); + m_nTimerID = -1; + + consumer->stop(); + m_runthread->join(); + + delete consumer; + consumer = nullptr; + + delete producer; + producer = nullptr; + + delete m_runthread; + m_runthread = nullptr; + + ui->pushButton_start->setText("Start"); + + } + else + { + producer = new kafka_producer( + ui->lineEdit_brokers->text().toStdString() + ,ui->lineEdit_topic->text().toStdString() + ); + + std::vector topics; + topics.push_back(ui->lineEdit_topic->text().toStdString()); + consumer = new kafka_consumer( + ui->lineEdit_brokers->text().toStdString(), + topics, + ui->lineEdit_group->text().toStdString()); + + m_runthread = new std::thread([&]()->void{ + consumer->run([&](rd_kafka_message_t * rkm)->void{ + if (rkm) + { + /* Proper message. */ + cbprintf("Message on %s [%" PRId32 "] at offset %" PRId64 ":\n", + rd_kafka_topic_name(rkm->rkt), rkm->partition, + rkm->offset); + + /* Print the message key. */ + if (rkm->key ) + cbprintf(" Key: %.*s\n", (int)rkm->key_len, + (const char *)rkm->key); + /* Print the message value/payload. */ + if (rkm->payload) + cbprintf(" Value: (%d bytes)\n", (int)rkm->len); + } + + + }); + }); + + m_nTimerID = startTimer(500); + ui->pushButton_start->setText("Stop"); + } +} + +void DialogKafka::shootMsg(QString msg) +{ + emit sig_msg(msg); +} + +void DialogKafka::slot_msg(QString m) +{ + QString prefix = QDateTime::currentDateTime().toString("yyyy-MM-dd HH:mm:ss") + ">"; + QString line = prefix + m; + m_pMsgMod->appendRow(new QStandardItem(line)); + if (m_pMsgMod->rowCount()>128) + m_pMsgMod->removeRows(0,m_pMsgMod->rowCount()-128); +} + +void DialogKafka::timerEvent(QTimerEvent * evt) +{ + if (evt->timerId()==m_nTimerID && producer) + { + char test[1024]; + for (int i=0;i<1024;++i) + test[i] = rand(); + std::string k = ui->lineEdit_key->text().toStdString(); + if (k.length()) + producer->write(test,1024,10,k.c_str(),k.size()); + else + producer->write(test,1024); + } +} + +void uiprintf (const char * format,...) +{ + va_list args; + va_start(args, format); + if (instance) + { + char buf[1024]; + vsnprintf(buf,1024,format,args); + buf[1023] = 0; + instance->shootMsg(QString(buf).trimmed()); + } + else + vfprintf(stderr,format, args); + va_end(args); +} diff --git a/kafka/rdkafka_qt/dialogkafka.h b/kafka/rdkafka_qt/dialogkafka.h new file mode 100644 index 0000000000000000000000000000000000000000..153c9f523123fd7f13cf5881c9c3f2a7581be49a --- /dev/null +++ b/kafka/rdkafka_qt/dialogkafka.h @@ -0,0 +1,39 @@ +#ifndef DIALOGKAFKA_H +#define DIALOGKAFKA_H + +#include +#include +#include +#include "kafka_client.h" +QT_BEGIN_NAMESPACE +namespace Ui { class DialogKafka; } +QT_END_NAMESPACE + +using namespace KafkaClient; +class DialogKafka : public QDialog +{ + Q_OBJECT + +public: + DialogKafka(QWidget *parent = nullptr); + ~DialogKafka(); + //发送记录。由于可能来自多个线程,用信号周转 + void shootMsg(QString); +protected: + void timerEvent(QTimerEvent * evt) override; +private slots: + void on_pushButton_start_clicked(); + //消息槽 + void slot_msg(QString); +private: + Ui::DialogKafka *ui; + int m_nTimerID =-1; + QStandardItemModel * m_pMsgMod = nullptr; +protected: + kafka_producer * producer = nullptr; + kafka_consumer * consumer = nullptr; + std::thread * m_runthread = nullptr; +signals: + void sig_msg(QString); +}; +#endif // DIALOGKAFKA_H diff --git a/kafka/rdkafka_qt/dialogkafka.ui b/kafka/rdkafka_qt/dialogkafka.ui new file mode 100644 index 0000000000000000000000000000000000000000..0dd557f321e6fd54051a577f2ef87bea321d1af6 --- /dev/null +++ b/kafka/rdkafka_qt/dialogkafka.ui @@ -0,0 +1,96 @@ + + + DialogKafka + + + + 0 + 0 + 640 + 384 + + + + DialogKafka + + + + + + Settings + + + + + + brokers + + + + + + + 127.0.0.1:9092 + + + + + + + group + + + + + + + consumer + + + + + + + topic + + + + + + + test + + + + + + + key + + + + + + + + + + + + + + Start + + + + + + + + + + + + + + diff --git a/kafka/rdkafka_qt/kafka_client.cpp b/kafka/rdkafka_qt/kafka_client.cpp new file mode 100644 index 0000000000000000000000000000000000000000..c3d3fa9cd95035b291d5a6464a0fd973b4e02fb8 --- /dev/null +++ b/kafka/rdkafka_qt/kafka_client.cpp @@ -0,0 +1,370 @@ +#include "kafka_client.h" +#include +namespace KafkaClient { + /** + * @brief Message delivery report callback. + * + * This callback is called exactly once per message, indicating if + * the message was succesfully delivered + * (rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR) or permanently + * failed delivery (rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR). + * + * The callback is triggered from rd_kafka_poll() and executes on + * the application's thread. + */ + void + dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) { + if (rkmessage->err) + cbprintf( "%% Message delivery failed: %s\n", + rd_kafka_err2str(rkmessage->err)); + else + cbprintf( + "%% Message delivered (%zd bytes, " + "partition %" PRId32 ")\n", + rkmessage->len, rkmessage->partition); + + /* The rkmessage is destroyed automatically by librdkafka */ + } + + + + kafka_producer::kafka_producer(const std::string & sbrokers,const std::string & stopic) + :m_topic(stopic) + ,m_brokers(sbrokers) + { + init(); + } + kafka_producer::~kafka_producer() + { + exit(); + } + + bool kafka_producer::init() + { + if (rk) + return true; + rd_kafka_conf_t *conf; /* Temporary configuration object */ + char errstr[512]; /* librdkafka API error reporting buffer */ + const char *brokers = m_brokers.c_str(); /* Argument: broker list */ + /* + * Create Kafka client configuration place-holder + */ + conf = rd_kafka_conf_new(); + /* Set bootstrap broker(s) as a comma-separated list of + * host or host:port (default port 9092). + * librdkafka will use the bootstrap brokers to acquire the full + * set of brokers from the cluster. */ + if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr, + sizeof(errstr)) != RD_KAFKA_CONF_OK) { + cbprintf( "%s\n", errstr); + rd_kafka_conf_destroy(conf); + return false; + } + + /* Set the delivery report callback. + * This callback will be called once per message to inform + * the application if delivery succeeded or failed. + * See dr_msg_cb() above. + * The callback is only triggered from rd_kafka_poll() and + * rd_kafka_flush(). */ + rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb); + + /* + * Create producer instance. + * + * NOTE: rd_kafka_new() takes ownership of the conf object + * and the application must not reference it again after + * this call. + */ + rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); + if (!rk) { + cbprintf( "%% Failed to create new producer: %s\n", + errstr); + return false; + } + conf = NULL; /* Configuration object is now owned, and freed, + * by the rd_kafka_t instance. */ + + return true; + + } + bool kafka_producer::exit() + { + if (!rk) + return true; + /* Wait for final messages to be delivered or fail. + * rd_kafka_flush() is an abstraction over rd_kafka_poll() which + * waits for all messages to be delivered. */ + cbprintf( "%% Flushing final messages..\n"); + rd_kafka_flush(rk, 10 * 1000 /* wait for max 10 seconds */); + + /* If the output queue is still not empty there is an issue + * with producing messages to the clusters. */ + if (rd_kafka_outq_len(rk) > 0) + cbprintf( "%% %d message(s) were not delivered\n", + rd_kafka_outq_len(rk)); + + /* Destroy the producer instance */ + rd_kafka_destroy(rk); + + rk = nullptr; if (!rk) + return true; + + return true; + } + + bool kafka_producer::write( + const char * data, + const int len, + const int maxTry /*=10*/, + const char * key /*= nullptr*/, + const int keylen /*= 0*/) + { + const char *topic = m_topic.c_str(); /* Argument: topic to produce to */ + rd_kafka_resp_err_t err; + + if (len == 0) { + /* Empty line: only serve delivery reports */ + rd_kafka_poll(rk, 0 /*non-blocking */); + return true; + } + /* + * Send/Produce message. + * This is an asynchronous call, on success it will only + * enqueue the message on the internal producer queue. + * The actual delivery attempts to the broker are handled + * by background threads. + * The previously registered delivery report callback + * (dr_msg_cb) is used to signal back to the application + * when the message has been delivered (or failed). + */ + int totalTry = 0; + bool succeed = false; + while(!succeed && totalTry++ < maxTry) + { + err = rd_kafka_producev( + /* Producer handle */ + rk, + /* Topic name */ + RD_KAFKA_V_TOPIC(topic), + /* Make a copy of the payload. */ + RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), + /* Message value and length */ + RD_KAFKA_V_VALUE((void *)data, len), + /* Per-Message opaque, provided in + * delivery report callback as + * msg_opaque. */ + RD_KAFKA_V_KEY((void *)key,keylen), + RD_KAFKA_V_OPAQUE(NULL), + /* End sentinel */ + RD_KAFKA_V_END); + + if (err) + { + /* + * Failed to *enqueue* message for producing. + */ + cbprintf( + "%% Failed to produce to topic %s: %s\n", topic, + rd_kafka_err2str(err)); + rd_kafka_poll(rk, + 1000 /*block for max 1000ms*/); + } + else + { + cbprintf( + "%% Enqueued message (%d bytes) " + "for topic %s\n", + (int)len, topic); + succeed = true; + rd_kafka_poll(rk, 0 /*non-blocking */); + } + } + + return succeed; + } + + + kafka_consumer::kafka_consumer( + const std::string & brokers, + const std::vector topics, + const std::string & group) + :m_stop(false) + ,m_topics(topics) + ,m_brokers(brokers) + ,m_group(group) + + { + init(); + } + kafka_consumer::~kafka_consumer() + { + exit(); + } + bool kafka_consumer::init() + { + char errstr[512]; /* librdkafka API error reporting buffer */ + rd_kafka_resp_err_t err; /* librdkafka API error code */ + + /* + * Create Kafka client configuration place-holder + */ + rd_kafka_conf_t *conf = rd_kafka_conf_new(); + /* Set bootstrap broker(s) as a comma-separated list of + * host or host:port (default port 9092). + * librdkafka will use the bootstrap brokers to acquire the full + * set of brokers from the cluster. */ + if (rd_kafka_conf_set(conf, "bootstrap.servers", m_brokers.c_str() + , errstr, + sizeof(errstr)) != RD_KAFKA_CONF_OK) { + cbprintf( "%s\n", errstr); + rd_kafka_conf_destroy(conf); + return false; + } + /* Set the consumer group id. + * All consumers sharing the same group id will join the same + * group, and the subscribed topic' partitions will be assigned + * according to the partition.assignment.strategy + * (consumer config property) to the consumers in the group. */ + if (rd_kafka_conf_set(conf, "group.id", m_group.c_str(), errstr, + sizeof(errstr)) != RD_KAFKA_CONF_OK) { + cbprintf( "%s\n", errstr); + rd_kafka_conf_destroy(conf); + return false; + } + /* If there is no previously committed offset for a partition + * the auto.offset.reset strategy will be used to decide where + * in the partition to start fetching messages. + * By setting this to earliest the consumer will read all messages + * in the partition if there was no previously committed offset. */ + if (rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", errstr, + sizeof(errstr)) != RD_KAFKA_CONF_OK) { + cbprintf( "%s\n", errstr); + rd_kafka_conf_destroy(conf); + return false; + } + /* + * Create consumer instance. + * + * NOTE: rd_kafka_new() takes ownership of the conf object + * and the application must not reference it again after + * this call. + */ + rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)); + if (!rk) { + cbprintf( "%% Failed to create new consumer: %s\n", + errstr); + return false; + } + conf = NULL; /* Configuration object is now owned, and freed, + * by the rd_kafka_t instance. */ + /* Redirect all messages from per-partition queues to + * the main queue so that messages can be consumed with one + * call from all assigned partitions. + * + * The alternative is to poll the main queue (for events) + * and each partition queue separately, which requires setting + * up a rebalance callback and keeping track of the assignment: + * but that is more complex and typically not recommended. */ + rd_kafka_poll_set_consumer(rk); + + int topic_cnt = m_topics.size(); + + rd_kafka_topic_partition_list_t *subscription + = rd_kafka_topic_partition_list_new(topic_cnt);; /* Subscribed topics */ + /* Convert the list of topics to a format suitable for librdkafka */ + for (int i = 0; i < topic_cnt; i++) + rd_kafka_topic_partition_list_add(subscription, m_topics[i].c_str(), + /* the partition is ignored + * by subscribe() */ + RD_KAFKA_PARTITION_UA); + + /* Subscribe to the list of topics */ + err = rd_kafka_subscribe(rk, subscription); + if (err) { + cbprintf( "%% Failed to subscribe to %d topics: %s\n", + subscription->cnt, rd_kafka_err2str(err)); + rd_kafka_topic_partition_list_destroy(subscription); + rd_kafka_destroy(rk); + rk = 0; + return false; + } + + cbprintf( + "%% Subscribed to %d topic(s), " + "waiting for rebalance and messages...\n", + subscription->cnt); + + rd_kafka_topic_partition_list_destroy(subscription); + + return true; + } + bool kafka_consumer::exit() + { + if (!rk) + return true; + /* Close the consumer: commit final offsets and leave the group. */ + cbprintf( "%% Closing consumer\n"); + rd_kafka_consumer_close(rk); + + + /* Destroy the consumer */ + rd_kafka_destroy(rk); + + rk = 0; + + return true; + } + bool kafka_consumer::run(std::function cb ) + { + m_stop = true; + if (!rk) + return false; + m_stop = false; + while (!m_stop) { + rd_kafka_message_t *rkm; + + rkm = rd_kafka_consumer_poll(rk, 100); + if (!rkm) + continue; /* Timeout: no message within 100ms, + * try again. This short timeout allows + * checking for `run` at frequent intervals. + */ + + /* consumer_poll() will return either a proper message + * or a consumer error (rkm->err is set). */ + if (rkm->err) { + /* Consumer errors are generally to be considered + * informational as the consumer will automatically + * try to recover from all types of errors. */ + cbprintf( "%% Consumer error: %s\n", + rd_kafka_message_errstr(rkm)); + rd_kafka_message_destroy(rkm); + continue; + } + if (cb) + cb(rkm); + + rd_kafka_message_destroy(rkm); + } + cbprintf( "%% Consumer stopped\n"); + return true; + } + + bool kafka_consumer::stop() + { + m_stop = true; + return true; + } + + + void csprintf (const char * format,...) + { + va_list args; + va_start(args, format); + vfprintf(stderr,format, args); + va_end(args); + } + void (*cbprintf) (const char *,...) = csprintf; + +} diff --git a/kafka/rdkafka_qt/kafka_client.h b/kafka/rdkafka_qt/kafka_client.h new file mode 100644 index 0000000000000000000000000000000000000000..31c82c0e5db29560b7da3acc7988cf1769199623 --- /dev/null +++ b/kafka/rdkafka_qt/kafka_client.h @@ -0,0 +1,61 @@ +#ifndef COMMON_EXAMPLE_KAFKA_H +#define COMMON_EXAMPLE_KAFKA_H +#include +#include +#include +#include +#include + +namespace KafkaClient { + + class kafka_producer + { + public: + explicit kafka_producer( + const std::string & brokers, + const std::string & topic); + virtual ~kafka_producer(); + public: + bool write( + const char * data, + const int len, + const int maxTry = 10, + const char * key = nullptr, + const int keylen = 0); + protected: + bool init(); + bool exit(); + protected: + rd_kafka_t *rk = nullptr; /* Producer instance handle */ + std::string m_topic; + std::string m_brokers; + + }; + + class kafka_consumer{ + public: + explicit kafka_consumer( + const std::string & brokers, + const std::vector topics, + const std::string & group); + virtual ~kafka_consumer(); + protected: + bool init(); + bool exit(); + public: + bool stop(); + bool run(std::function cb ); + protected: + rd_kafka_t *rk = nullptr; /* Producer instance handle */ + std::atomic m_stop; + std::vector m_topics; + std::string m_brokers; + std::string m_group; + }; + + //Default printf Callback + extern void (*cbprintf) (const char *,...); + void csprintf (const char * format,...); +} + +#endif // COMMON_EXAMPLE_KAFKA_H diff --git a/kafka/rdkafka_qt/main.cpp b/kafka/rdkafka_qt/main.cpp new file mode 100644 index 0000000000000000000000000000000000000000..3194a044ac989587756ab6a5d1bd0b73f1571400 --- /dev/null +++ b/kafka/rdkafka_qt/main.cpp @@ -0,0 +1,11 @@ +#include "dialogkafka.h" + +#include + +int main(int argc, char *argv[]) +{ + QApplication a(argc, argv); + DialogKafka w; + w.show(); + return a.exec(); +} diff --git a/kafka/rdkafka_qt/rdkafka_qt.pro b/kafka/rdkafka_qt/rdkafka_qt.pro new file mode 100644 index 0000000000000000000000000000000000000000..728b5cfcab0cb671a1c2b3d462d2b5223f2ac46a --- /dev/null +++ b/kafka/rdkafka_qt/rdkafka_qt.pro @@ -0,0 +1,24 @@ +QT += core gui + +greaterThan(QT_MAJOR_VERSION, 4): QT += widgets + +CONFIG += c++11 + +# You can make your code fail to compile if it uses deprecated APIs. +# In order to do so, uncomment the following line. +#DEFINES += QT_DISABLE_DEPRECATED_BEFORE=0x060000 # disables all the APIs deprecated before Qt 6.0.0 + +SOURCES += \ + kafka_client.cpp \ + main.cpp \ + dialogkafka.cpp + +HEADERS += \ + dialogkafka.h \ + kafka_client.h + +FORMS += \ + dialogkafka.ui + +win32:LIBS+=-lrdkafka.dll +linux:LIBS+=-lrdkafka diff --git a/kafka/rdkafka_test/common_example_kafka.h b/kafka/rdkafka_test/common_example_kafka.h index 661c1b43dbd2b210f9aad05558fa066aefa8c5c1..0755d8887b8f9856ef81aa79dcee84eaf0f3e3eb 100644 --- a/kafka/rdkafka_test/common_example_kafka.h +++ b/kafka/rdkafka_test/common_example_kafka.h @@ -5,8 +5,7 @@ extern std::atomic run; void stop(int /*sig*/); int is_printable(const char *buf, size_t size); -void -dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque); +void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque); int main_consumer(int argc, char **argv) ; int main_producer(int argc, char **argv); #endif // COMMON_EXAMPLE_KAFKA_H diff --git a/kafka/rdkafka_test/main.cpp b/kafka/rdkafka_test/main.cpp index fb1747dfd6b65bac8623d284b679ce1a2a4a8858..244ca35323e00cc8c2c81e58466569390a9c01c6 100644 --- a/kafka/rdkafka_test/main.cpp +++ b/kafka/rdkafka_test/main.cpp @@ -1,5 +1,6 @@ #include #include +#include #include "common_example_kafka.h" using namespace std; diff --git a/kafka/rdkafka_test/rdkafka_test.pro b/kafka/rdkafka_test/rdkafka_test.pro index 0c8c1aef10f9aad2a3bd004397732ca2d22ba351..659c553d5bad897b2bdb3a90c848f9401f4f50e9 100644 --- a/kafka/rdkafka_test/rdkafka_test.pro +++ b/kafka/rdkafka_test/rdkafka_test.pro @@ -9,5 +9,5 @@ SOURCES += \ HEADERS += \ common_example_kafka.h - -LIBS+=-lrdkafka.dll +win32:LIBS+=-lrdkafka.dll +linux:LIBS+=-lrdkafka