diff --git a/kafka/rdkafka_qt/kafka_client.cpp b/kafka/rdkafka_qt/kafka_client.cpp
index c3d3fa9cd95035b291d5a6464a0fd973b4e02fb8..acc92b747487c3f1fc8e21c13f9043ffa05a94ab 100644
--- a/kafka/rdkafka_qt/kafka_client.cpp
+++ b/kafka/rdkafka_qt/kafka_client.cpp
@@ -60,6 +60,18 @@ namespace KafkaClient {
         rd_kafka_conf_destroy(conf);
         return false;
     }
+    if (rd_kafka_conf_set(conf, "compression.type", "zstd", errstr,
+                          sizeof(errstr)) != RD_KAFKA_CONF_OK) {
+        cbprintf("%s\n", errstr);
+        rd_kafka_conf_destroy(conf);
+        return false;
+    }
+    if (rd_kafka_conf_set(conf, "compression.level", "9", errstr,
+                          sizeof(errstr)) != RD_KAFKA_CONF_OK) {
+        cbprintf("%s\n", errstr);
+        rd_kafka_conf_destroy(conf);
+        return false;
+    }
 
     /* Set the delivery report callback.
      * This callback will be called once per message to inform
diff --git a/kafka/rdkafka_test/consumer.cpp b/kafka/rdkafka_test/consumer.cpp
index 65e1ce1f0236a9e4b96e3ae5c406a8a16993a2df..2b1a9fb43c4784a3416998fb152fb035f6cdaa6e 100644
--- a/kafka/rdkafka_test/consumer.cpp
+++ b/kafka/rdkafka_test/consumer.cpp
@@ -37,99 +37,123 @@
 #include <string.h>
 #include <ctype.h>
-
 /* Typical include path would be <librdkafka/rdkafka.h>, but this program
  * is built from within the librdkafka source tree and thus differs. */
 #include "common_example_kafka.h"
+
+void myrebalance(rd_kafka_t *rk,
+                 rd_kafka_resp_err_t err,
+                 rd_kafka_topic_partition_list_t *partitions,
+                 void *opaque)
+{
+    // First place a partition can be sought when using rd_kafka_subscribe():
+    // adjust the offsets before handing the assignment back to librdkafka.
+    if (err != RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
+        rd_kafka_assign(rk, NULL); /* revoked or lost: drop the assignment */
+        return;
+    }
+    for (int i = 0; i < partitions->cnt; ++i)
+    {
+        int64_t low, high;
+        rd_kafka_resp_err_t werr = rd_kafka_query_watermark_offsets(
+            rk, partitions->elems[i].topic, partitions->elems[i].partition,
+            &low, &high, 5000);
+        if (werr != RD_KAFKA_RESP_ERR_NO_ERROR) {
+            fprintf(stderr, "Failed to query watermark offsets: %s\n",
+                    rd_kafka_err2str(werr));
+            continue; /* leave this partition's offset untouched */
+        }
+        printf("Partition %d offset range: [%" PRId64 ", %" PRId64 "]\n",
+               (int)partitions->elems[i].partition, low, high);
+        partitions->elems[i].offset = (low + high) / 2; /* start mid-partition */
+    }
+    auto e = rd_kafka_assign(rk, partitions);
+    puts(rd_kafka_err2str(e));
+}
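+
+/* A hedged, illustrative sketch (not part of the original patch): instead of
+ * seeking to the watermark midpoint, offsets can be located by timestamp with
+ * rd_kafka_offsets_for_times(). On input each .offset holds a timestamp in
+ * milliseconds; on return it holds the earliest offset at or after that time.
+ * The helper name seek_partitions_by_time is made up for this example. */
+#if 0
+static void seek_partitions_by_time(rd_kafka_t *rk,
+                                    rd_kafka_topic_partition_list_t *partitions,
+                                    int64_t ts_ms)
+{
+    for (int i = 0; i < partitions->cnt; ++i)
+        partitions->elems[i].offset = ts_ms; /* input: timestamp, not offset */
+    rd_kafka_resp_err_t e = rd_kafka_offsets_for_times(rk, partitions, 5000);
+    if (e != RD_KAFKA_RESP_ERR_NO_ERROR)
+        fprintf(stderr, "offsets_for_times failed: %s\n", rd_kafka_err2str(e));
+    /* On success each .offset now holds a real offset, ready for
+     * rd_kafka_assign() or rd_kafka_seek(). */
+}
+#endif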
+
 int main_consumer(int argc, char **argv) {
-        rd_kafka_t *rk;        /* Consumer instance handle */
-        rd_kafka_conf_t *conf; /* Temporary configuration object */
-        rd_kafka_resp_err_t err; /* librdkafka API error code */
-        char errstr[512];      /* librdkafka API error reporting buffer */
-        const char *brokers;   /* Argument: broker list */
-        const char *groupid;   /* Argument: Consumer group id */
-        char **topics;         /* Argument: list of topics to subscribe to */
-        int topic_cnt;         /* Number of topics to subscribe to */
-        rd_kafka_topic_partition_list_t *subscription; /* Subscribed topics */
-        int i;
-
-        /*
+    rd_kafka_t *rk;          /* Consumer instance handle */
+    rd_kafka_conf_t *conf;   /* Temporary configuration object */
+    rd_kafka_resp_err_t err; /* librdkafka API error code */
+    char errstr[512];        /* librdkafka API error reporting buffer */
+    const char *brokers;     /* Argument: broker list */
+    const char *groupid;     /* Argument: Consumer group id */
+    char **topics;           /* Argument: list of topics to subscribe to */
+    int topic_cnt;           /* Number of topics to subscribe to */
+    rd_kafka_topic_partition_list_t *subscription; /* Subscribed topics */
+    int i;
+
+    /*
      * Argument validation
      */
-        if (argc < 4) {
-                fprintf(stderr,
-                        "%% Usage: "
-                        "%s <broker> <group.id> <topic1> <topic2>..\n",
-                        argv[0]);
-                return 1;
-        }
+    if (argc < 4) {
+        fprintf(stderr,
+                "%% Usage: "
+                "%s <broker> <group.id> <topic1> <topic2>..\n",
+                argv[0]);
+        return 1;
+    }
 
-        brokers = argv[1];
-        groupid = argv[2];
-        topics = &argv[3];
-        topic_cnt = argc - 3;
+    brokers   = argv[1];
+    groupid   = argv[2];
+    topics    = &argv[3];
+    topic_cnt = argc - 3;
 
-        /*
+    // auto e = rd_kafka_assign(rk, subscription);
+    // puts(rd_kafka_err2str(e));
+    /*
      * Create Kafka client configuration place-holder
      */
-        conf = rd_kafka_conf_new();
+    conf = rd_kafka_conf_new();
 
-        /* Set bootstrap broker(s) as a comma-separated list of
+    /* Set bootstrap broker(s) as a comma-separated list of
      * host or host:port (default port 9092).
      * librdkafka will use the bootstrap brokers to acquire the full
      * set of brokers from the cluster. */
-        if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr,
-                              sizeof(errstr)) != RD_KAFKA_CONF_OK) {
-                fprintf(stderr, "%s\n", errstr);
-                rd_kafka_conf_destroy(conf);
-                return 1;
-        }
-
-        /* Set the consumer group id.
+    if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr,
+                          sizeof(errstr)) != RD_KAFKA_CONF_OK) {
+        fprintf(stderr, "%s\n", errstr);
+        rd_kafka_conf_destroy(conf);
+        return 1;
+    }
+
+    /* Set the consumer group id.
      * All consumers sharing the same group id will join the same
      * group, and the subscribed topics' partitions will be assigned
      * according to the partition.assignment.strategy
      * (consumer config property) to the consumers in the group. */
-        if (rd_kafka_conf_set(conf, "group.id", groupid, errstr,
-                              sizeof(errstr)) != RD_KAFKA_CONF_OK) {
-                fprintf(stderr, "%s\n", errstr);
-                rd_kafka_conf_destroy(conf);
-                return 1;
-        }
-
-        /* If there is no previously committed offset for a partition
-         * the auto.offset.reset strategy will be used to decide where
+    if (rd_kafka_conf_set(conf, "group.id", groupid, errstr,
+                          sizeof(errstr)) != RD_KAFKA_CONF_OK) {
+        fprintf(stderr, "%s\n", errstr);
+        rd_kafka_conf_destroy(conf);
+        return 1;
+    }
+
+    /* If there is no previously committed offset for a partition
+     * the auto.offset.reset strategy will be used to decide where
      * in the partition to start fetching messages.
      * By setting this to earliest the consumer will read all messages
      * in the partition if there was no previously committed offset. */
-        if (rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", errstr,
-                              sizeof(errstr)) != RD_KAFKA_CONF_OK) {
-                fprintf(stderr, "%s\n", errstr);
-                rd_kafka_conf_destroy(conf);
-                return 1;
-        }
-
-        /*
+    if (rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", errstr,
+                          sizeof(errstr)) != RD_KAFKA_CONF_OK) {
+        fprintf(stderr, "%s\n", errstr);
+        rd_kafka_conf_destroy(conf);
+        return 1;
+    }
+
+    rd_kafka_conf_set_rebalance_cb(conf, myrebalance);
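+
+    /* Hedged aside (not part of the original patch): offsets are auto-committed
+     * by default (enable.auto.commit=true). For at-least-once handling you
+     * could disable that here and commit after each message is processed:
+     *
+     *   rd_kafka_conf_set(conf, "enable.auto.commit", "false",
+     *                     errstr, sizeof(errstr));
+     *   ...
+     *   rd_kafka_commit_message(rk, rkm, 0);  // sync commit, in the poll loop
+     */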
+    /*
      * Create consumer instance.
      *
      * NOTE: rd_kafka_new() takes ownership of the conf object
      *       and the application must not reference it again after
      *       this call.
      */
-        rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
-        if (!rk) {
-                fprintf(stderr, "%% Failed to create new consumer: %s\n",
-                        errstr);
-                return 1;
-        }
-
-        conf = NULL; /* Configuration object is now owned, and freed,
+    rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
+    if (!rk) {
+        fprintf(stderr, "%% Failed to create new consumer: %s\n",
+                errstr);
+        return 1;
+    }
+
+    conf = NULL; /* Configuration object is now owned, and freed,
                   * by the rd_kafka_t instance. */
-
-        /* Redirect all messages from per-partition queues to
+    /* Redirect all messages from per-partition queues to
      * the main queue so that messages can be consumed with one
      * call from all assigned partitions.
      *
@@ -137,96 +161,116 @@ int main_consumer(int argc, char **argv) {
      * and each partition queue separately, which requires setting
      * up a rebalance callback and keeping track of the assignment:
      * but that is more complex and typically not recommended. */
-        rd_kafka_poll_set_consumer(rk);
-
-
-        /* Convert the list of topics to a format suitable for librdkafka */
-        subscription = rd_kafka_topic_partition_list_new(topic_cnt);
-        for (i = 0; i < topic_cnt; i++)
-                rd_kafka_topic_partition_list_add(subscription, topics[i],
-                                                  /* the partition is ignored
-                                                   * by subscribe() */
-                                                  RD_KAFKA_PARTITION_UA);
-
-        /* Subscribe to the list of topics */
-        err = rd_kafka_subscribe(rk, subscription);
-        if (err) {
-                fprintf(stderr, "%% Failed to subscribe to %d topics: %s\n",
-                        subscription->cnt, rd_kafka_err2str(err));
-                rd_kafka_topic_partition_list_destroy(subscription);
-                rd_kafka_destroy(rk);
-                return 1;
-        }
+    rd_kafka_poll_set_consumer(rk);
+
+
+    /* Convert the list of topics to a format suitable for librdkafka */
+    subscription = rd_kafka_topic_partition_list_new(topic_cnt);
+    for (i = 0; i < topic_cnt; i++)
+    {
+        rd_kafka_topic_partition_list_add(subscription, topics[i],
+                                          /* the partition is ignored by subscribe() */
+                                          RD_KAFKA_PARTITION_UA);
+
+        // /* Second place a partition can be sought: query the watermarks
+        //  * and store the target offset in the list entry. */
+        // int64_t low, high;
+        // err = rd_kafka_query_watermark_offsets(rk, topics[i], 0, &low, &high, 5000);
+        // if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
+        //     fprintf(stderr, "Failed to query watermark offsets: %s\n",
+        //             rd_kafka_err2str(err));
+        // } else {
+        //     printf("Partition 0 offset range: [%" PRId64 ", %" PRId64 "]\n", low, high);
+        // }
+        // auto rkv = rd_kafka_topic_partition_list_add(subscription, topics[i],
+        //                                              /* the partition must be specified
+        //                                               * when you want to seek */
+        //                                              0);
+        // rkv->offset = (low + high) / 2;
+    }
+    /* Subscribe to the list of topics.
+     * To start partitions at a chosen offset, use rd_kafka_assign() instead;
+     * note, however, that rd_kafka_assign() cannot receive newly rebalanced
+     * topics, e.g. topics matched by a regexp subscription. */
+    err = rd_kafka_subscribe(rk, subscription);
+    //err = rd_kafka_assign(rk, subscription);
+    if (err) {
+        fprintf(stderr, "%% Failed to subscribe/assign to %d topics: %s\n",
+                subscription->cnt, rd_kafka_err2str(err));
+        rd_kafka_topic_partition_list_destroy(subscription);
+        rd_kafka_destroy(rk);
+        return 1;
+    }
 
-        fprintf(stderr,
-                "%% Subscribed to %d topic(s), "
-                "waiting for rebalance and messages...\n",
-                subscription->cnt);
+    fprintf(stderr,
+            "%% Subscribed to %d topic(s), "
+            "waiting for rebalance and messages...\n",
+            subscription->cnt);
 
-        rd_kafka_topic_partition_list_destroy(subscription);
+    rd_kafka_topic_partition_list_destroy(subscription);
+    subscription = NULL;
 
-        /* Signal handler for clean shutdown */
-        signal(SIGINT, stop);
+    /* Signal handler for clean shutdown */
+    signal(SIGINT, stop);
 
-        /* Subscribing to topics will trigger a group rebalance
+    /* Subscribing to topics will trigger a group rebalance
      * which may take some time to finish, but there is no need
      * for the application to handle this idle period in a special way
      * since a rebalance may happen at any time.
      * Start polling for messages. */
-        while (run) {
-                rd_kafka_message_t *rkm;
-
-                rkm = rd_kafka_consumer_poll(rk, 100);
-                if (!rkm)
-                        continue; /* Timeout: no message within 100ms,
+    while (run) {
+        rd_kafka_message_t *rkm;
+
+        rkm = rd_kafka_consumer_poll(rk, 100);
+        if (!rkm)
+            continue; /* Timeout: no message within 100ms,
                        * try again. This short timeout allows
                        * checking for `run` at frequent intervals. */
+
+        // Third place a partition can be sought.
+        //rd_kafka_seek(rkm->rkt, rkm->partition, 0, 100);
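+
+        /* Hedged aside (not part of the original patch): rd_kafka_seek()
+         * targets a single partition; since librdkafka v1.6 the whole current
+         * assignment can be repositioned in one call. Commented sketch: */
+        // rd_kafka_topic_partition_list_t *assigned = NULL;
+        // if (rd_kafka_assignment(rk, &assigned) == RD_KAFKA_RESP_ERR_NO_ERROR &&
+        //     assigned) {
+        //     for (int j = 0; j < assigned->cnt; ++j)
+        //         assigned->elems[j].offset = RD_KAFKA_OFFSET_BEGINNING;
+        //     rd_kafka_error_t *serr = rd_kafka_seek_partitions(rk, assigned, 5000);
+        //     if (serr) {
+        //         fprintf(stderr, "seek failed: %s\n", rd_kafka_error_string(serr));
+        //         rd_kafka_error_destroy(serr);
+        //     }
+        //     rd_kafka_topic_partition_list_destroy(assigned);
+        // }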
         /* consumer_poll() will return either a proper message
          * or a consumer error (rkm->err is set).
          */
-                if (rkm->err) {
-                        /* Consumer errors are generally to be considered
-                         * informational as the consumer will automatically
-                         * try to recover from all types of errors. */
-                        fprintf(stderr, "%% Consumer error: %s\n",
-                                rd_kafka_message_errstr(rkm));
-                        rd_kafka_message_destroy(rkm);
-                        continue;
-                }
-
-                /* Proper message. */
-                printf("Message on %s [%" PRId32 "] at offset %" PRId64 ":\n",
-                       rd_kafka_topic_name(rkm->rkt), rkm->partition,
-                       rkm->offset);
-
-                /* Print the message key. */
-                if (rkm->key && is_printable((const char *)rkm->key, rkm->key_len))
-                        printf(" Key: %.*s\n", (int)rkm->key_len,
-                               (const char *)rkm->key);
-                else if (rkm->key)
-                        printf(" Key: (%d bytes)\n", (int)rkm->key_len);
-
-                /* Print the message value/payload. */
-                if (rkm->payload && is_printable((const char *)rkm->payload, rkm->len))
-                        printf(" Value: %.*s\n", (int)rkm->len,
-                               (const char *)rkm->payload);
-                else if (rkm->payload)
-                        printf(" Value: (%d bytes)\n", (int)rkm->len);
-
-                rd_kafka_message_destroy(rkm);
-        }
-
-        /* Close the consumer: commit final offsets and leave the group. */
-        fprintf(stderr, "%% Closing consumer\n");
-        rd_kafka_consumer_close(rk);
-
-        /* Destroy the consumer */
-        rd_kafka_destroy(rk);
+        if (rkm->err) {
+            /* Consumer errors are generally to be considered
+             * informational as the consumer will automatically
+             * try to recover from all types of errors. */
+            fprintf(stderr, "%% Consumer error: %s\n",
+                    rd_kafka_message_errstr(rkm));
+            rd_kafka_message_destroy(rkm);
+            continue;
+        }
+        /* Proper message. */
+        printf("Message on %s [%" PRId32 "] at offset %" PRId64 ":\n",
+               rd_kafka_topic_name(rkm->rkt), rkm->partition,
+               rkm->offset);
 
+        /* Print the message key. */
+        if (rkm->key && is_printable((const char *)rkm->key, rkm->key_len))
+            printf(" Key: %.*s\n", (int)rkm->key_len,
+                   (const char *)rkm->key);
+        else if (rkm->key)
+            printf(" Key: (%d bytes)\n", (int)rkm->key_len);
 
+        /* Print the message value/payload. */
+        if (rkm->payload && is_printable((const char *)rkm->payload, rkm->len))
+            printf(" Value: %.*s\n", (int)rkm->len,
+                   (const char *)rkm->payload);
+        else if (rkm->payload)
+            printf(" Value: (%d bytes)\n", (int)rkm->len);
 
+        rd_kafka_message_destroy(rkm);
+    }
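+
+    /* Hedged aside (not part of the original patch): with enable.auto.commit
+     * at its default of true, rd_kafka_consumer_close() below commits the
+     * final offsets; if auto-commit had been disabled, an explicit synchronous
+     * commit of the current assignment would belong here: */
+    // rd_kafka_commit(rk, NULL, 0 /* 0 = synchronous */);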
+
+
+    /* Close the consumer: commit final offsets and leave the group. */
+    fprintf(stderr, "%% Closing consumer\n");
+    rd_kafka_consumer_close(rk);
+
+
+    /* Destroy the consumer */
+    rd_kafka_destroy(rk);
 
-        return 0;
+    return 0;
 }
diff --git a/kafka/rdkafka_test/main.cpp b/kafka/rdkafka_test/main.cpp
index 244ca35323e00cc8c2c81e58466569390a9c01c6..e23623eb27a026b1eac49c7dfbe2bc51c4bb32cf 100644
--- a/kafka/rdkafka_test/main.cpp
+++ b/kafka/rdkafka_test/main.cpp
@@ -24,6 +24,13 @@ int main(int argc, char ** argv)
     else
     {
         printf ("You must rename your exe file to consumer or producer.\n");
+        printf ("Please choose: 0=exit, 1=consumer, 2=producer:\n");
+        char c = getchar();
+        if (c == '1')
+            main_consumer(argc, argv);
+        else if (c == '2')
+            main_producer(argc, argv);
+
     }
     return 0;
 }
diff --git a/kafka/rdkafka_test/producer.cpp b/kafka/rdkafka_test/producer.cpp
index bf4c42e5caf07dcf589561d3015ef5114adb5dc1..943cf73d8456e8f0210956811efb00727ce8443a 100644
--- a/kafka/rdkafka_test/producer.cpp
+++ b/kafka/rdkafka_test/producer.cpp
@@ -78,6 +78,19 @@ int main_producer(int argc, char **argv) {
         return 1;
     }
 
+    if (rd_kafka_conf_set(conf, "compression.type", "zstd", errstr,
+                          sizeof(errstr)) != RD_KAFKA_CONF_OK) {
+        fprintf(stderr, "%s\n", errstr);
+        rd_kafka_conf_destroy(conf);
+        return 1;
+    }
+    if (rd_kafka_conf_set(conf, "compression.level", "9", errstr,
+                          sizeof(errstr)) != RD_KAFKA_CONF_OK) {
+        fprintf(stderr, "%s\n", errstr);
+        rd_kafka_conf_destroy(conf);
+        return 1;
+    }
+
     /* Set the delivery report callback.
      * This callback will be called once per message to inform
      * the application if delivery succeeded or failed.