#include "dialogkafka.h"
#include "ui_dialogkafka.h"

#include <QDateTime>
#include <QStandardItem>
#include <QTimerEvent>

#include <cinttypes>
#include <cstdarg>
#include <cstdio>
#include <cstdlib>
#include <string>
#include <thread>
#include <vector>

// The single live dialog, or nullptr when none exists. uiprintf() uses it to
// route Kafka client log output into the dialog's message list.
DialogKafka * instance = nullptr;

void uiprintf (const char * format,...);

/**
 * Builds the dialog, registers it as the log target and installs uiprintf()
 * as the Kafka client's print callback.
 *
 * sig_msg is connected to slot_msg with a queued connection so that messages
 * emitted from the consumer thread are appended to the model on the thread
 * that owns this dialog.
 */
DialogKafka::DialogKafka(QWidget *parent)
    : QDialog(parent)
    , ui(new Ui::DialogKafka)
    , m_pMsgMod(new QStandardItemModel(this))
{
    ui->setupUi(this);
    instance = this;
    KafkaClient::cbprintf = uiprintf;
    ui->listView_msg->setModel(m_pMsgMod);
    connect(this, &DialogKafka::sig_msg,
            this, &DialogKafka::slot_msg,
            Qt::QueuedConnection);
}

/**
 * Tears the dialog down.
 *
 * A running session is stopped first: the consumer thread dereferences this
 * object (through `consumer` and, via uiprintf(), through `instance`), so it
 * must be joined before any member is destroyed. The stop branch of the
 * start/stop slot is reused for that; it also releases the consumer,
 * producer and thread objects, which would otherwise leak.
 */
DialogKafka::~DialogKafka()
{
    if (consumer)
        on_pushButton_start_clicked();

    // Restore the default print callback before the UI goes away so that no
    // later log call is routed to a dead dialog.
    KafkaClient::cbprintf = csprintf;
    instance = nullptr;
    delete ui;
}

/**
 * Start/Stop toggle.
 *
 * While a consumer exists the click stops the session: the producer timer is
 * killed, the consumer is asked to stop, its thread is joined and all session
 * objects are deleted. Otherwise a producer and a consumer are created from
 * the broker/topic/group line edits, the consumer loop is started on its own
 * thread and a 500 ms timer is started to drive the producer.
 */
void DialogKafka::on_pushButton_start_clicked()
{
    if (consumer)
    {
        killTimer(m_nTimerID);
        m_nTimerID = -1;

        // Ask the consumer loop to exit, then wait for the thread before
        // deleting anything it may still be using.
        consumer->stop();
        m_runthread->join();

        delete consumer;
        consumer = nullptr;
        delete producer;
        producer = nullptr;
        delete m_runthread;
        m_runthread = nullptr;

        ui->pushButton_start->setText("Start");
        return;
    }

    const std::string brokers = ui->lineEdit_brokers->text().toStdString();
    const std::string topic   = ui->lineEdit_topic->text().toStdString();
    const std::string group   = ui->lineEdit_group->text().toStdString();

    producer = new kafka_producer(brokers, topic);

    std::vector<std::string> topics;
    topics.push_back(topic);
    consumer = new kafka_consumer(brokers, topics, group);

    // Only `this` is needed inside the thread; capturing it explicitly makes
    // it clear that no local of this function escapes into the thread.
    m_runthread = new std::thread([this]()->void{
        consumer->run([&](rd_kafka_message_t * rkm)->void{
            if (!rkm)
                return;
            /* Proper message. */
            cbprintf("Message on %s [%" PRId32 "] at offset %" PRId64 ":\n",
                     rd_kafka_topic_name(rkm->rkt),
                     rkm->partition,
                     rkm->offset);
            /* Print the message key. */
            if (rkm->key)
                cbprintf(" Key: %.*s\n", (int)rkm->key_len, (const char *)rkm->key);
            /* Print the message value/payload. */
            if (rkm->payload)
                cbprintf(" Value: (%d bytes)\n", (int)rkm->len);
        });
    });

    m_nTimerID = startTimer(500);
    ui->pushButton_start->setText("Stop");
}

/**
 * Forwards a log line to slot_msg by emitting sig_msg. The signal is
 * connected with Qt::QueuedConnection in the constructor.
 */
void DialogKafka::shootMsg(QString msg)
{
    emit sig_msg(msg);
}

/**
 * Appends a timestamped line to the message model and trims the model so it
 * keeps at most the newest 128 rows.
 */
void DialogKafka::slot_msg(QString m)
{
    const int maxRows = 128;

    const QString stamp =
        QDateTime::currentDateTime().toString("yyyy-MM-dd HH:mm:ss") + ">";
    m_pMsgMod->appendRow(new QStandardItem(stamp + m));

    const int rows = m_pMsgMod->rowCount();
    if (rows > maxRows)
        m_pMsgMod->removeRows(0, rows - maxRows);
}

/**
 * Producer tick: on each firing of the session timer, writes a 1024-byte
 * payload of rand() data through the producer. When the key line edit is
 * non-empty the keyed write overload is used.
 */
void DialogKafka::timerEvent(QTimerEvent * evt)
{
    if (evt->timerId() != m_nTimerID || !producer)
        return;

    char test[1024];
    for (int i = 0; i < 1024; ++i)
        test[i] = rand();

    const std::string k = ui->lineEdit_key->text().toStdString();
    if (!k.empty())
        producer->write(test, 1024, 10, k.c_str(), k.size());
    else
        producer->write(test, 1024);
}

/**
 * printf-style sink installed as the Kafka client's print callback.
 *
 * While a dialog instance exists the text is formatted into a 1024-byte
 * buffer (longer output is truncated), trimmed and handed to
 * DialogKafka::shootMsg(). Without a dialog it is written to stderr.
 */
void uiprintf (const char * format,...)
{
    va_list args;
    va_start(args, format);
    if (instance)
    {
        char buf[1024];
        vsnprintf(buf, sizeof(buf), format, args);
        buf[sizeof(buf) - 1] = 0;
        instance->shootMsg(QString(buf).trimmed());
    }
    else
        vfprintf(stderr, format, args);
    va_end(args);
}