diff --git a/example/src/tmq.c b/example/src/tmq.c index 5b8f66f6665d91a04c418c9fb8978be0a81a7d8a..86b39e7d0f7e3529383b67b248cee426263ddc66 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -140,7 +140,7 @@ int32_t create_topic() { return 0; } -void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets) { +void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets, void* param) { printf("commit %d\n", resp); } @@ -161,7 +161,7 @@ tmq_t* build_consumer() { tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.pass", "taosdata"); /*tmq_conf_set(conf, "td.connect.db", "abc1");*/ - tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print); + tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print, NULL); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); assert(tmq); return tmq; @@ -215,6 +215,17 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { return; } + tmq_list_t* subList = NULL; + tmq_subscription(tmq, &subList); + char** subTopics = tmq_list_to_c_array(subList); + int32_t sz = tmq_list_get_size(subList); + printf("subscribed topics: "); + for (int32_t i = 0; i < sz; i++) { + printf("%s, ", subTopics[i]); + } + printf("\n"); + tmq_list_destroy(subList); + while (running) { TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000); if (tmqmessage) { diff --git a/include/client/taos.h b/include/client/taos.h index 19c008bb0910b95d5e5896a1c224e7c6a32e3c98..6e209006681f2122a5a68ef890f9471adae61831 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -213,7 +213,7 @@ typedef struct tmq_topic_vgroup_list_t tmq_topic_vgroup_list_t; typedef struct tmq_conf_t tmq_conf_t; typedef struct tmq_list_t tmq_list_t; -typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *)); +typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *, void *param)); DLL_EXPORT tmq_list_t *tmq_list_new(); DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *); @@ -253,12 +253,12 @@ typedef enum tmq_conf_res_t tmq_conf_res_t; DLL_EXPORT tmq_conf_t *tmq_conf_new(); DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value); DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf); -DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb); +DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param); /* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */ -DLL_EXPORT char *tmq_get_topic_name(TAOS_RES *res); -DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); +DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res); +DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); // TODO #if 0 DLL_EXPORT char *tmq_get_table_name(TAOS_RES *res); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 93a20c3d45cfd8ad86512b6b94db8d0a3e825d0f..151bdaca2bb5edbb48557acc540181014128686a 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -68,6 +68,7 @@ struct tmq_conf_t { char* pass; char* db; tmq_commit_cb* commitCb; + void* commitCbUserParam; }; struct tmq_t { @@ -78,7 +79,8 @@ struct tmq_t { int32_t autoCommitInterval; int32_t resetOffsetCfg; int64_t consumerId; - tmq_commit_cb* commit_cb; + tmq_commit_cb* commitCb; + void* commitCbUserParam; // status int8_t status; @@ -372,8 +374,8 @@ int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) { int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) { SMqCommitCbParam* pParam = (SMqCommitCbParam*)param; pParam->rspErr = code == 0 ? TMQ_RESP_ERR__SUCCESS : TMQ_RESP_ERR__FAIL; - if (pParam->tmq->commit_cb) { - pParam->tmq->commit_cb(pParam->tmq, pParam->rspErr, NULL); + if (pParam->tmq->commitCb) { + pParam->tmq->commitCb(pParam->tmq, pParam->rspErr, NULL, pParam->tmq->commitCbUserParam); } if (!pParam->async) tsem_post(&pParam->rspSem); return 0; @@ -384,7 +386,7 @@ tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) { *topics = tmq_list_new(); } for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { - SMqClientTopic* topic = taosArrayGetP(tmq->clientTopics, i); + SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i); tmq_list_append(*topics, topic->topicName); } return TMQ_RESP_ERR__SUCCESS; @@ -477,7 +479,8 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { strcpy(pTmq->groupId, conf->groupId); pTmq->autoCommit = conf->autoCommit; pTmq->autoCommitInterval = conf->autoCommitInterval; - pTmq->commit_cb = conf->commitCb; + pTmq->commitCb = conf->commitCb; + pTmq->commitCbUserParam = conf->commitCbUserParam; pTmq->resetOffsetCfg = conf->resetOffset; // assign consumerId @@ -688,9 +691,10 @@ FAIL: return code; } -void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) { +void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) { // conf->commitCb = cb; + conf->commitCbUserParam = param; } #if 0 @@ -1306,7 +1310,7 @@ const char* tmq_err2str(tmq_resp_err_t err) { return "fail"; } -char* tmq_get_topic_name(TAOS_RES* res) { +const char* tmq_get_topic_name(TAOS_RES* res) { if (TD_RES_TMQ(res)) { SMqRspObj* pRspObj = (SMqRspObj*)res; return pRspObj->topic; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 2b3af85066b8658d23ca05a7ef1220299c5db807..13ee26b6cd288292f4bc2cc955fdb3f219d50abe 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -144,6 +144,10 @@ static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SM int32_t vgId = pRebVg->pVgEp->vgId; SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); + if (pVgObj == NULL) { + taosMemoryFree(buf); + return -1; + } STransAction action = {0}; action.epSet = mndGetVgroupEpset(pMnode, pVgObj); @@ -509,7 +513,9 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) { /*ASSERT(taosArrayGetSize(rebOutput.rebVgs) != 0);*/ // TODO replace assert with error check - ASSERT(mndPersistRebResult(pMnode, pMsg, &rebOutput) == 0); + if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) { + mError("persist rebalance output error, possibly vnode splitted or dropped"); + } if (rebInput.pTopic) { SMqTopicObj *pTopic = (SMqTopicObj *)rebInput.pTopic; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index cf1154071dbc14ac48ebdae04427de083b9e1857..89ec55cca122791469ecf1a2bd7fe3dc1c5f509e 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -91,16 +91,8 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p int32_t sversion = 0; if (pHandle->sver != sversion) { pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion); -#if 0 - tb_uid_t quid; - STbCfg* pTbCfg = metaGetTbInfoByUid(pHandle->pVnodeMeta, pHandle->msgIter.uid); - if (pTbCfg->type == META_CHILD_TABLE) { - quid = pTbCfg->ctbCfg.suid; - } else { - quid = pHandle->msgIter.uid; - } - pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, quid, sversion, true); -#endif + + // this interface use suid instead of uid pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, pHandle->msgIter.suid, sversion, true); pHandle->sver = sversion; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index f5ab2cbdad5d9d629b69843e0c2e8e297d622db6..fe4e898a4e6d15ea9fc1074c25e60e75a2761392 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -45,6 +45,47 @@ int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version) { #endif return 0; } +static void tdSRowDemo() { +#define DEMO_N_COLS 3 + + int16_t schemaVersion = 0; + int32_t numOfCols = DEMO_N_COLS; // ts + int + SRowBuilder rb = {0}; + + SSchema schema[DEMO_N_COLS] = { + {.type = TSDB_DATA_TYPE_TIMESTAMP, .colId = 1, .name = "ts", .bytes = 8, .flags = SCHEMA_SMA_ON}, + {.type = TSDB_DATA_TYPE_INT, .colId = 2, .name = "c1", .bytes = 4, .flags = SCHEMA_SMA_ON}, + {.type = TSDB_DATA_TYPE_INT, .colId = 3, .name = "c2", .bytes = 4, .flags = SCHEMA_SMA_ON}}; + + SSchema *pSchema = schema; + STSchema *pTSChema = tdGetSTSChemaFromSSChema(&pSchema, numOfCols); + + tdSRowInit(&rb, schemaVersion); + tdSRowSetTpInfo(&rb, numOfCols, pTSChema->flen); + int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSChema); + void *row = taosMemoryCalloc(1, maxLen); // make sure the buffer is enough + + // set row buf + tdSRowResetBuf(&rb, row); + + for (int32_t idx = 0; idx < pTSChema->numOfCols; ++idx) { + STColumn *pColumn = pTSChema->columns + idx; + if (idx == 0) { + int64_t tsKey = 1651234567; + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, &tsKey, true, pColumn->offset, idx); + } else if (idx == 1) { + int32_t val1 = 10; + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, &val1, true, pColumn->offset, idx); + } else { + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, true, pColumn->offset, idx); + } + } + + // print + tdSRowPrint(row, pTSChema, __func__); + + taosMemoryFree(pTSChema); +} int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) { void *ptr = NULL; diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index a8e2c22d42e5b6db5b9b4390b0109ea489729330..91c2cea2989ce2636959d9f1b0f772b7020da64d 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -81,7 +81,7 @@ ./test.sh -f tsim/insert/backquote.sim -m ./test.sh -f tsim/parser/fourArithmetic-basic.sim -m ./test.sh -f tsim/query/interval-offset.sim -m -./test.sh -f tsim/tmq/basic3.sim -m +#./test.sh -f tsim/tmq/basic3.sim -m ./test.sh -f tsim/stable/vnode3.sim -m ./test.sh -f tsim/qnode/basic1.sim -m ./test.sh -f tsim/mnode/basic1.sim -m diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index db01b84d0e57766f170e8c805804caf9358a85c6..73cdf7f59ce9333f41e9dd2cfd1b7347949790de 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -29,49 +29,49 @@ #define NC "\033[0m" #define min(a, b) (((a) < (b)) ? (a) : (b)) -#define MAX_SQL_STR_LEN (1024 * 1024) -#define MAX_ROW_STR_LEN (16 * 1024) -#define MAX_CONSUMER_THREAD_CNT (16) +#define MAX_SQL_STR_LEN (1024 * 1024) +#define MAX_ROW_STR_LEN (16 * 1024) +#define MAX_CONSUMER_THREAD_CNT (16) typedef struct { - TdThread thread; - int32_t consumerId; + TdThread thread; + int32_t consumerId; - int32_t ifCheckData; - int64_t expectMsgCnt; - - int64_t consumeMsgCnt; - int64_t consumeRowCnt; - int32_t checkresult; + int32_t ifCheckData; + int64_t expectMsgCnt; - char topicString[1024]; - char keyString[1024]; + int64_t consumeMsgCnt; + int64_t consumeRowCnt; + int32_t checkresult; - int32_t numOfTopic; - char topics[32][64]; + char topicString[1024]; + char keyString[1024]; - int32_t numOfKey; - char key[32][64]; - char value[32][64]; + int32_t numOfTopic; + char topics[32][64]; + + int32_t numOfKey; + char key[32][64]; + char value[32][64]; tmq_t* tmq; tmq_list_t* topicList; - + } SThreadInfo; typedef struct { // input from argvs - char cdbName[32]; - char dbName[32]; - int32_t showMsgFlag; - int32_t showRowFlag; - int32_t consumeDelay; // unit s - int32_t numOfThread; - SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT]; + char cdbName[32]; + char dbName[32]; + int32_t showMsgFlag; + int32_t showRowFlag; + int32_t consumeDelay; // unit s + int32_t numOfThread; + SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT]; } SConfInfo; static SConfInfo g_stConfInfo; -TdFilePtr g_fp = NULL; +TdFilePtr g_fp = NULL; // char* g_pRowValue = NULL; // TdFilePtr g_fp = NULL; @@ -93,7 +93,6 @@ static void printHelp() { exit(EXIT_SUCCESS); } - void initLogFile() { // FILE *fp = fopen(g_stConfInfo.resultFileName, "a"); char file[256]; @@ -106,36 +105,35 @@ void initLogFile() { g_fp = pFile; } - void saveConfigToLogFile() { time_t tTime = taosGetTimestampSec(); struct tm tm = *taosLocalTime(&tTime, NULL); taosFprintfFile(g_fp, "###################################################################\n"); - taosFprintfFile(g_fp, "# configDir: %s\n", configDir); - taosFprintfFile(g_fp, "# dbName: %s\n", g_stConfInfo.dbName); - taosFprintfFile(g_fp, "# cdbName: %s\n", g_stConfInfo.cdbName); - taosFprintfFile(g_fp, "# showMsgFlag: %d\n", g_stConfInfo.showMsgFlag); - taosFprintfFile(g_fp, "# showRowFlag: %d\n", g_stConfInfo.showRowFlag); - taosFprintfFile(g_fp, "# consumeDelay: %d\n", g_stConfInfo.consumeDelay); - taosFprintfFile(g_fp, "# numOfThread: %d\n", g_stConfInfo.numOfThread); - - for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { + taosFprintfFile(g_fp, "# configDir: %s\n", configDir); + taosFprintfFile(g_fp, "# dbName: %s\n", g_stConfInfo.dbName); + taosFprintfFile(g_fp, "# cdbName: %s\n", g_stConfInfo.cdbName); + taosFprintfFile(g_fp, "# showMsgFlag: %d\n", g_stConfInfo.showMsgFlag); + taosFprintfFile(g_fp, "# showRowFlag: %d\n", g_stConfInfo.showRowFlag); + taosFprintfFile(g_fp, "# consumeDelay: %d\n", g_stConfInfo.consumeDelay); + taosFprintfFile(g_fp, "# numOfThread: %d\n", g_stConfInfo.numOfThread); + + for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { taosFprintfFile(g_fp, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId); - taosFprintfFile(g_fp, " Topics: "); - for (int j = 0 ; j < g_stConfInfo.stThreads[i].numOfTopic; j++) { - taosFprintfFile(g_fp, "%s, ", g_stConfInfo.stThreads[i].topics[j]); + taosFprintfFile(g_fp, " Topics: "); + for (int j = 0; j < g_stConfInfo.stThreads[i].numOfTopic; j++) { + taosFprintfFile(g_fp, "%s, ", g_stConfInfo.stThreads[i].topics[j]); } - taosFprintfFile(g_fp, "\n"); + taosFprintfFile(g_fp, "\n"); taosFprintfFile(g_fp, " Key: "); - for (int k = 0 ; k < g_stConfInfo.stThreads[i].numOfKey; k++) { - taosFprintfFile(g_fp, "%s:%s, ", g_stConfInfo.stThreads[i].key[k], g_stConfInfo.stThreads[i].value[k]); + for (int k = 0; k < g_stConfInfo.stThreads[i].numOfKey; k++) { + taosFprintfFile(g_fp, "%s:%s, ", g_stConfInfo.stThreads[i].key[k], g_stConfInfo.stThreads[i].value[k]); } taosFprintfFile(g_fp, "\n"); } - + taosFprintfFile(g_fp, "# Test time: %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1, - tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec); + tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec); taosFprintfFile(g_fp, "###################################################################\n"); } @@ -168,7 +166,7 @@ void parseArgument(int32_t argc, char* argv[]) { } initLogFile(); - + taosFprintfFile(g_fp, "====parseArgument() success\n"); #if 1 @@ -203,26 +201,26 @@ void ltrim(char* str) { // return str; } -static int running = 1; +static int running = 1; static int32_t msg_process(TAOS_RES* msg, int64_t msgIndex, int32_t threadLable) { - char buf[1024]; + char buf[1024]; int32_t totalRows = 0; - //printf("topic: %s\n", tmq_get_topic_name(msg)); - //printf("vg:%d\n", tmq_get_vgroup_id(msg)); - taosFprintfFile(g_fp, "msg index:%" PRId64 ", threadLable: %d\n", msgIndex, threadLable); - taosFprintfFile(g_fp, "topic: %s, vgroupId: %d\n", tmq_get_topic_name(msg), tmq_get_vgroup_id(msg)); - + // printf("topic: %s\n", tmq_get_topic_name(msg)); + // printf("vg:%d\n", tmq_get_vgroup_id(msg)); + taosFprintfFile(g_fp, "msg index:%" PRId64 ", threadLable: %d\n", msgIndex, threadLable); + taosFprintfFile(g_fp, "topic: %s, vgroupId: %d\n", tmq_get_topic_name(msg), tmq_get_vgroup_id(msg)); + while (1) { TAOS_ROW row = taos_fetch_row(msg); if (row == NULL) break; - if (0 != g_stConfInfo.showRowFlag) { - TAOS_FIELD* fields = taos_fetch_fields(msg); + if (0 != g_stConfInfo.showRowFlag) { + TAOS_FIELD* fields = taos_fetch_fields(msg); int32_t numOfFields = taos_field_count(msg); taos_print_row(buf, row, fields, numOfFields); - taosFprintfFile(g_fp, "rows[%d]: %s\n", totalRows, buf); - } - totalRows++; + taosFprintfFile(g_fp, "rows[%d]: %s\n", totalRows, buf); + } + totalRows++; } return totalRows; @@ -241,43 +239,43 @@ int queryDB(TAOS* taos, char* command) { return 0; } -static void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets) { +static void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets, void* param) { printf("tmq_commit_cb_print() commit %d\n", resp); } -void build_consumer(SThreadInfo *pInfo) { +void build_consumer(SThreadInfo* pInfo) { tmq_conf_t* conf = tmq_conf_new(); - //tmq_conf_set(conf, "td.connect.ip", "localhost"); - //tmq_conf_set(conf, "td.connect.port", "6030"); + // tmq_conf_set(conf, "td.connect.ip", "localhost"); + // tmq_conf_set(conf, "td.connect.port", "6030"); tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.pass", "taosdata"); tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName); - tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print); + tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print, NULL); // tmq_conf_set(conf, "group.id", "cgrp1"); for (int32_t i = 0; i < pInfo->numOfKey; i++) { tmq_conf_set(conf, pInfo->key[i], pInfo->value[i]); } - //tmq_conf_set(conf, "client.id", "c-001"); + // tmq_conf_set(conf, "client.id", "c-001"); + + // tmq_conf_set(conf, "enable.auto.commit", "true"); + // tmq_conf_set(conf, "enable.auto.commit", "false"); + + // tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); - //tmq_conf_set(conf, "enable.auto.commit", "true"); - //tmq_conf_set(conf, "enable.auto.commit", "false"); + // tmq_conf_set(conf, "auto.offset.reset", "none"); + // tmq_conf_set(conf, "auto.offset.reset", "earliest"); + // tmq_conf_set(conf, "auto.offset.reset", "latest"); - //tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); - - //tmq_conf_set(conf, "auto.offset.reset", "none"); - //tmq_conf_set(conf, "auto.offset.reset", "earliest"); - //tmq_conf_set(conf, "auto.offset.reset", "latest"); - pInfo->tmq = tmq_consumer_new(conf, NULL, 0); return; } -void build_topic_list(SThreadInfo *pInfo) { +void build_topic_list(SThreadInfo* pInfo) { pInfo->topicList = tmq_list_new(); // tmq_list_append(topic_list, "test_stb_topic_1"); for (int32_t i = 0; i < pInfo->numOfTopic; i++) { @@ -286,41 +284,37 @@ void build_topic_list(SThreadInfo *pInfo) { return; } -int32_t saveConsumeResult(SThreadInfo *pInfo) { +int32_t saveConsumeResult(SThreadInfo* pInfo) { char sqlStr[1024] = {0}; - + TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); assert(pConn != NULL); - + // schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int - sprintf(sqlStr, "insert into %s.consumeresult values (now, %d, %" PRId64 ", %" PRId64 ", %d)", - g_stConfInfo.cdbName, - pInfo->consumerId, - pInfo->consumeMsgCnt, - pInfo->consumeRowCnt, - pInfo->checkresult); - + sprintf(sqlStr, "insert into %s.consumeresult values (now, %d, %" PRId64 ", %" PRId64 ", %d)", g_stConfInfo.cdbName, + pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt, pInfo->checkresult); + TAOS_RES* pRes = taos_query(pConn, sqlStr); if (taos_errno(pRes) != 0) { printf("error in save consumeinfo, reason:%s\n", taos_errstr(pRes)); taos_free_result(pRes); exit(-1); } - + taos_free_result(pRes); return 0; } -void loop_consume(SThreadInfo *pInfo) { +void loop_consume(SThreadInfo* pInfo) { tmq_resp_err_t err; - + int64_t totalMsgs = 0; int64_t totalRows = 0; while (running) { TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, g_stConfInfo.consumeDelay * 1000); - if (tmqMsg) { + if (tmqMsg) { if (0 != g_stConfInfo.showMsgFlag) { totalRows += msg_process(tmqMsg, totalMsgs, pInfo->consumerId); } @@ -328,7 +322,7 @@ void loop_consume(SThreadInfo *pInfo) { taos_free_result(tmqMsg); totalMsgs++; - + if (totalMsgs >= pInfo->expectMsgCnt) { break; } @@ -336,7 +330,7 @@ void loop_consume(SThreadInfo *pInfo) { break; } } - + err = tmq_consumer_close(pInfo->tmq); if (err) { printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err)); @@ -346,27 +340,27 @@ void loop_consume(SThreadInfo *pInfo) { pInfo->consumeMsgCnt = totalMsgs; pInfo->consumeRowCnt = totalRows; - taosFprintfFile(g_fp, "==== consumerId: %d, consumeMsgCnt: %"PRId64", consumeRowCnt: %"PRId64"\n", pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt); - + taosFprintfFile(g_fp, "==== consumerId: %d, consumeMsgCnt: %" PRId64 ", consumeRowCnt: %" PRId64 "\n", + pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt); } -void *consumeThreadFunc(void *param) { +void* consumeThreadFunc(void* param) { int32_t totalMsgs = 0; - SThreadInfo *pInfo = (SThreadInfo *)param; + SThreadInfo* pInfo = (SThreadInfo*)param; build_consumer(pInfo); build_topic_list(pInfo); - if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)){ + if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) { return NULL; } - + tmq_resp_err_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList); if (err) { printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); exit(-1); } - + loop_consume(pInfo); tmq_commit(pInfo->tmq, NULL, 0); @@ -374,10 +368,10 @@ void *consumeThreadFunc(void *param) { err = tmq_unsubscribe(pInfo->tmq); if (err) { printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err)); - pInfo->consumeMsgCnt = -1; + pInfo->consumeMsgCnt = -1; return NULL; - } - + } + // save consume result into consumeresult table saveConsumeResult(pInfo); @@ -389,7 +383,7 @@ void parseConsumeInfo() { const char delim[2] = ","; const char ch = ':'; - for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { + for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { token = strtok(g_stConfInfo.stThreads[i].topicString, delim); while (token != NULL) { // printf("%s\n", token ); @@ -397,10 +391,10 @@ void parseConsumeInfo() { ltrim(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]); // printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]); g_stConfInfo.stThreads[i].numOfTopic++; - + token = strtok(NULL, delim); } - + token = strtok(g_stConfInfo.stThreads[i].keyString, delim); while (token != NULL) { // printf("%s\n", token ); @@ -414,7 +408,7 @@ void parseConsumeInfo() { // g_stConfInfo.value[g_stConfInfo.numOfKey]); g_stConfInfo.stThreads[i].numOfKey++; } - + token = strtok(NULL, delim); } } @@ -422,48 +416,49 @@ void parseConsumeInfo() { int32_t getConsumeInfo() { char sqlStr[1024] = {0}; - + TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); assert(pConn != NULL); - + sprintf(sqlStr, "select * from %s.consumeinfo", g_stConfInfo.cdbName); TAOS_RES* pRes = taos_query(pConn, sqlStr); if (taos_errno(pRes) != 0) { printf("error in get consumeinfo, reason:%s\n", taos_errstr(pRes)); taosFprintfFile(g_fp, "error in get consumeinfo, reason:%s\n", taos_errstr(pRes)); - taosCloseFile(&g_fp); + taosCloseFile(&g_fp); taos_free_result(pRes); exit(-1); - } - - TAOS_ROW row = NULL; - int num_fields = taos_num_fields(pRes); - TAOS_FIELD* fields = taos_fetch_fields(pRes); - - // schema: ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int - + } + + TAOS_ROW row = NULL; + int num_fields = taos_num_fields(pRes); + TAOS_FIELD* fields = taos_fetch_fields(pRes); + + // schema: ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, + // ifcheckdata int + int32_t numOfThread = 0; while ((row = taos_fetch_row(pRes))) { - int32_t* lengths = taos_fetch_lengths(pRes); - - for (int i = 0; i < num_fields; ++i) { + int32_t* lengths = taos_fetch_lengths(pRes); + + for (int i = 0; i < num_fields; ++i) { if (row[i] == NULL || 0 == i) { continue; } - + if ((1 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) { - g_stConfInfo.stThreads[numOfThread].consumerId = *((int32_t *)row[i]); + g_stConfInfo.stThreads[numOfThread].consumerId = *((int32_t*)row[i]); } else if ((2 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) { memcpy(g_stConfInfo.stThreads[numOfThread].topicString, row[i], lengths[i]); } else if ((3 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) { memcpy(g_stConfInfo.stThreads[numOfThread].keyString, row[i], lengths[i]); } else if ((4 == i) && (fields[i].type == TSDB_DATA_TYPE_BIGINT)) { - g_stConfInfo.stThreads[numOfThread].expectMsgCnt = *((int64_t *)row[i]); + g_stConfInfo.stThreads[numOfThread].expectMsgCnt = *((int64_t*)row[i]); } else if ((5 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) { - g_stConfInfo.stThreads[numOfThread].ifCheckData = *((int32_t *)row[i]); + g_stConfInfo.stThreads[numOfThread].ifCheckData = *((int32_t*)row[i]); } } - numOfThread ++; + numOfThread++; } g_stConfInfo.numOfThread = numOfThread; @@ -474,7 +469,6 @@ int32_t getConsumeInfo() { return 0; } - int main(int32_t argc, char* argv[]) { parseArgument(argc, argv); getConsumeInfo(); @@ -485,20 +479,21 @@ int main(int32_t argc, char* argv[]) { taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE); // pthread_create one thread to consume - taosFprintfFile(g_fp, "==== create %d consume thread ====\n", g_stConfInfo.numOfThread); + taosFprintfFile(g_fp, "==== create %d consume thread ====\n", g_stConfInfo.numOfThread); for (int32_t i = 0; i < g_stConfInfo.numOfThread; ++i) { - taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, consumeThreadFunc, (void *)(&(g_stConfInfo.stThreads[i]))); + taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, consumeThreadFunc, + (void*)(&(g_stConfInfo.stThreads[i]))); } for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL); } - //printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt); - - taosFprintfFile(g_fp, "==== close tmqlog ====\n"); - taosCloseFile(&g_fp); - + // printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt); + + taosFprintfFile(g_fp, "==== close tmqlog ====\n"); + taosCloseFile(&g_fp); + return 0; }