diff --git a/CMakeLists.txt b/CMakeLists.txt index 0383cc8aed9292ce18911662319d421ae2e3b3c3..fd542966cc73a450e48844ae7aa4c68225974f04 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -28,6 +28,7 @@ endif(${BUILD_TEST}) add_subdirectory(source) add_subdirectory(tools) add_subdirectory(tests) +add_subdirectory(example) # docs add_subdirectory(docs) diff --git a/example/CMakeLists.txt b/example/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..71a9f9f4c59e288dc304bd7887f4ef454fa41284 --- /dev/null +++ b/example/CMakeLists.txt @@ -0,0 +1,16 @@ +aux_source_directory(src TMQ_DEMO_SRC) + +add_executable(tmq ${TMQ_DEMO_SRC}) +target_link_libraries( + tmq + PUBLIC taos + #PUBLIC util + #PUBLIC common + #PUBLIC os +) +target_include_directories( + tmq + PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" +) + +SET_TARGET_PROPERTIES(tmq PROPERTIES OUTPUT_NAME tmq) diff --git a/example/src/tmq.c b/example/src/tmq.c new file mode 100644 index 0000000000000000000000000000000000000000..c1c6677b0287e39055506dc8edb1afdceed779ac --- /dev/null +++ b/example/src/tmq.c @@ -0,0 +1,168 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include "taos.h" + +static int running = 1; +static void msg_process(tmq_message_t* message) { + tmqShowMsg(message); +} + +int32_t init_env() { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + if (pConn == NULL) { + return -1; + } + + TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1"); + if (taos_errno(pRes) != 0) { + printf("error in create db, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)"); + if (taos_errno(pRes) != 0) { + printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table tu using st1 tags(1)"); + if (taos_errno(pRes) != 0) { + printf("failed to create child table tu, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table tu2 using st1 tags(2)"); + if (taos_errno(pRes) != 0) { + printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + + char* sql = "select * from st1"; + pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql)); + /*if (taos_errno(pRes) != 0) {*/ + /*printf("failed to create topic test_stb_topic_1, reason:%s\n", taos_errstr(pRes));*/ + /*return -1;*/ + /*}*/ + /*taos_free_result(pRes);*/ + taos_close(pConn); + return 0; +} + +tmq_t* build_consumer() { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + tmq_conf_t* conf = tmq_conf_new(); + tmq_conf_set(conf, "group.id", "tg2"); + tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0); + return tmq; + + tmq_list_t* topic_list = tmq_list_new(); + tmq_list_append(topic_list, "test_stb_topic_1"); + tmq_subscribe(tmq, topic_list); + return NULL; +} + +tmq_list_t* build_topic_list() { + tmq_list_t* topic_list = tmq_list_new(); + tmq_list_append(topic_list, "test_stb_topic_1"); + return topic_list; +} + +void basic_consume_loop(tmq_t *tmq, + tmq_list_t *topics) { + tmq_resp_err_t err; + + if ((err = tmq_subscribe(tmq, topics))) { + fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err)); + printf("subscribe err\n"); + return; + } + + while (running) { + tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 500); + if (tmq) { + msg_process(tmqmessage); + tmq_message_destroy(tmqmessage); + } + } + + err = tmq_consumer_close(tmq); + if (err) + fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err)); + else + fprintf(stderr, "%% Consumer closed\n"); +} + +void sync_consume_loop(tmq_t *rk, + tmq_list_t *topics) { + static const int MIN_COMMIT_COUNT = 1000; + + int msg_count = 0; + tmq_resp_err_t err; + + if ((err = tmq_subscribe(rk, topics))) { + fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err)); + return; + } + + while (running) { + tmq_message_t *tmqmessage = tmq_consumer_poll(rk, 500); + if (tmqmessage) { + msg_process(tmqmessage); + tmq_message_destroy(tmqmessage); + + if ((++msg_count % MIN_COMMIT_COUNT) == 0) + tmq_commit(rk, NULL, 0); + } + } + + err = tmq_consumer_close(rk); + if (err) + fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err)); + else + fprintf(stderr, "%% Consumer closed\n"); +} + +int main() { + int code; + code = init_env(); + tmq_t* tmq = build_consumer(); + tmq_list_t* topic_list = build_topic_list(); + /*basic_consume_loop(tmq, topic_list);*/ + sync_consume_loop(tmq, topic_list); +} diff --git a/include/client/taos.h b/include/client/taos.h index ac2893651d82663a4b58d02f0fb9e37f1db80014..7c5cc56b209c813847d635c783541f8aa4ce0b31 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -218,6 +218,7 @@ DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, char *); DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen); DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen); DLL_EXPORT void tmq_message_destroy(tmq_message_t* tmq_message); +DLL_EXPORT const char* tmq_err2str(tmq_resp_err_t); /* ------------------------TMQ CONSUMER INTERFACE------------------------ */ DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, tmq_list_t *topic_list); @@ -226,8 +227,8 @@ DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq); DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_topic_vgroup_list_t** topics); #endif DLL_EXPORT tmq_message_t *tmq_consumer_poll(tmq_t *tmq, int64_t blocking_time); -#if 0 DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t* tmq); +#if 0 DLL_EXPORT tmq_resp_err_t tmq_assign(tmq_t* tmq, const tmq_topic_vgroup_list_t* vgroups); DLL_EXPORT tmq_resp_err_t tmq_assignment(tmq_t* tmq, tmq_topic_vgroup_list_t** vgroups); #endif diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index d5296620086ae1df57abcbae55d1212d6e504c54..facc8c721683a3cbda062176a5f6861ab8ad8ead 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -348,7 +348,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i goto _return; } - printf("%s\n", pStr); + /*printf("%s\n", pStr);*/ // The topic should be related to a database that the queried table is belonged to. SName name = {0}; @@ -501,7 +501,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { SMqConsumeCbParam* pParam = (SMqConsumeCbParam*)param; SMqClientVg* pVg = pParam->pVg; if (code != 0) { - printf("msg discard\n"); + /*printf("msg discard\n");*/ tsem_post(&pParam->rspSem); return 0; } @@ -512,7 +512,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { return -1; } tDecodeSMqConsumeRsp(pMsg->pData, pRsp); - printf("rsp %ld %ld\n", pRsp->committedOffset, pRsp->rspOffset); + /*printf("rsp %ld %ld\n", pRsp->committedOffset, pRsp->rspOffset);*/ if (pRsp->numOfTopics == 0) { /*printf("no data\n");*/ free(pRsp); @@ -766,6 +766,16 @@ void tmq_message_destroy(tmq_message_t* tmq_message) { free(tmq_message); } +tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { + return TMQ_RESP_ERR__SUCCESS; +} + +const char* tmq_err2str(tmq_resp_err_t err) { + if (err == TMQ_RESP_ERR__SUCCESS) { + return "success"; + } + return "fail"; +} #if 0 tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) { tmq_t* pTmq = malloc(sizeof(tmq_t)); diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index b7ee64e19fae0a8cdd048f2964392b2e4e30b192..553dacafddc41a08019b57401a43ef5f5df1caa6 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -638,6 +638,7 @@ TEST(testCase, tmq_subscribe_ctb_Test) { //if (msg == NULL) break; } } +#endif TEST(testCase, tmq_subscribe_stb_Test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -676,7 +677,6 @@ TEST(testCase, tmq_consume_Test) { TEST(testCase, tmq_commit_TEST) { } -#endif #if 0 TEST(testCase, projection_query_tables) { diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index ab9a1cb5eab31b38f51d02d6fb6e878c4a3120b9..75f90df65813cb5f66c48130a16848a4929a84a4 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -583,19 +583,23 @@ static FORCE_INLINE void tDeleteSMqSubscribeObj(SMqSubscribeObj* pSub) { pSub->availConsumer = NULL; } if (pSub->assigned) { - taosArrayDestroyEx(pSub->assigned, (void (*)(void*))tDeleteSMqConsumerEp); + //taosArrayDestroyEx(pSub->assigned, (void (*)(void*))tDeleteSMqConsumerEp); + taosArrayDestroy(pSub->assigned); pSub->assigned = NULL; } if (pSub->unassignedVg) { - taosArrayDestroyEx(pSub->unassignedVg, (void (*)(void*))tDeleteSMqConsumerEp); + //taosArrayDestroyEx(pSub->unassignedVg, (void (*)(void*))tDeleteSMqConsumerEp); + taosArrayDestroy(pSub->unassignedVg); pSub->unassignedVg = NULL; } if (pSub->idleConsumer) { - taosArrayDestroyEx(pSub->idleConsumer, (void (*)(void*))tDeleteSMqConsumerEp); + //taosArrayDestroyEx(pSub->idleConsumer, (void (*)(void*))tDeleteSMqConsumerEp); + taosArrayDestroy(pSub->idleConsumer); pSub->idleConsumer = NULL; } if (pSub->lostConsumer) { - taosArrayDestroyEx(pSub->lostConsumer, (void (*)(void*))tDeleteSMqConsumerEp); + //taosArrayDestroyEx(pSub->lostConsumer, (void (*)(void*))tDeleteSMqConsumerEp); + taosArrayDestroy(pSub->lostConsumer); pSub->lostConsumer = NULL; } }