diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 5cf9e9855def12dc355e809e796495f9ce063a16..f332300a4f5d951826a7eb4334c06a4c45621d7c 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2570,6 +2570,12 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p buf = taosDecodeFixedI8(buf, &pRsp->withTbName); buf = taosDecodeFixedI8(buf, &pRsp->withSchema); buf = taosDecodeFixedI8(buf, &pRsp->withTag); + if (pRsp->withTbName) { + pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void*)); + } + if (pRsp->withSchema) { + pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void*)); + } for (int32_t i = 0; i < pRsp->blockNum; i++) { int32_t bLen = 0; @@ -2579,20 +2585,14 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p taosArrayPush(pRsp->blockDataLen, &bLen); taosArrayPush(pRsp->blockData, &data); if (pRsp->withSchema) { - pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void*)); SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper)); buf = taosDecodeSSchemaWrapper(buf, pSW); taosArrayPush(pRsp->blockSchema, &pSW); - } else { - pRsp->blockSchema = NULL; } if (pRsp->withTbName) { - pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void*)); char* name = NULL; buf = taosDecodeString(buf, &name); taosArrayPush(pRsp->blockTbName, &name); - } else { - pRsp->blockTbName = NULL; } } } diff --git a/include/util/ttimer.h b/include/util/ttimer.h index 10222596319f445c980e5a03b9ded91a3ca9ce4e..4111a8ca28375cbcf45f60512da06802eeb22669 100644 --- a/include/util/ttimer.h +++ b/include/util/ttimer.h @@ -31,16 +31,16 @@ extern int32_t taosTmrThreads; void *taosTmrInit(int32_t maxTmr, int32_t resoultion, int32_t longest, const char *label); +void taosTmrCleanUp(void *handle); + tmr_h taosTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void *param, void *handle); bool taosTmrStop(tmr_h tmrId); -bool taosTmrStopA(tmr_h *timerId); +bool taosTmrStopA(tmr_h *tmrId); bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void *param, void *handle, tmr_h *pTmrId); -void taosTmrCleanUp(void *handle); - #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 56d86c26a0b6d5f0c608ea6613c453161b597324..0715ee8066106244955feb0b14ac9deb3f808379 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -81,6 +81,8 @@ typedef struct { // rpc info int64_t reqId; SRpcHandleInfo rpcInfo; + tmr_h timerId; + int8_t tmrStopped; // exec int8_t inputStatus; int8_t execStatus; @@ -164,13 +166,12 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key); void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data); // tqOffset -STqOffsetStore* STqOffsetOpen(STqOffsetCfg*); -void STqOffsetClose(STqOffsetStore*); - -int64_t tqOffsetFetch(STqOffsetStore* pStore, const char* subscribeKey); -int32_t tqOffsetCommit(STqOffsetStore* pStore, const char* subscribeKey, int64_t offset); -int32_t tqOffsetPersist(STqOffsetStore* pStore, const char* subscribeKey); -int32_t tqOffsetPersistAll(STqOffsetStore* pStore); +STqOffsetStore* tqOffsetOpen(STqOffsetCfg*); +void tqOffsetClose(STqOffsetStore*); +int64_t tqOffsetFetch(STqOffsetStore* pStore, const char* subscribeKey); +int32_t tqOffsetCommit(STqOffsetStore* pStore, const char* subscribeKey, int64_t offset); +int32_t tqOffsetPersist(STqOffsetStore* pStore, const char* subscribeKey); +int32_t tqOffsetPersistAll(STqOffsetStore* pStore); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 172caf8724d20f8825ff22bf3f86c848b77f727e..67651c2f780bc304e7f5f8783a4260d9c99fe8b0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -174,6 +174,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ASSERT(taosArrayGetSize(rsp.blockData) == rsp.blockNum); ASSERT(taosArrayGetSize(rsp.blockDataLen) == rsp.blockNum); + if (rsp.withSchema) { + ASSERT(taosArrayGetSize(rsp.blockSchema) == rsp.blockNum); + } rsp.rspOffset = fetchOffset; diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index 90f512611b1100bc79a6e85784ad87ebe10380c2..4d83a67579f89c24bde1c4724fdaacd1666bcfdd 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -30,7 +30,7 @@ struct STqOffsetStore { SHashObj* pHash; // SHashObj }; -STqOffsetStore* STqOffsetOpen(STqOffsetCfg* pCfg) { +STqOffsetStore* tqOffsetOpen(STqOffsetCfg* pCfg) { STqOffsetStore* pStore = taosMemoryMalloc(sizeof(STqOffsetStore)); if (pStore == NULL) { return NULL; diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index f23a14472c1ac5c3f71e9500ad1fa42a9df6355e..2d9207a0de25c9b42e780a063b6b4b921f9cf8f0 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -15,6 +15,11 @@ #include "tq.h" +void tqTmrRspFunc(void* param, void* tmrId) { + STqHandle* pHandle = (STqHandle*)param; + atomic_store_8(&pHandle->pushHandle.tmrStopped, 1); +} + int32_t tqExecFromInputQ(STQ* pTq, STqHandle* pHandle) { // 1. guard and set status executing // 2. check processedVer @@ -50,12 +55,15 @@ int32_t tqOpenPushHandle(STQ* pTq, STqHandle* pHandle) { return 0; } -void tqPreparePush(STQ* pTq, STqHandle* pHandle, int64_t reqId, const SRpcHandleInfo* pInfo, int64_t processedVer) { +int32_t tqPreparePush(STQ* pTq, STqHandle* pHandle, int64_t reqId, const SRpcHandleInfo* pInfo, int64_t processedVer, + int64_t timeout) { memcpy(&pHandle->pushHandle.rpcInfo, pInfo, sizeof(SRpcHandleInfo)); atomic_store_64(&pHandle->pushHandle.reqId, reqId); atomic_store_64(&pHandle->pushHandle.processedVer, processedVer); atomic_store_8(&pHandle->pushHandle.inputStatus, TASK_INPUT_STATUS__NORMAL); - // set timeout timer + atomic_store_8(&pHandle->pushHandle.tmrStopped, 0); + taosTmrReset(tqTmrRspFunc, (int32_t)timeout, pHandle, tqMgmt.timer, &pHandle->pushHandle.timerId); + return 0; } int32_t tqEnqueue(STqHandle* pHandle, SStreamDataSubmit* pSubmit) { diff --git a/source/dnode/vnode/src/vnd/vnodeModule.c b/source/dnode/vnode/src/vnd/vnodeModule.c index efae74b55a95525c105c7a8c3de3e887a0f3b2d2..d0aede145eb8640e1e9031160d5ab7573d4a74e8 100644 --- a/source/dnode/vnode/src/vnd/vnodeModule.c +++ b/source/dnode/vnode/src/vnd/vnodeModule.c @@ -69,6 +69,9 @@ int vnodeInit(int nthreads) { if (walInit() < 0) { return -1; } + if (tqInit() < 0) { + return -1; + } return 0; } @@ -94,6 +97,9 @@ void vnodeCleanup() { taosMemoryFreeClear(vnodeGlobal.threads); taosThreadCondDestroy(&(vnodeGlobal.hasTask)); taosThreadMutexDestroy(&(vnodeGlobal.mutex)); + + walCleanUp(); + tqCleanUp(); } int vnodeScheduleTask(int (*execute)(void*), void* arg) { @@ -155,4 +161,4 @@ static void* loop(void* arg) { } return NULL; -} \ No newline at end of file +} diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index e5d606ffffc6fc38e9fcd369f75f8c26da95b867..3d0204e355bd228836a2729cc9e52e74981c4e0f 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -1,10 +1,10 @@ -#include "qworker.h" #include "dataSinkMgt.h" #include "executor.h" #include "planner.h" #include "query.h" #include "qwInt.h" #include "qwMsg.h" +#include "qworker.h" #include "tcommon.h" #include "tmsg.h" #include "tname.h" @@ -406,7 +406,6 @@ int32_t qwDropTask(QW_FPARAMS_DEF) { return TSDB_CODE_SUCCESS; } - void qwSetHbParam(int64_t refId, SQWHbParam **pParam) { int32_t paramIdx = 0; int32_t newParamIdx = 0; @@ -430,11 +429,10 @@ void qwSetHbParam(int64_t refId, SQWHbParam **pParam) { *pParam = &gQwMgmt.param[paramIdx]; } - -void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) { +void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) { char dbFName[TSDB_DB_FNAME_LEN]; char tbName[TSDB_TABLE_NAME_LEN]; - + qGetQueriedTableSchemaVersion(pTaskInfo, dbFName, tbName, &ctx->tbInfo.sversion, &ctx->tbInfo.tversion); if (dbFName[0] && tbName[0]) { @@ -444,7 +442,6 @@ void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) { } } - void qwCloseRef(void) { taosWLockLatch(&gQwMgmt.lock); if (atomic_load_32(&gQwMgmt.qwNum) <= 0 && gQwMgmt.qwRef >= 0) { @@ -454,13 +451,13 @@ void qwCloseRef(void) { taosWUnLockLatch(&gQwMgmt.lock); } - void qwDestroySchStatus(SQWSchStatus *pStatus) { taosHashCleanup(pStatus->tasksHash); } void qwDestroyImpl(void *pMgmt) { SQWorker *mgmt = (SQWorker *)pMgmt; - taosTmrStopA(&mgmt->hbTimer); + taosTmrStop(mgmt->hbTimer); + mgmt->hbTimer = NULL; taosTmrCleanUp(mgmt->timer); // TODO STOP ALL QUERY @@ -527,10 +524,10 @@ int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type) { switch (type) { case QUERY_QUEUE: pStat = &mgmt->stat.msgStat.waitTime[0]; - return pStat->num ? (pStat->total/pStat->num) : 0; + return pStat->num ? (pStat->total / pStat->num) : 0; case FETCH_QUEUE: pStat = &mgmt->stat.msgStat.waitTime[1]; - return pStat->num ? (pStat->total/pStat->num) : 0; + return pStat->num ? (pStat->total / pStat->num) : 0; default: qError("unsupported queue type %d", type); } @@ -538,5 +535,3 @@ int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type) { return -1; } - - diff --git a/source/util/src/tsched.c b/source/util/src/tsched.c index ee1f4185613dd85f0e60d86ebd0487b07b3ceee9..691a0d34d42ca3ab04be5daf61414016436a6bb1 100644 --- a/source/util/src/tsched.c +++ b/source/util/src/tsched.c @@ -23,19 +23,19 @@ #define DUMP_SCHEDULER_TIME_WINDOW 30000 // every 30sec, take a snap shot of task queue. typedef struct { - char label[TSDB_LABEL_LEN]; - tsem_t emptySem; - tsem_t fullSem; + char label[TSDB_LABEL_LEN]; + tsem_t emptySem; + tsem_t fullSem; TdThreadMutex queueMutex; - int32_t fullSlot; - int32_t emptySlot; - int32_t queueSize; - int32_t numOfThreads; - TdThread *qthread; - SSchedMsg *queue; - bool stop; - void *pTmrCtrl; - void *pTimer; + int32_t fullSlot; + int32_t emptySlot; + int32_t queueSize; + int32_t numOfThreads; + TdThread *qthread; + SSchedMsg *queue; + bool stop; + void *pTmrCtrl; + void *pTimer; } SSchedQueue; static void *taosProcessSchedQueue(void *param); @@ -218,7 +218,8 @@ void taosCleanUpScheduler(void *param) { taosThreadMutexDestroy(&pSched->queueMutex); if (pSched->pTimer) { - taosTmrStopA(&pSched->pTimer); + taosTmrStop(pSched->pTimer); + pSched->pTimer = NULL; } if (pSched->queue) taosMemoryFree(pSched->queue);