提交 e61626bf 编写于 作者: M manjaro-xfce

测试Kafka的seek功能

上级 d1540e54
......@@ -60,6 +60,18 @@ namespace KafkaClient {
rd_kafka_conf_destroy(conf);
return false;
}
if (rd_kafka_conf_set(conf, "compression.type", "zstd", errstr,
sizeof(errstr)) != RD_KAFKA_CONF_OK) {
cbprintf( "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return false;
}
if (rd_kafka_conf_set(conf, "compression.level", "9", errstr,
sizeof(errstr)) != RD_KAFKA_CONF_OK) {
cbprintf( "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return false;
}
/* Set the delivery report callback.
* This callback will be called once per message to inform
......
......@@ -37,99 +37,123 @@
#include <string.h>
#include <ctype.h>
/* Typical include path would be <librdkafka/rdkafka.h>, but this program
* is builtin from within the librdkafka source tree and thus differs. */
#include "common_example_kafka.h"
void myrebalance(rd_kafka_t *rk,
rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *partitions,
void *opaque)
{
//first place can seek a partion, when using rd_kafka_subscribe
for (int i=0;i<partitions->cnt;++i)
{
int64_t low, high;
err = rd_kafka_query_watermark_offsets(rk,partitions->elems[i].topic, partitions->elems[i].partition, &low, &high, 5000);
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
fprintf(stderr, "Failed to query watermark offsets: %s\n", rd_kafka_err2str(err));
} else {
printf("Partition 0 offset range: [%ld, %ld]\n", low, high);
}
partitions->elems[i].offset = (low + high)/2;
}
auto e = rd_kafka_assign(rk,partitions);
puts(rd_kafka_err2str(e));
}
int main_consumer(int argc, char **argv) {
rd_kafka_t *rk; /* Consumer instance handle */
rd_kafka_conf_t *conf; /* Temporary configuration object */
rd_kafka_resp_err_t err; /* librdkafka API error code */
char errstr[512]; /* librdkafka API error reporting buffer */
const char *brokers; /* Argument: broker list */
const char *groupid; /* Argument: Consumer group id */
char **topics; /* Argument: list of topics to subscribe to */
int topic_cnt; /* Number of topics to subscribe to */
rd_kafka_topic_partition_list_t *subscription; /* Subscribed topics */
int i;
/*
rd_kafka_t *rk; /* Consumer instance handle */
rd_kafka_conf_t *conf; /* Temporary configuration object */
rd_kafka_resp_err_t err; /* librdkafka API error code */
char errstr[512]; /* librdkafka API error reporting buffer */
const char *brokers; /* Argument: broker list */
const char *groupid; /* Argument: Consumer group id */
char **topics; /* Argument: list of topics to subscribe to */
int topic_cnt; /* Number of topics to subscribe to */
rd_kafka_topic_partition_list_t *subscription; /* Subscribed topics */
int i;
/*
* Argument validation
*/
if (argc < 4) {
fprintf(stderr,
"%% Usage: "
"%s <broker> <group.id> <topic1> <topic2>..\n",
argv[0]);
return 1;
}
if (argc < 4) {
fprintf(stderr,
"%% Usage: "
"%s <broker> <group.id> <topic1> <topic2>..\n",
argv[0]);
return 1;
}
brokers = argv[1];
groupid = argv[2];
topics = &argv[3];
topic_cnt = argc - 3;
brokers = argv[1];
groupid = argv[2];
topics = &argv[3];
topic_cnt = argc - 3;
/*
/*
// auto e = rd_kafka_assign(rk,subscription);
// puts(rd_kafka_err2str(e));
* Create Kafka client configuration place-holder
*/
conf = rd_kafka_conf_new();
conf = rd_kafka_conf_new();
/* Set bootstrap broker(s) as a comma-separated list of
/* Set bootstrap broker(s) as a comma-separated list of
* host or host:port (default port 9092).
* librdkafka will use the bootstrap brokers to acquire the full
* set of brokers from the cluster. */
if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr,
sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
}
/* Set the consumer group id.
if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr,
sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
}
/* Set the consumer group id.
* All consumers sharing the same group id will join the same
* group, and the subscribed topic' partitions will be assigned
* according to the partition.assignment.strategy
* (consumer config property) to the consumers in the group. */
if (rd_kafka_conf_set(conf, "group.id", groupid, errstr,
sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
}
/* If there is no previously committed offset for a partition
* the auto.offset.reset strategy will be used to decide where
if (rd_kafka_conf_set(conf, "group.id", groupid, errstr,
sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
}
/* If there is no previously committed offset for a partition
* the auto.offset.reset straterd_kafka_conf_set_rebalance_cbgy will be used to decide where
* in the partition to start fetching messages.
* By setting this to earliest the consumer will read all messages
* in the partition if there was no previously committed offset. */
if (rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", errstr,
sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
}
/*
if (rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", errstr,
sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
}
rd_kafka_conf_set_rebalance_cb(conf,myrebalance);
/*
* Create consumer instance.
*
* NOTE: rd_kafka_new() takes ownership of the conf object
* and the application must not reference it again after
* this call.
*/
rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
if (!rk) {
fprintf(stderr, "%% Failed to create new consumer: %s\n",
errstr);
return 1;
}
conf = NULL; /* Configuration object is now owned, and freed,
rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
if (!rk) {
fprintf(stderr, "%% Failed to create new consumer: %s\n",
errstr);
return 1;
}
conf = NULL; /* Configuration object is now owned, and freed,
* by the rd_kafka_t instance. */
/* Redirect all messages from per-partition queues to
/* Redirect all messages from per-partition queues to
* the main queue so that messages can be consumed with one
* call from all assigned partitions.
*
......@@ -137,96 +161,116 @@ int main_consumer(int argc, char **argv) {
* and each partition queue separately, which requires setting
* up a rebalance callback and keeping track of the assignment:
* but that is more complex and typically not recommended. */
rd_kafka_poll_set_consumer(rk);
/* Convert the list of topics to a format suitable for librdkafka */
subscription = rd_kafka_topic_partition_list_new(topic_cnt);
for (i = 0; i < topic_cnt; i++)
rd_kafka_topic_partition_list_add(subscription, topics[i],
/* the partition is ignored
* by subscribe() */
RD_KAFKA_PARTITION_UA);
/* Subscribe to the list of topics */
err = rd_kafka_subscribe(rk, subscription);
if (err) {
fprintf(stderr, "%% Failed to subscribe to %d topics: %s\n",
subscription->cnt, rd_kafka_err2str(err));
rd_kafka_topic_partition_list_destroy(subscription);
rd_kafka_destroy(rk);
return 1;
}
rd_kafka_poll_set_consumer(rk);
/* Convert the list of topics to a format suitable for librdkafka */
subscription = rd_kafka_topic_partition_list_new(topic_cnt);
for (i = 0; i < topic_cnt; i++)
{
rd_kafka_topic_partition_list_add(subscription, topics[i],
/* the partition is ignored by subscribe() */
RD_KAFKA_PARTITION_UA);
// /*second place can seek a partion. query water mark and seek*/
// int64_t low, high;
// err = rd_kafka_query_watermark_offsets(rk,topics[i], 0, &low, &high, 5000);
// if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
// fprintf(stderr, "Failed to query watermark offsets: %s\n", rd_kafka_err2str(err));
// } else {
// printf("Partition 0 offset range: [%ld, %ld]\n", low, high);
// }
// auto rkv = rd_kafka_topic_partition_list_add(subscription, topics[i],
// /* the partition must be specfied when you want to seek */
// 0);
// rkv->offset = (low + high)/2;
}
/** Subscribe to the list of topics
* If you want to initially seek a partion, using rd_kafka_assign
* however, rd_kafka_assign can not recieve rebalanced new topics, eg, mathced by regexp.
*/
err = rd_kafka_subscribe(rk, subscription);
//err = rd_kafka_assign(rk, subscription);
if (err) {
fprintf(stderr, "%% Failed to subscribe/assign to %d topics: %s\n",
subscription->cnt, rd_kafka_err2str(err));
rd_kafka_topic_partition_list_destroy(subscription);
rd_kafka_destroy(rk);
return 1;
}
fprintf(stderr,
"%% Subscribed to %d topic(s), "
"waiting for rebalance and messages...\n",
subscription->cnt);
fprintf(stderr,
"%% Subscribed to %d topic(s), "
"waiting for rebalance and messages...\n",
subscription->cnt);
rd_kafka_topic_partition_list_destroy(subscription);
rd_kafka_topic_partition_list_destroy(subscription);
subscription = 0;
/* Signal handler for clean shutdown */
signal(SIGINT, stop);
/* Signal handler for clean shutdown */
signal(SIGINT, stop);
/* Subscribing to topics will trigger a group rebalance
/* Subscribing to topics will trigger a group rebalance
* which may take some time to finish, but there is no need
* for the application to handle this idle period in a special way
* since a rebalance may happen at any time.
* Start polling for messages. */
while (run) {
rd_kafka_message_t *rkm;
while (run) {
rd_kafka_message_t *rkm;
rkm = rd_kafka_consumer_poll(rk, 100);
if (!rkm)
continue; /* Timeout: no message within 100ms,
rkm = rd_kafka_consumer_poll(rk, 100);
if (!rkm)
continue; /* Timeout: no message within 100ms,
* try again. This short timeout allows
* checking for `run` at frequent intervals.
*/
//third place can seek a partion.
//rd_kafka_seek(rkm->rkt,rkm->partition,0,100);
/* consumer_poll() will return either a proper message
/* consumer_poll() will return either a proper message
* or a consumer error (rkm->err is set). */
if (rkm->err) {
/* Consumer errors are generally to be considered
if (rkm->err) {
/* Consumer errors are generally to be considered
* informational as the consumer will automatically
* try to recover from all types of errors. */
fprintf(stderr, "%% Consumer error: %s\n",
rd_kafka_message_errstr(rkm));
rd_kafka_message_destroy(rkm);
continue;
}
/* Proper message. */
printf("Message on %s [%" PRId32 "] at offset %" PRId64 ":\n",
rd_kafka_topic_name(rkm->rkt), rkm->partition,
rkm->offset);
/* Print the message key. */
if (rkm->key && is_printable((const char *)rkm->key, rkm->key_len))
printf(" Key: %.*s\n", (int)rkm->key_len,
(const char *)rkm->key);
else if (rkm->key)
printf(" Key: (%d bytes)\n", (int)rkm->key_len);
/* Print the message value/payload. */
if (rkm->payload && is_printable((const char *)rkm->payload, rkm->len))
printf(" Value: %.*s\n", (int)rkm->len,
(const char *)rkm->payload);
else if (rkm->payload)
printf(" Value: (%d bytes)\n", (int)rkm->len);
rd_kafka_message_destroy(rkm);
fprintf(stderr, "%% Consumer error: %s\n",
rd_kafka_message_errstr(rkm));
rd_kafka_message_destroy(rkm);
continue;
}
/* Proper message. */
printf("Message on %s [%" PRId32 "] at offset %" PRId64 ":\n",
rd_kafka_topic_name(rkm->rkt), rkm->partition,
rkm->offset);
/* Close the consumer: commit final offsets and leave the group. */
fprintf(stderr, "%% Closing consumer\n");
rd_kafka_consumer_close(rk);
/* Print the message key. */
if (rkm->key && is_printable((const char *)rkm->key, rkm->key_len))
printf(" Key: %.*s\n", (int)rkm->key_len,
(const char *)rkm->key);
else if (rkm->key)
printf(" Key: (%d bytes)\n", (int)rkm->key_len);
/* Print the message value/payload. */
if (rkm->payload && is_printable((const char *)rkm->payload, rkm->len))
printf(" Value: %.*s\n", (int)rkm->len,
(const char *)rkm->payload);
else if (rkm->payload)
printf(" Value: (%d bytes)\n", (int)rkm->len);
/* Destroy the consumer */
rd_kafka_destroy(rk);
rd_kafka_message_destroy(rkm);
}
/* Close the consumer: commit final offsets and leave the group. */
fprintf(stderr, "%% Closing consumer\n");
rd_kafka_consumer_close(rk);
/* Destroy the consumer */
rd_kafka_destroy(rk);
return 0;
return 0;
}
......@@ -24,6 +24,13 @@ int main(int argc, char ** argv)
else
{
printf ("You must raname your exe file to consumer or producer.\n");
printf ("please choose: 0=exit,1=consumer,2=producer:\n");
char c = getchar();
if (c=='1')
main_consumer(argc,argv);
else if (c=='2')
main_producer(argc,argv);
}
return 0;
}
......
......@@ -78,6 +78,19 @@ int main_producer(int argc, char **argv) {
return 1;
}
if (rd_kafka_conf_set(conf, "compression.type", "zstd", errstr,
sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return false;
}
if (rd_kafka_conf_set(conf, "compression.level", "9", errstr,
sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return false;
}
/* Set the delivery report callback.
* This callback will be called once per message to inform
* the application if delivery succeeded or failed.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册