diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 162b6fb2ed24539435dc3e4573b52b0a9759a5a4..8f300c96c5e3e97c5920511641ea90461746990f 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -42,7 +42,7 @@ typedef struct SReadHandle { #define STREAM_DATA_TYPE_SSDATA_BLOCK 0x2 typedef enum { - OPTR_EXEC_MODEL_BATCH = 0x1, + OPTR_EXEC_MODEL_BATCH = 0x1, OPTR_EXEC_MODEL_STREAM = 0x2, } EOPTR_EXEC_MODEL; @@ -81,7 +81,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO * @param isAdd * @return */ -int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isAdd); +int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd); /** * Create the exec task object according to task json @@ -169,7 +169,7 @@ int32_t qUpdateQueriedTableIdList(qTaskInfo_t tinfo, int64_t uid, int32_t type); void qProcessFetchRsp(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet); -int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t *resNum, SExplainExecInfo **pRes); +int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t* resNum, SExplainExecInfo** pRes); #ifdef __cplusplus } diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index ecfc991331e398c1d3eccef80d7d479c779c8fd6..2a2fdea5378fe7cee6e47f90eb8a613f80b313f9 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -830,10 +830,12 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { } // init hb timer - tmq->hbTimer = taosTmrStart(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer); + if (tmq->hbTimer == NULL) { + tmq->hbTimer = taosTmrStart(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer); + } // init auto commit timer - if (tmq->autoCommit) { + if (tmq->autoCommit && tmq->commitTimer == NULL) { tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer); } @@ -1456,9 +1458,18 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) { tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { if (tmq->status == TMQ_CONSUMER_STATUS__READY) { - tmq_list_t* lst = tmq_list_new(); - tmq_resp_err_t rsp = tmq_subscribe(tmq, lst); + tmq_resp_err_t rsp = tmq_commit_sync(tmq, NULL); + if (rsp == TMQ_RESP_ERR__SUCCESS) { + // TODO: free resources + return TMQ_RESP_ERR__SUCCESS; + } else { + return TMQ_RESP_ERR__FAIL; + } + + tmq_list_t* lst = tmq_list_new(); + rsp = tmq_subscribe(tmq, lst); tmq_list_destroy(lst); + if (rsp == TMQ_RESP_ERR__SUCCESS) { // TODO: free resources return TMQ_RESP_ERR__SUCCESS; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 503c0f404a6fe5296ec5728a7dc9993255af7c77..274876567e50f86d47acadbddda15edbdca087af 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -627,21 +627,26 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, if (pNewConsumer->updateType == CONSUMER_UPDATE__MODIFY) { ASSERT(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0); ASSERT(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0); - SArray *tmp = pOldConsumer->rebNewTopics; - pOldConsumer->rebNewTopics = pNewConsumer->rebNewTopics; - pNewConsumer->rebNewTopics = tmp; - tmp = pOldConsumer->rebRemovedTopics; - pOldConsumer->rebRemovedTopics = pNewConsumer->rebRemovedTopics; - pNewConsumer->rebRemovedTopics = tmp; + if (taosArrayGetSize(pNewConsumer->rebNewTopics) == 0 && taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0) { + pOldConsumer->status = MQ_CONSUMER_STATUS__READY; + } else { + SArray *tmp = pOldConsumer->rebNewTopics; + pOldConsumer->rebNewTopics = pNewConsumer->rebNewTopics; + pNewConsumer->rebNewTopics = tmp; - tmp = pOldConsumer->assignedTopics; - pOldConsumer->assignedTopics = pNewConsumer->assignedTopics; - pNewConsumer->assignedTopics = tmp; + tmp = pOldConsumer->rebRemovedTopics; + pOldConsumer->rebRemovedTopics = pNewConsumer->rebRemovedTopics; + pNewConsumer->rebRemovedTopics = tmp; - pOldConsumer->subscribeTime = pNewConsumer->upTime; + tmp = pOldConsumer->assignedTopics; + pOldConsumer->assignedTopics = pNewConsumer->assignedTopics; + pNewConsumer->assignedTopics = tmp; - pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY; + pOldConsumer->subscribeTime = pNewConsumer->upTime; + + pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY; + } } else if (pNewConsumer->updateType == CONSUMER_UPDATE__LOST) { ASSERT(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0); ASSERT(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 9727d9df9f74edbb53299ef7798a95612efcd713..6a5ad63822a9d14601bb7e4a8a95b39222d1e6b4 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -96,8 +96,8 @@ STSmaWrapper* metaGetSmaInfoByTable(SMeta* pMeta, tb_uid_t uid, bool deepCopy) SArray* metaGetSmaIdsByTable(SMeta* pMeta, tb_uid_t uid); SArray* metaGetSmaTbUids(SMeta* pMeta); -int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg); -int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); +int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg); +int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); // tsdb int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg); @@ -117,6 +117,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal); void tqClose(STQ*); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); int tqCommit(STQ*); +int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList); int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 6cc986d54bfc5856c6d439eb03078307a1a70ef2..80aa350e4def046867242a4b444d075a42e57da2 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -101,6 +101,21 @@ static void tdSRowDemo() { taosMemoryFree(pTSChema); } +int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList) { + void* pIter = NULL; + STqExec* pExec = NULL; + while (1) { + pIter = taosHashIterate(pTq->execs, pIter); + if (pIter == NULL) break; + pExec = (STqExec*)pIter; + for (int32_t i = 0; i < 5; i++) { + int32_t code = qUpdateQualifiedTableId(pExec->task[i], tbUidList, true); + ASSERT(code == 0); + } + } + return 0; +} + int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { if (msgType != TDMT_VND_SUBMIT) return 0; void* pIter = NULL; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 464331319e893a8115c122a9901fd79f5e8a2673..5c8cd362fd9397b2796c15ce93a4f85cda380f17 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -333,6 +333,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, SVCreateTbRsp cRsp = {0}; char tbName[TSDB_TABLE_FNAME_LEN]; STbUidStore *pStore = NULL; + SArray *tbUids = NULL; pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP; pRsp->code = TSDB_CODE_SUCCESS; @@ -348,7 +349,8 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, } rsp.pArray = taosArrayInit(req.nReqs, sizeof(cRsp)); - if (rsp.pArray == NULL) { + tbUids = taosArrayInit(req.nReqs, sizeof(int64_t)); + if (rsp.pArray == NULL || tbUids == NULL) { rcode = -1; terrno = TSDB_CODE_OUT_OF_MEMORY; goto _exit; @@ -376,6 +378,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, } else { cRsp.code = TSDB_CODE_SUCCESS; tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid); + taosArrayPush(tbUids, &pCreateReq->uid); } taosArrayPush(rsp.pArray, &cRsp); @@ -383,6 +386,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, tDecoderClear(&decoder); + tqUpdateTbUidList(pVnode->pTq, tbUids); tdUpdateTbUidList(pVnode->pSma, pStore); tdUidStoreFree(pStore); @@ -402,6 +406,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, _exit: taosArrayDestroy(rsp.pArray); + taosArrayDestroy(tbUids); tDecoderClear(&decoder); tEncoderClear(&encoder); return rcode; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 320450eb6e5eb8703b82c399e69a6f02088c5101..66073b70ebf0ace2ade2ef9bd11bae7b5908c555 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -126,7 +126,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) { return pTaskInfo; } -int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isAdd) { +int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; // traverse to the stream scanner node to add this table id @@ -141,12 +141,12 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isA SMetaReader mr = {0}; metaReaderInit(&mr, pScanInfo->readHandle.meta, 0); - for(int32_t i = 0; i < taosArrayGetSize(tableIdList); ++i) { + for (int32_t i = 0; i < taosArrayGetSize(tableIdList); ++i) { int64_t* id = (int64_t*)taosArrayGet(tableIdList, i); int32_t code = metaGetTableEntryByUid(&mr, *id); if (code != TSDB_CODE_SUCCESS) { - qError("failed to get table meta, uid:%"PRIu64" code:%s", *id, tstrerror(terrno)); + qError("failed to get table meta, uid:%" PRIu64 " code:%s", *id, tstrerror(terrno)); continue; } @@ -160,7 +160,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isA metaReaderClear(&mr); - qDebug(" %d qualified child tables added into stream scanner", (int32_t) taosArrayGetSize(qa)); + qDebug(" %d qualified child tables added into stream scanner", (int32_t)taosArrayGetSize(qa)); int32_t code = tqReadHandleAddTbUidList(pScanInfo->streamBlockReader, qa); if (code != TSDB_CODE_SUCCESS) { return code;