diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b445aed244bd13a58cbd35bd17d3450c2ffc6a06..5719db5420d323959fff47f3ec66e713415047dd 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -57,7 +57,8 @@ extern int32_t tMsgDict[]; #define TMSG_SEG_SEQ(TYPE) ((TYPE)&0xff) #define TMSG_INFO(TYPE) \ ((TYPE) < TDMT_DND_MAX_MSG || (TYPE) < TDMT_MND_MAX_MSG || (TYPE) < TDMT_VND_MAX_MSG || (TYPE) < TDMT_SCH_MAX_MSG || \ - (TYPE) < TDMT_STREAM_MAX_MSG || (TYPE) < TDMT_MON_MAX_MSG || (TYPE) < TDMT_SYNC_MAX_MSG) \ + (TYPE) < TDMT_VND_TMQ_MAX_MSG || (TYPE) < TDMT_STREAM_MAX_MSG || (TYPE) < TDMT_VND_STREAM_MAX_MSG || \ + (TYPE) < TDMT_MON_MAX_MSG || (TYPE) < TDMT_SYNC_MAX_MSG) \ ? tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] \ : 0 #define TMSG_INDEX(TYPE) (tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)) diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index be4bf0e4d2b17b3a3f139e1a4c3212d7f367c9ab..9c47bc702ad830f36fd6ff41e1ddd8c6ef4eb6fe 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -138,17 +138,15 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_GET_TABLE_INDEX, "get-table-index", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_BATCH_META, "batch-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TABLE_CFG, "table-cfg", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "create-topic", SMCreateTopicReq, SMCreateTopicRsp) - TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "alter-topic", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "drop-topic", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "subscribe", SCMSubscribeReq, SCMSubscribeRsp) - TD_DEF_MSG_TYPE(TDMT_MND_MQ_ASK_EP, "ask-ep", SMqAskEpReq, SMqAskEpRsp) - TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_LOST, "consumer-lost", SMqConsumerLostMsg, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_RECOVER, "consumer-recover", SMqConsumerRecoverMsg, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_MQ_HB, "consumer-hb", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "do-rebalance", SMqDoRebalanceMsg, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_MQ_DROP_CGROUP, "drop-cgroup", SMqDropCGroupReq, SMqDropCGroupRsp) - TD_DEF_MSG_TYPE(TDMT_MND_MQ_COMMIT_OFFSET, "mnode-commit-offset", SMqCMCommitOffsetReq, SMqCMCommitOffsetRsp) + TD_DEF_MSG_TYPE(TDMT_MND_TMQ_CREATE_TOPIC, "create-topic", SMCreateTopicReq, SMCreateTopicRsp) + TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DROP_TOPIC, "drop-topic", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_TMQ_SUBSCRIBE, "subscribe", SCMSubscribeReq, SCMSubscribeRsp) + TD_DEF_MSG_TYPE(TDMT_MND_TMQ_ASK_EP, "ask-ep", SMqAskEpReq, SMqAskEpRsp) + TD_DEF_MSG_TYPE(TDMT_MND_TMQ_CONSUMER_LOST, "consumer-lost", SMqConsumerLostMsg, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_TMQ_CONSUMER_RECOVER, "consumer-recover", SMqConsumerRecoverMsg, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_TMQ_HB, "consumer-hb", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DO_REBALANCE, "do-rebalance", SMqDoRebalanceMsg, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DROP_CGROUP, "drop-cgroup", SMqDropCGroupReq, SMqDropCGroupRsp) TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mq-tmr", SMTimerReq, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq) TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL) @@ -186,21 +184,6 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_CREATE_STB, "vnode-create-stb", SVCreateStbReq, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_STB, "vnode-alter-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_STB, "vnode-drop-stb", SVDropStbReq, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp) - TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_DELETE, "vnode-mq-vg-delete", SMqVDeleteReq, SMqVDeleteRsp) - TD_DEF_MSG_TYPE(TDMT_VND_MQ_COMMIT_OFFSET, "vnode-commit-offset", STqOffset, STqOffset) - TD_DEF_MSG_TYPE(TDMT_VND_ADD_CHECK_INFO, "vnode-add-check-info", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_DELETE_CHECK_INFO, "vnode-delete-check-info", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TOPIC, "vnode-alter-topic", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp) - TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqPollReq, SMqDataBlkRsp) - TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_STREAM_DISPATCH_WRITE, "vnode-stream-task-dispatch-write", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_STEP1, "vnode-stream-recover1", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_STEP2, "vnode-stream-recover2", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_FINISH, "vnode-stream-finish", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL) @@ -232,15 +215,30 @@ enum { TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "link-broken", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SCH_MAX_MSG, "sch-max", NULL, NULL) + TD_NEW_MSG_SEG(TDMT_VND_TMQ_MSG) + TD_DEF_MSG_TYPE(TDMT_VND_TMQ_SUBSCRIBE, "vnode-tmq-subscribe", SMqRebVgReq, SMqRebVgRsp) + TD_DEF_MSG_TYPE(TDMT_VND_TMQ_DELETE_SUB, "vnode-tmq-delete-sub", SMqVDeleteReq, SMqVDeleteRsp) + TD_DEF_MSG_TYPE(TDMT_VND_TMQ_COMMIT_OFFSET, "vnode-tmq-commit-offset", STqOffset, STqOffset) + TD_DEF_MSG_TYPE(TDMT_VND_TMQ_ADD_CHECKINFO, "vnode-tmq-add-checkinfo", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_TMQ_DEL_CHECKINFO, "vnode-del-checkinfo", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME, "vnode-tmq-consume", SMqPollReq, SMqDataBlkRsp) + TD_DEF_MSG_TYPE(TDMT_VND_TMQ_MAX_MSG, "vnd-tmq-max", NULL, NULL) + TD_NEW_MSG_SEG(TDMT_STREAM_MSG) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DEPLOY, "stream-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DROP, "stream-task-drop", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RUN, "stream-task-run", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DISPATCH, "stream-task-dispatch", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RECOVER, "stream-task-recover", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_STREAM_RECOVER_FINISH, "vnode-stream-finish", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL) + TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG) + TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_STEP1, "vnode-stream-recover1", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_STEP2, "vnode-stream-recover2", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL) + TD_NEW_MSG_SEG(TDMT_MON_MSG) TD_DEF_MSG_TYPE(TDMT_MON_MAX_MSG, "monitor-max", NULL, NULL) diff --git a/include/common/tname.h b/include/common/tname.h index 8c594a2b5ebbb8b9d71f54d01ac0d867ddb5c133..666a25303ebae0d9098c50e10d4a8c4df0e7624f 100644 --- a/include/common/tname.h +++ b/include/common/tname.h @@ -72,13 +72,13 @@ bool tNameTbNameEqual(SName* left, SName* right); typedef struct { // input - SArray* tags; // element is SSmlKv - const char* sTableName; // super table name - uint8_t sTableNameLen; // the length of super table name + SArray* tags; // element is SSmlKv + const char* stbFullName; // super table name + uint8_t stbFullNameLen; // the length of super table name // output - char* childTableName; // must have size of TSDB_TABLE_NAME_LEN; - uint64_t uid; // child table uid, may be useful + char* ctbShortName; // must have size of TSDB_TABLE_NAME_LEN; + uint64_t uid; // child table uid, may be useful } RandTableName; void buildChildTableName(RandTableName* rName); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 534d86b1f17bec768d9052355c73a788be85953e..e2616567f51fe82798c4e0fb428dde8bf0cb457a 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -441,8 +441,9 @@ typedef struct { } SStreamRetrieveRsp; typedef struct { - int64_t streamId; - int32_t taskId; + SMsgHead msgHead; + int64_t streamId; + int32_t taskId; } SStreamRecoverStep1Req, SStreamRecoverStep2Req; typedef struct { diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index c18e1e3e40ea891c05a245bb7638bbdf891bd80c..ba639476d841be8ebe0e7114c2f8c662824079f1 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -508,7 +508,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT pMsgSendInfo->param = pParam; pMsgSendInfo->paramFreeFp = taosMemoryFree; pMsgSendInfo->fp = tmqCommitCb; - pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET; + pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET; // send msg atomic_add_fetch_32(&pParamSet->waitingRspNum, 1); @@ -750,7 +750,7 @@ void tmqSendHbReq(void* param, void* tmrId) { sendInfo->requestObjRefId = 0; sendInfo->param = NULL; sendInfo->fp = tmqHbCb; - sendInfo->msgType = TDMT_MND_MQ_HB; + sendInfo->msgType = TDMT_MND_TMQ_HB; SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); @@ -1038,7 +1038,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { sendInfo->requestObjRefId = 0; sendInfo->param = ¶m; sendInfo->fp = tmqSubscribeCb; - sendInfo->msgType = TDMT_MND_SUBSCRIBE; + sendInfo->msgType = TDMT_MND_TMQ_SUBSCRIBE; SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); @@ -1420,7 +1420,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { sendInfo->requestObjRefId = 0; sendInfo->param = pParam; sendInfo->fp = tmqAskEpCb; - sendInfo->msgType = TDMT_MND_MQ_ASK_EP; + sendInfo->msgType = TDMT_MND_TMQ_ASK_EP; SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); @@ -1573,7 +1573,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { sendInfo->requestObjRefId = 0; sendInfo->param = pParam; sendInfo->fp = tmqPollCb; - sendInfo->msgType = TDMT_VND_CONSUME; + sendInfo->msgType = TDMT_VND_TMQ_CONSUME; int64_t transporterId = 0; /*printf("send poll\n");*/ diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 0c38d43543db0df063381174528d886551ee8d8f..502c2a6a08a9eebaf1ba9824f82a9768c96a6349 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2063,8 +2063,8 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataB default: if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) { if (colDataIsNull_s(pColInfoData, j)) { - tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type, TD_VTYPE_NULL, NULL, false, offset, - k); + tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type, TD_VTYPE_NULL, NULL, false, + offset, k); } else if (pCol->type == pColInfoData->info.type) { tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type, TD_VTYPE_NORM, var, true, offset, k); @@ -2137,10 +2137,26 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataB return TSDB_CODE_SUCCESS; } -char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId) { - ASSERT(stbName[0] != 0); +char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) { + ASSERT(stbFullName[0] != 0); SArray* tags = taosArrayInit(0, sizeof(void*)); + if (tags == NULL) { + return NULL; + } + SSmlKv* pTag = taosMemoryCalloc(1, sizeof(SSmlKv)); + if (pTag == NULL) { + taosArrayDestroy(tags); + return NULL; + } + + void* cname = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1); + if (cname == NULL) { + taosArrayDestroy(tags); + taosMemoryFree(pTag); + return NULL; + } + pTag->key = "group_id"; pTag->keyLen = strlen(pTag->key); pTag->type = TSDB_DATA_TYPE_UBIGINT; @@ -2148,13 +2164,11 @@ char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId) { pTag->length = sizeof(uint64_t); taosArrayPush(tags, &pTag); - void* cname = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1); - RandTableName rname = { .tags = tags, - .sTableName = stbName, - .sTableNameLen = strlen(stbName), - .childTableName = cname, + .stbFullName = stbFullName, + .stbFullNameLen = strlen(stbFullName), + .ctbShortName = cname, }; buildChildTableName(&rname); @@ -2162,8 +2176,8 @@ char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId) { taosMemoryFree(pTag); taosArrayDestroy(tags); - ASSERT(rname.childTableName && rname.childTableName[0]); - return rname.childTableName; + ASSERT(rname.ctbShortName && rname.ctbShortName[0]); + return rname.ctbShortName; } void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, int8_t needCompress) { diff --git a/source/common/src/tname.c b/source/common/src/tname.c index 70682d6cd4efbf621668197d1b40da3f453c4c50..4d83b6e3d84bfd2ab123efd0720b90ebca18ce81 100644 --- a/source/common/src/tname.c +++ b/source/common/src/tname.c @@ -315,7 +315,7 @@ static int compareKv(const void* p1, const void* p2) { */ void buildChildTableName(RandTableName* rName) { SStringBuilder sb = {0}; - taosStringBuilderAppendStringLen(&sb, rName->sTableName, rName->sTableNameLen); + taosStringBuilderAppendStringLen(&sb, rName->stbFullName, rName->stbFullNameLen); taosArraySort(rName->tags, compareKv); for (int j = 0; j < taosArrayGetSize(rName->tags); ++j) { taosStringBuilderAppendChar(&sb, ','); @@ -336,11 +336,11 @@ void buildChildTableName(RandTableName* rName) { tMD5Final(&context); char temp[8] = {0}; - rName->childTableName[0] = 't'; - rName->childTableName[1] = '_'; + rName->ctbShortName[0] = 't'; + rName->ctbShortName[1] = '_'; for (int i = 0; i < 16; i++) { sprintf(temp, "%02x", context.digest[i]); - strcat(rName->childTableName, temp); + strcat(rName->ctbShortName, temp); } taosStringBuilderDestroy(&sb); rName->uid = *(uint64_t*)(context.digest); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 12e678886d8824aa0de2e255aee36a23c8bfe469..2c4a8e9b70afe929f381e04c8aa523cc383adf2b 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -141,15 +141,13 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_GET_INDEX, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_GET_TABLE_INDEX, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_TOPIC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_TOPIC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TOPIC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_SUBSCRIBE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_ASK_EP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_HB, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_DROP_CGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_DROP_CGROUP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_COMMIT_OFFSET, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_CREATE_TOPIC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_DROP_TOPIC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_SUBSCRIBE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_ASK_EP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_HB, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_DROP_CGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_DROP_CGROUP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_TRANS, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_QUERY, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_CONN, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; @@ -171,10 +169,10 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_ADD_CHECK_INFO_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE_CHECK_INFO_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DELETE_SUB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_ADD_CHECKINFO_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DEL_CHECKINFO_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index 1ce7f1a84b106953467a4c9ab132bce95a95448d..7ecb6fb208b1153e4495d425587dc840cddf7d5e 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -72,8 +72,6 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index d682fa9cc58339e6e8d9a926a50e08d2cd9d42d2..e2812e0d312704633f4ec3519ddffaa98b867700 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -392,12 +392,12 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_CANCEL_SMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_ADD_CHECK_INFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE_CHECK_INFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DELETE_SUB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_ADD_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DEL_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; @@ -409,10 +409,9 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_RECOVER_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index fa3651c7d7ace435ff1715f02308e58c9bb80414..aee1a7a75c49a5083e93e0b7d0743c0279a5e102 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -63,12 +63,12 @@ int32_t mndInitConsumer(SMnode *pMnode) { .deleteFp = (SdbDeleteFp)mndConsumerActionDelete, }; - mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq); - mndSetMsgHandle(pMnode, TDMT_MND_MQ_HB, mndProcessMqHbReq); - mndSetMsgHandle(pMnode, TDMT_MND_MQ_ASK_EP, mndProcessAskEpReq); + mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq); + mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq); + mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq); mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg); - mndSetMsgHandle(pMnode, TDMT_MND_MQ_CONSUMER_LOST, mndProcessConsumerLostMsg); - mndSetMsgHandle(pMnode, TDMT_MND_MQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg); + mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_LOST, mndProcessConsumerLostMsg); + mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndCancelGetNextConsumer); @@ -207,7 +207,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { pLostMsg->consumerId = pConsumer->consumerId; SRpcMsg pRpcMsg = { - .msgType = TDMT_MND_MQ_CONSUMER_LOST, + .msgType = TDMT_MND_TMQ_CONSUMER_LOST, .pCont = pLostMsg, .contLen = sizeof(SMqConsumerLostMsg), }; @@ -256,7 +256,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { if (taosHashGetSize(pRebMsg->rebSubHash) != 0) { mInfo("mq rebalance will be triggered"); SRpcMsg rpcMsg = { - .msgType = TDMT_MND_MQ_DO_REBALANCE, + .msgType = TDMT_MND_TMQ_DO_REBALANCE, .pCont = pRebMsg, .contLen = sizeof(SMqDoRebalanceMsg), }; @@ -292,7 +292,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { pRecoverMsg->consumerId = consumerId; SRpcMsg pRpcMsg = { - .msgType = TDMT_MND_MQ_CONSUMER_RECOVER, + .msgType = TDMT_MND_TMQ_CONSUMER_RECOVER, .pCont = pRecoverMsg, .contLen = sizeof(SMqConsumerRecoverMsg), }; @@ -331,7 +331,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { pRecoverMsg->consumerId = consumerId; SRpcMsg pRpcMsg = { - .msgType = TDMT_MND_MQ_CONSUMER_RECOVER, + .msgType = TDMT_MND_TMQ_CONSUMER_RECOVER, .pCont = pRecoverMsg, .contLen = sizeof(SMqConsumerRecoverMsg), }; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 5ee4925a96d4712c54f9fe2e056f1b3e84c6ef24..7e7c6ee0b60242b1d2a34e17e332a2e571031c5b 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -487,6 +487,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { qDestroyQueryPlan(pPlan); return -1; } + pTask->fillHistory = pStream->fillHistory; mndAddTaskToTaskSet(taskOneLevel, pTask); // source diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 3ff989e6707e840c7bea8a21471111b381df3831..9f433f33225ae09c594b0d97806e9295ef4c5347 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -557,78 +557,6 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) { return 0; } -#if 0 -static int32_t mndPersistTaskRecoverReq(STrans *pTrans, SStreamTask *pTask) { - SMStreamTaskRecoverReq *pReq = taosMemoryCalloc(1, sizeof(SMStreamTaskRecoverReq)); - if (pReq == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - pReq->streamId = pTask->streamId; - pReq->taskId = pTask->taskId; - int32_t len; - int32_t code; - tEncodeSize(tEncodeSMStreamTaskRecoverReq, pReq, len, code); - if (code != 0) { - return -1; - } - void *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len); - if (buf == NULL) { - return -1; - } - void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - SEncoder encoder; - tEncoderInit(&encoder, abuf, len); - tEncodeSMStreamTaskRecoverReq(&encoder, pReq); - ((SMsgHead *)buf)->vgId = pTask->nodeId; - - STransAction action = {0}; - memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet)); - action.pCont = buf; - action.contLen = sizeof(SMsgHead) + len; - action.msgType = TDMT_STREAM_TASK_RECOVER; - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - taosMemoryFree(buf); - return -1; - } - return 0; -} - -int32_t mndRecoverStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { - if (pStream->isDistributed) { - int32_t lv = taosArrayGetSize(pStream->tasks); - for (int32_t i = 0; i < lv; i++) { - SArray *pTasks = taosArrayGetP(pStream->tasks, i); - int32_t sz = taosArrayGetSize(pTasks); - SStreamTask *pTask = taosArrayGetP(pTasks, 0); - if (pTask->taskLevel == TASK_LEVEL__AGG) { - ASSERT(sz == 1); - if (mndPersistTaskRecoverReq(pTrans, pTask) < 0) { - return -1; - } - } else { - continue; - } - } - } else { - int32_t lv = taosArrayGetSize(pStream->tasks); - for (int32_t i = 0; i < lv; i++) { - SArray *pTasks = taosArrayGetP(pStream->tasks, i); - int32_t sz = taosArrayGetSize(pTasks); - for (int32_t j = 0; j < sz; j++) { - SStreamTask *pTask = taosArrayGetP(pTasks, j); - if (pTask->taskLevel != TASK_LEVEL__SOURCE) break; - ASSERT(pTask->taskLevel != TASK_LEVEL__SINK); - if (mndPersistTaskRecoverReq(pTrans, pTask) < 0) { - return -1; - } - } - } - } - return 0; -} -#endif - int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { int32_t lv = taosArrayGetSize(pStream->tasks); for (int32_t i = 0; i < lv; i++) { diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index a6661e73c9f76796e4bd4849e09dcac3d23063d3..74f2b1288e9a296592fc7fec2162604031539056 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -73,11 +73,11 @@ int32_t mndInitSubscribe(SMnode *pMnode) { .deleteFp = (SdbDeleteFp)mndSubActionDelete, }; - mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_CHANGE_RSP, mndTransProcessRsp); - mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_DELETE_RSP, mndTransProcessRsp); - mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessRebalanceReq); - mndSetMsgHandle(pMnode, TDMT_MND_MQ_DROP_CGROUP, mndProcessDropCgroupReq); - mndSetMsgHandle(pMnode, TDMT_MND_MQ_DROP_CGROUP_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_VND_TMQ_SUBSCRIBE_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_VND_TMQ_DELETE_SUB_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DO_REBALANCE, mndProcessRebalanceReq); + mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP, mndProcessDropCgroupReq); + mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP_RSP, mndTransProcessRsp); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextSubscribe); @@ -164,7 +164,7 @@ static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SM action.epSet = mndGetVgroupEpset(pMnode, pVgObj); action.pCont = buf; action.contLen = tlen; - action.msgType = TDMT_VND_MQ_VG_CHANGE; + action.msgType = TDMT_VND_TMQ_SUBSCRIBE; mndReleaseVgroup(pMnode, pVgObj); if (mndTransAppendRedoAction(pTrans, &action) != 0) { @@ -920,7 +920,7 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) action.epSet = pVgEp->epSet; action.pCont = pReq; action.contLen = sizeof(SMqVDeleteReq); - action.msgType = TDMT_VND_MQ_VG_DELETE; + action.msgType = TDMT_VND_TMQ_DELETE_SUB; if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); return -1; diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 324404ce1bbefada315fef274b48940c4c9adcd4..e1ca1d2708e4e0d05ed2018d8f62500352518795 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -53,11 +53,10 @@ int32_t mndInitTopic(SMnode *pMnode) { .deleteFp = (SdbDeleteFp)mndTopicActionDelete, }; - mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq); - mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq); - mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndTransProcessRsp); - mndSetMsgHandle(pMnode, TDMT_VND_ADD_CHECK_INFO_RSP, mndTransProcessRsp); - mndSetMsgHandle(pMnode, TDMT_VND_DELETE_CHECK_INFO_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CREATE_TOPIC, mndProcessCreateTopicReq); + mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_TOPIC, mndProcessDropTopicReq); + mndSetMsgHandle(pMnode, TDMT_VND_TMQ_ADD_CHECKINFO_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_VND_TMQ_DEL_CHECKINFO_RSP, mndTransProcessRsp); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic); @@ -506,7 +505,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * action.epSet = mndGetVgroupEpset(pMnode, pVgroup); action.pCont = buf; action.contLen = sizeof(SMsgHead) + len; - action.msgType = TDMT_VND_ADD_CHECK_INFO; + action.msgType = TDMT_VND_TMQ_ADD_CHECKINFO; if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(buf); sdbRelease(pSdb, pVgroup); @@ -715,7 +714,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { action.epSet = mndGetVgroupEpset(pMnode, pVgroup); action.pCont = buf; action.contLen = sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN; - action.msgType = TDMT_VND_DELETE_CHECK_INFO; + action.msgType = TDMT_VND_TMQ_DEL_CHECKINFO; if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(buf); sdbRelease(pSdb, pVgroup); diff --git a/source/dnode/qnode/src/qnode.c b/source/dnode/qnode/src/qnode.c index 8dbfc148417b6e23fca22023027db3b91cc89eef..7e6bd75b2d165256bdfe53b7ca7477356ba3d59a 100644 --- a/source/dnode/qnode/src/qnode.c +++ b/source/dnode/qnode/src/qnode.c @@ -90,12 +90,12 @@ int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg) { code = qWorkerProcessFetchMsg(pQnode, pQnode->pQuery, pMsg, ts); break; case TDMT_SCH_CANCEL_TASK: - //code = qWorkerProcessCancelMsg(pQnode, pQnode->pQuery, pMsg, ts); + // code = qWorkerProcessCancelMsg(pQnode, pQnode->pQuery, pMsg, ts); break; case TDMT_SCH_DROP_TASK: code = qWorkerProcessDropMsg(pQnode, pQnode->pQuery, pMsg, ts); break; - case TDMT_VND_CONSUME: + case TDMT_VND_TMQ_CONSUME: // code = tqProcessConsumeReq(pQnode->pTq, pMsg); // break; case TDMT_SCH_QUERY_HEARTBEAT: diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 719e2d8fad5fc595dea5554be07988f06330b58a..d34159d312e9eb6340bcdd4103e90b283ea0fcce 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -261,18 +261,52 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) { return 0; } +int32_t sndProcessTaskRecoverFinishReq(SSnode *pSnode, SRpcMsg *pMsg) { + char *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + + // deserialize + SStreamRecoverFinishReq req; + + SDecoder decoder; + tDecoderInit(&decoder, msg, msgLen); + tDecodeSStreamRecoverFinishReq(&decoder, &req); + tDecoderClear(&decoder); + + // find task + SStreamTask *pTask = streamMetaGetTask(pSnode->pMeta, req.taskId); + if (pTask == NULL) { + return -1; + } + // do process request + if (streamProcessRecoverFinishReq(pTask, req.childId) < 0) { + return -1; + } + + return 0; +} + +int32_t sndProcessTaskRecoverFinishRsp(SSnode *pSnode, SRpcMsg *pMsg) { + // + return 0; +} + int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) { switch (pMsg->msgType) { case TDMT_STREAM_TASK_RUN: return sndProcessTaskRunReq(pSnode, pMsg); case TDMT_STREAM_TASK_DISPATCH: return sndProcessTaskDispatchReq(pSnode, pMsg, true); - case TDMT_STREAM_RETRIEVE: - return sndProcessTaskRetrieveReq(pSnode, pMsg); case TDMT_STREAM_TASK_DISPATCH_RSP: return sndProcessTaskDispatchRsp(pSnode, pMsg); + case TDMT_STREAM_RETRIEVE: + return sndProcessTaskRetrieveReq(pSnode, pMsg); case TDMT_STREAM_RETRIEVE_RSP: return sndProcessTaskRetrieveRsp(pSnode, pMsg); + case TDMT_STREAM_RECOVER_FINISH: + return sndProcessTaskRecoverFinishReq(pSnode, pMsg); + case TDMT_STREAM_RECOVER_FINISH_RSP: + return sndProcessTaskRecoverFinishRsp(pSnode, pMsg); default: ASSERT(0); } diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index a5dc4431ab711307834dc73affbe81718b6b641c..12b4ee413906457b36f60b6cbcbf4b2426738a7b 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -179,8 +179,8 @@ int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId); // tq-mq int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); -int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); -int32_t tqProcessVgDeleteReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); +int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); +int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); // tq-stream @@ -190,11 +190,15 @@ int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* data, int64_t ver); int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver); int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec); -int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg); -int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg); +// int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg); +// int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t msgLen); +int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg); SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema, SSchemaWrapper* pTagSchemaWrapper, bool createTb, int64_t suid, const char* stbFullName, diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 2c01645389496450d632cd424e253510f511c940..22ce4b20f5f0c794ac9c1ec68b649aa6027ebf06 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -709,7 +709,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { return 0; } -int32_t tqProcessVgDeleteReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { +int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; taosWLockLatch(&pTq->pushLock); @@ -767,7 +767,7 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t m return 0; } -int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { +int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { SMqRebVgReq req = {0}; tDecodeSMqRebVgReq(msg, &req); // todo lock @@ -982,25 +982,33 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msg // 3.go through recover steps to fill history if (pTask->fillHistory) { - streamSetParamForRecover(pTask); if (pTask->taskLevel == TASK_LEVEL__SOURCE) { + streamSetParamForRecover(pTask); streamSourceRecoverPrepareStep1(pTask, version); SStreamRecoverStep1Req req; streamBuildSourceRecover1Req(pTask, &req); - - void* serialziedReq = (void*)&req; int32_t len = sizeof(SStreamRecoverStep1Req); + void* serializedReq = rpcMallocCont(len); + if (serializedReq == NULL) { + return -1; + } + + memcpy(serializedReq, &req, len); + SRpcMsg rpcMsg = { .contLen = len, - .pCont = serialziedReq, + .pCont = serializedReq, .msgType = TDMT_VND_STREAM_RECOVER_STEP1, }; - tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &rpcMsg); + if (tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &rpcMsg) < 0) { + /*ASSERT(0);*/ + } } else if (pTask->taskLevel == TASK_LEVEL__AGG) { + streamSetParamForRecover(pTask); streamAggRecoverPrepare(pTask); } else if (pTask->taskLevel == TASK_LEVEL__SINK) { // do nothing @@ -1010,8 +1018,11 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msg return 0; } -int32_t tqProcessTaskRecover1Req(STQ* pTq, char* msg, int32_t msgLen) { - int32_t code; +int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { + int32_t code; + char* msg = pMsg->pCont; + int32_t msgLen = pMsg->contLen; + SStreamRecoverStep1Req* pReq = (SStreamRecoverStep1Req*)msg; SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, pReq->taskId); if (pTask == NULL) { @@ -1035,16 +1046,24 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, char* msg, int32_t msgLen) { return -1; } + ASSERT(pReq->taskId == pTask->taskId); + // serialize msg - int32_t len = sizeof(SStreamRecoverStep2Req); - void* serializedReq = (void*)&req; + int32_t len = sizeof(SStreamRecoverStep1Req); + + void* serializedReq = rpcMallocCont(len); + if (serializedReq == NULL) { + return -1; + } + + memcpy(serializedReq, &req, len); // dispatch msg SRpcMsg rpcMsg = { .code = 0, .contLen = len, .msgType = TDMT_VND_STREAM_RECOVER_STEP2, - .pCont = (void*)serializedReq, + .pCont = serializedReq, }; tmsgPutToQueue(&pTq->pVnode->msgCb, WRITE_QUEUE, &rpcMsg); @@ -1087,15 +1106,15 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t m return 0; } -int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, char* msg, int32_t msgLen) { - int32_t code; +int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg) { + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); // deserialize - int32_t len; SStreamRecoverFinishReq req; SDecoder decoder; - tDecoderInit(&decoder, msg, sizeof(SStreamRecoverFinishReq)); + tDecoderInit(&decoder, msg, msgLen); tDecodeSStreamRecoverFinishReq(&decoder, &req); tDecoderClear(&decoder); @@ -1112,6 +1131,11 @@ int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, char* msg, int32_t msgLen) { return 0; } +int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg) { + // + return 0; +} + int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { bool failed = false; SDecoder* pCoder = &(SDecoder){0}; @@ -1306,33 +1330,6 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { } } -#if 0 -int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) { - SStreamTaskRecoverReq* pReq = pMsg->pCont; - int32_t taskId = pReq->taskId; - SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId); - if (pTask) { - streamProcessRecoverReq(pTask, pReq, pMsg); - return 0; - } else { - return -1; - } -} - -int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) { - SStreamTaskRecoverRsp* pRsp = pMsg->pCont; - int32_t taskId = pRsp->rspTaskId; - - SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId); - if (pTask) { - streamProcessRecoverRsp(pTask, pRsp); - return 0; - } else { - return -1; - } -} -#endif - int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t taskId = pRsp->taskId; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 4579f0c7a4ad83bfc71f4af7d10611c397b0a7e4..9c827c9fa349f09eb41a6a8fcb6711b744a78aab 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -228,30 +228,30 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp if (vnodeProcessBatchDeleteReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; break; /* TQ */ - case TDMT_VND_MQ_VG_CHANGE: - if (tqProcessVgChangeReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), - pMsg->contLen - sizeof(SMsgHead)) < 0) { + case TDMT_VND_TMQ_SUBSCRIBE: + if (tqProcessSubscribeReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), + pMsg->contLen - sizeof(SMsgHead)) < 0) { goto _err; } break; - case TDMT_VND_MQ_VG_DELETE: - if (tqProcessVgDeleteReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) { + case TDMT_VND_TMQ_DELETE_SUB: + if (tqProcessDeleteSubReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) { goto _err; } break; - case TDMT_VND_MQ_COMMIT_OFFSET: + case TDMT_VND_TMQ_COMMIT_OFFSET: if (tqProcessOffsetCommitReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead)) < 0) { goto _err; } break; - case TDMT_VND_ADD_CHECK_INFO: + case TDMT_VND_TMQ_ADD_CHECKINFO: if (tqProcessAddCheckInfoReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead)) < 0) { goto _err; } break; - case TDMT_VND_DELETE_CHECK_INFO: + case TDMT_VND_TMQ_DEL_CHECKINFO: if (tqProcessDelCheckInfoReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead)) < 0) { goto _err; @@ -268,6 +268,11 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp goto _err; } } break; + case TDMT_VND_STREAM_RECOVER_STEP2: { + if (tqProcessTaskRecover2Req(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) { + goto _err; + } + } break; case TDMT_VND_ALTER_CONFIRM: vnodeProcessAlterConfirmReq(pVnode, version, pReq, len, pRsp); break; @@ -355,14 +360,11 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { return 0; } - if (pMsg->msgType == TDMT_VND_CONSUME && !pVnode->restored) { + if (pMsg->msgType == TDMT_VND_TMQ_CONSUME && !pVnode->restored) { vnodeRedirectRpcMsg(pVnode, pMsg); return 0; } - char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); - switch (pMsg->msgType) { case TDMT_SCH_FETCH: case TDMT_SCH_MERGE_FETCH: @@ -381,7 +383,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { return vnodeGetTableCfg(pVnode, pMsg, true); case TDMT_VND_BATCH_META: return vnodeGetBatchMeta(pVnode, pMsg); - case TDMT_VND_CONSUME: + case TDMT_VND_TMQ_CONSUME: return tqProcessPollReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_RUN: return tqProcessTaskRunReq(pVnode->pTq, pMsg); @@ -389,16 +391,18 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { case TDMT_STREAM_TASK_DISPATCH: return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true); #endif - /*case TDMT_STREAM_TASK_RECOVER:*/ - /*return tqProcessTaskRecoverReq(pVnode->pTq, pMsg);*/ - case TDMT_STREAM_RETRIEVE: - return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_DISPATCH_RSP: return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg); - /*case TDMT_STREAM_TASK_RECOVER_RSP:*/ - /*return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg);*/ + case TDMT_STREAM_RETRIEVE: + return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg); case TDMT_STREAM_RETRIEVE_RSP: return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg); + case TDMT_VND_STREAM_RECOVER_STEP1: + return tqProcessTaskRecover1Req(pVnode->pTq, pMsg); + case TDMT_STREAM_RECOVER_FINISH: + return tqProcessTaskRecoverFinishReq(pVnode->pTq, pMsg); + case TDMT_STREAM_RECOVER_FINISH_RSP: + return tqProcessTaskRecoverFinishRsp(pVnode->pTq, pMsg); default: vError("unknown msg type:%d in fetch queue", pMsg->msgType); return TSDB_CODE_VND_APP_ERROR; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index bffeb06f7cd53e089ad7717691f3336bdf42e107..3f9fe264c24532959ea78631de1e813cea26d152 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -5279,7 +5279,7 @@ static int32_t translateCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* p code = buildCreateTopicReq(pCxt, pStmt, &createReq); } if (TSDB_CODE_SUCCESS == code) { - code = buildCmdMsg(pCxt, TDMT_MND_CREATE_TOPIC, (FSerializeFunc)tSerializeSCMCreateTopicReq, &createReq); + code = buildCmdMsg(pCxt, TDMT_MND_TMQ_CREATE_TOPIC, (FSerializeFunc)tSerializeSCMCreateTopicReq, &createReq); } tFreeSCMCreateTopicReq(&createReq); return code; @@ -5293,7 +5293,7 @@ static int32_t translateDropTopic(STranslateContext* pCxt, SDropTopicStmt* pStmt tNameGetFullDbName(&name, dropReq.name); dropReq.igNotExists = pStmt->ignoreNotExists; - return buildCmdMsg(pCxt, TDMT_MND_DROP_TOPIC, (FSerializeFunc)tSerializeSMDropTopicReq, &dropReq); + return buildCmdMsg(pCxt, TDMT_MND_TMQ_DROP_TOPIC, (FSerializeFunc)tSerializeSMDropTopicReq, &dropReq); } static int32_t translateDropCGroup(STranslateContext* pCxt, SDropCGroupStmt* pStmt) { @@ -5305,7 +5305,7 @@ static int32_t translateDropCGroup(STranslateContext* pCxt, SDropCGroupStmt* pSt dropReq.igNotExists = pStmt->ignoreNotExists; strcpy(dropReq.cgroup, pStmt->cgroup); - return buildCmdMsg(pCxt, TDMT_MND_MQ_DROP_CGROUP, (FSerializeFunc)tSerializeSMDropCgroupReq, &dropReq); + return buildCmdMsg(pCxt, TDMT_MND_TMQ_DROP_CGROUP, (FSerializeFunc)tSerializeSMDropCgroupReq, &dropReq); } static int32_t translateAlterLocal(STranslateContext* pCxt, SAlterLocalStmt* pStmt) { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 417ddfa80dbd5130d011f63533ac3d5bd4ed4da3..7cccda51a1d4c51ec52ff2d210aa13ce8a88c053 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -239,7 +239,7 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov msg.contLen = tlen + sizeof(SMsgHead); msg.pCont = buf; - msg.msgType = TDMT_VND_STREAM_RECOVER_FINISH; + msg.msgType = TDMT_STREAM_RECOVER_FINISH; tmsgSendReq(pEpSet, &msg); @@ -292,13 +292,19 @@ FAIL: int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz, int64_t groupId) { - char* ctbName; + char* ctbName = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN); + if (ctbName == NULL) { + return -1; + } + if (pDataBlock->info.parTbName[0]) { - ctbName = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName); } else { - ctbName = buildCtbNameByGroupId(pTask->shuffleDispatcher.stbFullName, groupId); + char* ctbShortName = buildCtbNameByGroupId(pTask->shuffleDispatcher.stbFullName, groupId); + snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, ctbShortName); + taosMemoryFree(ctbShortName); } + SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; /*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/ diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 12de2fafc1ef86401f6e86fa0350410055365248..c979ba058be95ed55048ddabcedbccbaa4541242 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -36,6 +36,7 @@ int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, int64_t ver) { } int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req* pReq) { + pReq->msgHead.vgId = pTask->nodeId; pReq->streamId = pTask->streamId; pReq->taskId = pTask->taskId; return 0; @@ -48,6 +49,7 @@ int32_t streamSourceRecoverScanStep1(SStreamTask* pTask) { } int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* pReq) { + pReq->msgHead.vgId = pTask->nodeId; pReq->streamId = pTask->streamId; pReq->taskId = pTask->taskId; return 0;