diff --git a/example/src/tmq.c b/example/src/tmq.c index 56f210081b43fb4d3f1185129b5a94c27c8f03a0..0d5e49c592405dfa1121b1ee81191b0a44529d2a 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -48,7 +48,6 @@ int32_t init_env() { return -1; } taos_free_result(pRes); - sleep(1); pRes = taos_query(pConn, "use abc1"); if (taos_errno(pRes) != 0) { diff --git a/include/client/taos.h b/include/client/taos.h index 72cb7bfa96fa939bc5cb41580eaab9e642d851be..5b200ed0af91b26f0785aa2b83812f125a188b71 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -27,10 +27,7 @@ typedef void TAOS; typedef void TAOS_STMT; typedef void TAOS_RES; typedef void **TAOS_ROW; -#if 0 -typedef void TAOS_STREAM; -#endif -typedef void TAOS_SUB; +typedef void TAOS_SUB; // Data type definition #define TSDB_DATA_TYPE_NULL 0 // 1 bytes @@ -192,12 +189,6 @@ DLL_EXPORT TAOS_RES *taos_consume(TAOS_SUB *tsub); DLL_EXPORT void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress); #endif -#if 0 -DLL_EXPORT TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sql, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), - int64_t stime, void *param, void (*callback)(void *)); -DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr); -#endif - DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList); DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision); @@ -237,12 +228,8 @@ DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t); DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list); DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq); DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics); -DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t blocking_time); +DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t wait_time); DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq); -#if 0 -DLL_EXPORT tmq_resp_err_t tmq_assign(tmq_t* tmq, const tmq_topic_vgroup_list_t* vgroups); -DLL_EXPORT tmq_resp_err_t tmq_assignment(tmq_t* tmq, tmq_topic_vgroup_list_t** vgroups); -#endif DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, int32_t async); #if 0 DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async); @@ -269,7 +256,7 @@ DLL_EXPORT char *tmq_get_topic_name(TAOS_RES *res); DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); // TODO #if 0 -DLL_EXPORT char *tmq_get_block_table_name(TAOS_RES *res); +DLL_EXPORT char *tmq_get_table_name(TAOS_RES *res); #endif #if 0 diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index f9540bc8fc9fdffdc13b40f98c9a27cda2ed845c..a9acb781bea03b2589125c21e07fe0427d081bcf 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -56,7 +56,7 @@ struct tmq_conf_t { int8_t autoCommit; int8_t resetOffset; uint16_t port; - uint16_t autoCommitInterval; + int32_t autoCommitInterval; char* ip; char* user; char* pass; @@ -76,8 +76,9 @@ struct tmq_t { char groupId[TSDB_CGROUP_LEN]; char clientId[256]; int8_t autoCommit; - int64_t consumerId; + int32_t autoCommitInterval; int32_t resetOffsetCfg; + int64_t consumerId; tmq_commit_cb* commit_cb; // status @@ -87,6 +88,11 @@ struct tmq_t { int32_t epSkipCnt; int64_t pollCnt; + // timer + tmr_h hbTimer; + tmr_h reportTimer; + tmr_h commitTimer; + // connection STscObj* pTscObj; @@ -111,6 +117,12 @@ enum { TMQ_CONSUMER_STATUS__READY, }; +enum { + TMQ_DELAYED_TASK__HB = 1, + TMQ_DELAYED_TASK__REPORT, + TMQ_DELAYED_TASK__COMMIT, +}; + typedef struct { // statistics int64_t pollCnt; @@ -280,6 +292,50 @@ static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) { return sprintf(dst, "%s:%d", topicName, vg); } +void tmqAssignDelayedHbTask(void* param, void* tmrId) { + tmq_t* tmq = (tmq_t*)param; + int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t)); + *pTaskType = TMQ_DELAYED_TASK__HB; + taosWriteQitem(tmq->delayedTask, pTaskType); +} + +void tmqAssignDelayedCommitTask(void* param, void* tmrId) { + tmq_t* tmq = (tmq_t*)param; + int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t)); + *pTaskType = TMQ_DELAYED_TASK__COMMIT; + taosWriteQitem(tmq->delayedTask, pTaskType); +} + +void tmqAssignDelayedReportTask(void* param, void* tmrId) { + tmq_t* tmq = (tmq_t*)param; + int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t)); + *pTaskType = TMQ_DELAYED_TASK__REPORT; + taosWriteQitem(tmq->delayedTask, pTaskType); +} + +int32_t tmqHandleAllDelayedTask(tmq_t* tmq) { + STaosQall* qall = taosAllocateQall(); + taosReadAllQitems(tmq->delayedTask, qall); + while (1) { + int8_t* pTaskType = NULL; + taosGetQitem(qall, (void**)&pTaskType); + if (pTaskType == NULL) break; + + if (*pTaskType == TMQ_DELAYED_TASK__HB) { + tmqAskEp(tmq, false); + taosTmrReset(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer, &tmq->hbTimer); + } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) { + tmq_commit(tmq, NULL, true); + taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer, &tmq->commitTimer); + } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) { + } else { + ASSERT(0); + } + } + taosFreeQall(qall); + return 0; +} + void tmqClearUnhandleMsg(tmq_t* tmq) { SMqRspWrapper* msg = NULL; while (1) { @@ -414,7 +470,9 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { // set conf strcpy(pTmq->clientId, conf->clientId); strcpy(pTmq->groupId, conf->groupId); - pTmq->autoCommit = conf->autoCommit; + /*pTmq->autoCommit = conf->autoCommit;*/ + pTmq->autoCommit = 0; + pTmq->autoCommitInterval = conf->autoCommitInterval; pTmq->commit_cb = conf->commit_cb; pTmq->resetOffsetCfg = conf->resetOffset; @@ -607,6 +665,14 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { taosMsleep(500); } + // init hb timer + tmq->hbTimer = taosTmrStart(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer); + + // init auto commit timer + if (tmq->autoCommit) { + tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer); + } + code = 0; FAIL: if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree); @@ -1216,7 +1282,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { } while (1) { - tmqAskEp(tmq, false); + tmqHandleAllDelayedTask(tmq); tmqPollImpl(tmq, blocking_time); /*tsem_wait(&tmq->rspSem);*/ diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 207b7d84219750cf1d2110d30b4b6b6f9d69e842..d025ca3781a9db5262a09498cdd65a6652d019fc 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -61,7 +61,6 @@ int32_t walRollback(SWal *pWal, int64_t ver) { walBuildIdxName(pWal, walGetCurFileFirstVer(pWal), fnameStr); TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ); - // TODO:change to deserialize function if (pIdxTFile == NULL) { taosThreadMutexUnlock(&pWal->mutex); return -1; @@ -73,7 +72,6 @@ int32_t walRollback(SWal *pWal, int64_t ver) { return -1; } // read idx file and get log file pos - // TODO:change to deserialize function SWalIdxEntry entry; if (taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) { taosThreadMutexUnlock(&pWal->mutex); @@ -167,7 +165,7 @@ int32_t walEndSnapshot(SWal *pWal) { char fnameStr[WAL_FILE_LEN]; // remove file for (int i = 0; i < deleteCnt; i++) { - SWalFileInfo *pInfo = taosArrayGet(pWal->fileInfoSet, i); + pInfo = taosArrayGet(pWal->fileInfoSet, i); walBuildLogName(pWal, pInfo->firstVer, fnameStr); taosRemoveFile(fnameStr); walBuildIdxName(pWal, pInfo->firstVer, fnameStr); diff --git a/source/os/src/osSystem.c b/source/os/src/osSystem.c index 148529170c7d50a633993efb4bf4a1496bc29683..62c1747619154257c68bac98bf1e26b1b38d25c5 100644 --- a/source/os/src/osSystem.c +++ b/source/os/src/osSystem.c @@ -39,11 +39,11 @@ void* taosLoadDll(const char* filename) { #else void* handle = dlopen(filename, RTLD_LAZY); if (!handle) { - //printf("load dll:%s failed, error:%s", filename, dlerror()); + // printf("load dll:%s failed, error:%s", filename, dlerror()); return NULL; } - //printf("dll %s loaded", filename); + // printf("dll %s loaded", filename); return handle; #endif @@ -59,17 +59,17 @@ void* taosLoadSym(void* handle, char* name) { char* error = NULL; if ((error = dlerror()) != NULL) { - //printf("load sym:%s failed, error:%s", name, dlerror()); + // printf("load sym:%s failed, error:%s", name, dlerror()); return NULL; } - //printf("sym %s loaded", name); + // printf("sym %s loaded", name); return sym; #endif } -void taosCloseDll(void* handle) { +void taosCloseDll(void* handle) { #if defined(WINDOWS) return; #elif defined(_TD_DARWIN_64) @@ -100,7 +100,7 @@ int taosSetConsoleEcho(bool on) { struct termios term; if (tcgetattr(STDIN_FILENO, &term) == -1) { - perror("Cannot get the attribution of the terminal"); + /*perror("Cannot get the attribution of the terminal");*/ return -1; } @@ -111,7 +111,7 @@ int taosSetConsoleEcho(bool on) { err = tcsetattr(STDIN_FILENO, TCSAFLUSH, &term); if (err == -1 || err == EINTR) { - printf("Cannot set the attribution of the terminal"); + /*printf("Cannot set the attribution of the terminal");*/ return -1; } @@ -154,7 +154,7 @@ void taosSetTerminalMode() { int32_t taosGetOldTerminalMode() { #if defined(WINDOWS) - + #else /* Make sure stdin is a terminal. */ if (!isatty(STDIN_FILENO)) { @@ -181,7 +181,7 @@ void taosResetTerminalMode() { #endif } -TdCmdPtr taosOpenCmd(const char *cmd) { +TdCmdPtr taosOpenCmd(const char* cmd) { if (cmd == NULL) return NULL; #ifdef WINDOWS return (TdCmdPtr)_popen(cmd, "r"); @@ -190,8 +190,8 @@ TdCmdPtr taosOpenCmd(const char *cmd) { #endif } -int64_t taosGetLineCmd(TdCmdPtr pCmd, char ** __restrict ptrBuf) { - if (pCmd == NULL || ptrBuf == NULL ) { +int64_t taosGetLineCmd(TdCmdPtr pCmd, char** __restrict ptrBuf) { + if (pCmd == NULL || ptrBuf == NULL) { return -1; } if (*ptrBuf != NULL) { @@ -219,7 +219,7 @@ int32_t taosEOFCmd(TdCmdPtr pCmd) { return feof((FILE*)pCmd); } -int64_t taosCloseCmd(TdCmdPtr *ppCmd) { +int64_t taosCloseCmd(TdCmdPtr* ppCmd) { if (ppCmd == NULL || *ppCmd == NULL) { return 0; } diff --git a/tests/script/tsim/tmq/basic1.sim b/tests/script/tsim/tmq/basic1.sim index d7534338e1c0781cb911c2bcd4b3f19c730ff59e..0c96635a788ba18ab391cfad869e2a68e675954c 100644 --- a/tests/script/tsim/tmq/basic1.sim +++ b/tests/script/tsim/tmq/basic1.sim @@ -25,7 +25,7 @@ $rowsPerCtb = 10 $tstart = 1640966400000 # 2022-01-01 00:00:00.000 #---- global parameters end ----# -$pullDelay = 5 +$pullDelay = 3 $ifcheckdata = 1 $showMsg = 1 $showRow = 0 diff --git a/tests/script/tsim/tmq/basic2.sim b/tests/script/tsim/tmq/basic2.sim index ac0d2bb6df1a228ed8576103f39a4ad70ff6c850..53f10e22475cc067a25b86b7aa0c46c342d0111c 100644 --- a/tests/script/tsim/tmq/basic2.sim +++ b/tests/script/tsim/tmq/basic2.sim @@ -25,7 +25,7 @@ $rowsPerCtb = 10 $tstart = 1640966400000 # 2022-01-01 00:00:00.000 #---- global parameters end ----# -$pullDelay = 5 +$pullDelay = 3 $ifcheckdata = 1 $showMsg = 1 $showRow = 0 diff --git a/tests/script/tsim/tmq/basic3.sim b/tests/script/tsim/tmq/basic3.sim index c0ba2c97fb3528d72b67bd51b183c13f4d92e351..de771ba892ec01825fbed08464a7d3794a5226bf 100644 --- a/tests/script/tsim/tmq/basic3.sim +++ b/tests/script/tsim/tmq/basic3.sim @@ -25,7 +25,7 @@ $rowsPerCtb = 10 $tstart = 1640966400000 # 2022-01-01 00:00:00.000 #---- global parameters end ----# -$pullDelay = 5 +$pullDelay = 3 $ifcheckdata = 1 $showMsg = 1 $showRow = 0 diff --git a/tests/script/tsim/tmq/basic4.sim b/tests/script/tsim/tmq/basic4.sim index 1eed93463cc37322fe523cc1a91984b7c2c32da3..42023bda7e1f5d5defd0266ae5a4146f50851314 100644 --- a/tests/script/tsim/tmq/basic4.sim +++ b/tests/script/tsim/tmq/basic4.sim @@ -25,7 +25,7 @@ $rowsPerCtb = 10 $tstart = 1640966400000 # 2022-01-01 00:00:00.000 #---- global parameters end ----# -$pullDelay = 5 +$pullDelay = 3 $ifcheckdata = 1 $showMsg = 1 $showRow = 0