diff --git a/include/common/tmsg.h b/include/common/tmsg.h index cc15d4ed6b89e32f38862bc5a9955dc8f2502772..fa6f4b6c79d08783b640cb8ef8572c266e37b017 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2555,10 +2555,14 @@ typedef struct { char topic[TSDB_TOPIC_FNAME_LEN]; int64_t ntbUid; SArray* colIdList; // SArray -} SCheckAlterInfo; +} STqCheckInfo; -int32_t tEncodeSCheckAlterInfo(SEncoder* pEncoder, const SCheckAlterInfo* pInfo); -int32_t tDecodeSCheckAlterInfo(SDecoder* pDecoder, SCheckAlterInfo* pInfo); +int32_t tEncodeSTqCheckInfo(SEncoder* pEncoder, const STqCheckInfo* pInfo); +int32_t tDecodeSTqCheckInfo(SDecoder* pDecoder, STqCheckInfo* pInfo); + +typedef struct { + char topic[TSDB_TOPIC_FNAME_LEN]; +} STqDelCheckInfoReq; typedef struct { int32_t vgId; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 6462c7afbfa80a48219d64cbed3797317f705c16..b16df0e8857c1a6ebee01dee2f6eb4c31657ece3 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -188,7 +188,8 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp) TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_DELETE, "vnode-mq-vg-delete", SMqVDeleteReq, SMqVDeleteRsp) TD_DEF_MSG_TYPE(TDMT_VND_MQ_COMMIT_OFFSET, "vnode-commit-offset", STqOffset, STqOffset) - TD_DEF_MSG_TYPE(TDMT_VND_CHECK_ALTER_INFO, "vnode-alter-check-info", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_ADD_CHECK_INFO, "vnode-add-check-info", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_DELETE_CHECK_INFO, "vnode-delete-check-info", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TOPIC, "vnode-alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index eac92d76babdedc17ce69339e3bf769b24ed9c83..e6fcb021d55639c80158f12c191d772336057e1b 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -515,7 +515,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF void streamMetaClose(SStreamMeta* streamMeta); int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask); -int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, char* msg, int32_t msgLen); +int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen); int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 7dd3ce34c3d52495217e27663dd2c7c044c77c4d..533d924546eb553045fea97e6b50cdb42489d714 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4262,7 +4262,6 @@ int32_t tDeserializeSServerStatusRsp(void *buf, int32_t bufLen, SServerStatusRsp tDecoderClear(&decoder); return 0; } - int32_t tEncodeSMqOffset(SEncoder *encoder, const SMqOffset *pOffset) { if (tEncodeI32(encoder, pOffset->vgId) < 0) return -1; if (tEncodeI64(encoder, pOffset->offset) < 0) return -1; @@ -4300,7 +4299,6 @@ int32_t tDecodeSMqCMCommitOffsetReq(SDecoder *decoder, SMqCMCommitOffsetReq *pRe tEndDecode(decoder); return 0; } - int32_t tSerializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); @@ -5590,7 +5588,6 @@ int32_t tDecodeSTqOffsetVal(SDecoder *pDecoder, STqOffsetVal *pOffsetVal) { return 0; } -#if 1 int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) { if (pVal->type == TMQ_OFFSET__RESET_NONE) { snprintf(buf, maxLen, "offset(reset to none)"); @@ -5609,7 +5606,6 @@ int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) { } return 0; } -#endif bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) { if (pLeft->type == pRight->type) { @@ -5643,7 +5639,7 @@ int32_t tDecodeSTqOffset(SDecoder *pDecoder, STqOffset *pOffset) { return 0; } -int32_t tEncodeSCheckAlterInfo(SEncoder *pEncoder, const SCheckAlterInfo *pInfo) { +int32_t tEncodeSTqCheckInfo(SEncoder *pEncoder, const STqCheckInfo *pInfo) { if (tEncodeCStr(pEncoder, pInfo->topic) < 0) return -1; if (tEncodeI64(pEncoder, pInfo->ntbUid) < 0) return -1; int32_t sz = taosArrayGetSize(pInfo->colIdList); @@ -5655,7 +5651,7 @@ int32_t tEncodeSCheckAlterInfo(SEncoder *pEncoder, const SCheckAlterInfo *pInfo) return pEncoder->pos; } -int32_t tDecodeSCheckAlterInfo(SDecoder *pDecoder, SCheckAlterInfo *pInfo) { +int32_t tDecodeSTqCheckInfo(SDecoder *pDecoder, STqCheckInfo *pInfo) { if (tDecodeCStrTo(pDecoder, pInfo->topic) < 0) return -1; if (tDecodeI64(pDecoder, &pInfo->ntbUid) < 0) return -1; int32_t sz; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 647af20fcf24e47b27b02d69595f8d1555a4cc19..ec761e6441ce651db5d6f3b034e4f11b1686fcdc 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -225,7 +225,8 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_CHECK_ALTER_INFO_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_ADD_CHECK_INFO_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE_CHECK_INFO_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 7c6807ab87220b8cbaecddfd9b0278c0b13aa0fe..8eb3ed3901e7f9acdb43f0d7fb4592f4122de17c 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -361,7 +361,8 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_CHECK_ALTER_INFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_ADD_CHECK_INFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE_CHECK_INFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndOffset.c b/source/dnode/mnode/impl/src/mndOffset.c index 9f6108004db0809b70907e13ca32988e6d278a48..037a46345ffed6b1205292e513df1c2db9528b3b 100644 --- a/source/dnode/mnode/impl/src/mndOffset.c +++ b/source/dnode/mnode/impl/src/mndOffset.c @@ -15,10 +15,10 @@ #define _DEFAULT_SOURCE #include "mndOffset.h" -#include "mndPrivilege.h" #include "mndDb.h" #include "mndDnode.h" #include "mndMnode.h" +#include "mndPrivilege.h" #include "mndShow.h" #include "mndStb.h" #include "mndTopic.h" @@ -305,7 +305,7 @@ int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { sdbRelease(pSdb, pOffset); } - return code; + return code; } int32_t mndDropOffsetByTopic(SMnode *pMnode, STrans *pTrans, const char *topic) { diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 820bb4b636bf5ecdebdb55ead48eb9c7ee5aeca5..ff208eae607ab0fa57be7431771f209e18e02ce5 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -57,7 +57,8 @@ int32_t mndInitTopic(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq); mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndTransProcessRsp); - mndSetMsgHandle(pMnode, TDMT_VND_CHECK_ALTER_INFO_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_VND_ADD_CHECK_INFO_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_VND_DELETE_CHECK_INFO_RSP, mndTransProcessRsp); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic); @@ -450,7 +451,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); if (topicObj.ntbUid != 0) { - SCheckAlterInfo info; + STqCheckInfo info; memcpy(info.topic, topicObj.name, TSDB_TOPIC_FNAME_LEN); info.ntbUid = topicObj.ntbUid; info.colIdList = topicObj.ntbColIds; @@ -470,7 +471,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * // encoder check alter info int32_t len; int32_t code; - tEncodeSize(tEncodeSCheckAlterInfo, &info, len, code); + tEncodeSize(tEncodeSTqCheckInfo, &info, len, code); if (code < 0) { sdbRelease(pSdb, pVgroup); mndTransDrop(pTrans); @@ -481,7 +482,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); SEncoder encoder; tEncoderInit(&encoder, abuf, len); - if (tEncodeSCheckAlterInfo(&encoder, &info) < 0) { + if (tEncodeSTqCheckInfo(&encoder, &info) < 0) { sdbRelease(pSdb, pVgroup); mndTransDrop(pTrans); return -1; @@ -493,7 +494,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * action.epSet = mndGetVgroupEpset(pMnode, pVgroup); action.pCont = buf; action.contLen = sizeof(SMsgHead) + len; - action.msgType = TDMT_VND_CHECK_ALTER_INFO; + action.msgType = TDMT_VND_ADD_CHECK_INFO; if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(buf); sdbRelease(pSdb, pVgroup); @@ -659,12 +660,14 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name); +#if 0 if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) { ASSERT(0); mndTransDrop(pTrans); mndReleaseTopic(pMnode, pTopic); return -1; } +#endif // TODO check if rebalancing if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) { @@ -675,6 +678,37 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { return -1; } + if (pTopic->ntbUid != 0) { + // broadcast to all vnode + void *pIter = NULL; + SVgObj *pVgroup = NULL; + while (1) { + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) { + sdbRelease(pSdb, pVgroup); + continue; + } + + void *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN); + void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + ((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId); + memcpy(abuf, pTopic->name, TSDB_TOPIC_FNAME_LEN); + + STransAction action = {0}; + action.epSet = mndGetVgroupEpset(pMnode, pVgroup); + action.pCont = buf; + action.contLen = sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN; + action.msgType = TDMT_VND_DELETE_CHECK_INFO; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(buf); + sdbRelease(pSdb, pVgroup); + mndTransDrop(pTrans); + return -1; + } + } + } + int32_t code = mndDropTopic(pMnode, pTrans, pReq, pTopic); mndReleaseTopic(pMnode, pTopic); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index a1dba41c941f9baf8a28304b80eb12530e929e9d..cb5ec7aabe48363f57b68238be80a6c124af9509 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -117,16 +117,15 @@ typedef struct { struct STQ { SVnode* pVnode; char* path; - SHashObj* pushMgr; // consumerId -> STqHandle* - SHashObj* handles; // subKey -> STqHandle - SHashObj* pAlterInfo; // topic -> SAlterCheckInfo + SHashObj* pPushMgr; // consumerId -> STqHandle* + SHashObj* pHandle; // subKey -> STqHandle + SHashObj* pCheckInfo; // topic -> SAlterCheckInfo STqOffsetStore* pOffsetStore; - TDB* pMetaStore; + TDB* pMetaDB; TTB* pExecStore; - - TTB* pAlterInfoStore; + TTB* pCheckStore; SStreamMeta* pStreamMeta; }; @@ -155,6 +154,9 @@ int32_t tqMetaClose(STQ* pTq); int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle); int32_t tqMetaDeleteHandle(STQ* pTq, const char* key); int32_t tqMetaRestoreHandle(STQ* pTq); +int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen); +int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key); +int32_t tqMetaRestoreCheckInfo(STQ* pTq); typedef struct { int32_t size; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 47e18732e020149634cd7534a9178930f884c909..0c7a08a2b533331c63814c8469a2296aa4eb2f10 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -163,13 +163,16 @@ int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); int tqCommit(STQ*); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId); -int32_t tqProcessCheckAlterInfoReq(STQ* pTq, char* msg, int32_t msgLen); -int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen); -int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen); -int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ver); +// tq-mq +int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); +int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); +int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); +int32_t tqProcessVgDeleteReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); +int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); -int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen); -int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen); +// tq-stream +int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); +int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data, int64_t ver); int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 112543e340cee6ba6f558a145c27653e0f5e5c70..a98fea19887b66926a7469b12c6d94f97853f051 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -60,11 +60,11 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { pTq->path = strdup(path); pTq->pVnode = pVnode; - pTq->handles = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); + pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); - pTq->pushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); + pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); - pTq->pAlterInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); + pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); if (tqMetaOpen(pTq) < 0) { ASSERT(0); @@ -85,9 +85,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { void tqClose(STQ* pTq) { if (pTq) { tqOffsetClose(pTq->pOffsetStore); - taosHashCleanup(pTq->handles); - taosHashCleanup(pTq->pushMgr); - taosHashCleanup(pTq->pAlterInfo); + taosHashCleanup(pTq->pHandle); + taosHashCleanup(pTq->pPushMgr); + taosHashCleanup(pTq->pCheckInfo); taosMemoryFree(pTq->path); tqMetaClose(pTq); streamMetaClose(pTq->pStreamMeta); @@ -183,7 +183,12 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con return 0; } -int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ver) { +static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) { + return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG && + pLeft->val.version <= pRight->val.version; +} + +int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { STqOffset offset = {0}; SDecoder decoder; tDecoderInit(&decoder, msg, msgLen); @@ -199,19 +204,24 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ve } else if (offset.val.type == TMQ_OFFSET__LOG) { tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey, TD_VID(pTq->pVnode), offset.val.version); + if (offset.val.version + 1 == version) { + offset.val.version += 1; + } } else { ASSERT(0); } - /*STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);*/ - /*if (pOffset != NULL) {*/ - /*if (pOffset->val.type == TMQ_OFFSET__LOG && pOffset->val.version < offset.val.version) {*/ + STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey); + if (pOffset != NULL && tqOffsetLessOrEqual(&offset, pOffset)) { + return 0; + } + if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) { ASSERT(0); return -1; } if (offset.val.type == TMQ_OFFSET__LOG) { - STqHandle* pHandle = taosHashGet(pTq->handles, offset.subKey, strlen(offset.subKey)); + STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey)); if (pHandle) { if (walRefVer(pHandle->pRef, offset.val.version) < 0) { ASSERT(0); @@ -220,6 +230,8 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ve } } + // rsp + /*}*/ /*}*/ @@ -229,15 +241,15 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ve int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) { void* pIter = NULL; while (1) { - pIter = taosHashIterate(pTq->pAlterInfo, pIter); + pIter = taosHashIterate(pTq->pCheckInfo, pIter); if (pIter == NULL) break; - SCheckAlterInfo* pCheck = (SCheckAlterInfo*)pIter; + STqCheckInfo* pCheck = (STqCheckInfo*)pIter; if (pCheck->ntbUid == tbUid) { int32_t sz = taosArrayGetSize(pCheck->colIdList); for (int32_t i = 0; i < sz; i++) { int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, i); if (forbidColId == colId) { - taosHashCancelIterate(pTq->pAlterInfo, pIter); + taosHashCancelIterate(pTq->pCheckInfo, pIter); return -1; } } @@ -289,7 +301,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { SWalCkHead* pCkHead = NULL; // 1.find handle - STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey)); + STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); /*ASSERT(pHandle);*/ if (pHandle == NULL) { tqError("tmq poll: no consumer handle for consumer:%" PRId64 ", in vgId:%d, subkey %s", consumerId, @@ -478,10 +490,10 @@ OVER: return code; } -int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) { +int32_t tqProcessVgDeleteReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; - int32_t code = taosHashRemove(pTq->handles, pReq->subKey, strlen(pReq->subKey)); + int32_t code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); ASSERT(code == 0); tqOffsetDelete(pTq->pOffsetStore, pReq->subKey); @@ -492,27 +504,43 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) { return 0; } -int32_t tqProcessCheckAlterInfoReq(STQ* pTq, char* msg, int32_t msgLen) { - SCheckAlterInfo info = {0}; - SDecoder decoder; +int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { + STqCheckInfo info = {0}; + SDecoder decoder; tDecoderInit(&decoder, msg, msgLen); - if (tDecodeSCheckAlterInfo(&decoder, &info) < 0) { + if (tDecodeSTqCheckInfo(&decoder, &info) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } tDecoderClear(&decoder); - if (taosHashPut(pTq->pAlterInfo, info.topic, strlen(info.topic), &info, sizeof(SCheckAlterInfo)) < 0) { + if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + if (tqMetaSaveCheckInfo(pTq, info.topic, msg, msgLen) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } return 0; } -int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { +int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { + if (taosHashRemove(pTq->pCheckInfo, msg, strlen(msg)) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + if (tqMetaDeleteCheckInfo(pTq, msg) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + return 0; +} + +int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { SMqRebVgReq req = {0}; tDecodeSMqRebVgReq(msg, &req); // todo lock - STqHandle* pHandle = taosHashGet(pTq->handles, req.subKey, strlen(req.subKey)); + STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); if (pHandle == NULL) { if (req.oldConsumerId != -1) { tqError("vgId:%d, build new consumer handle %s for consumer %d, but old consumerId is %ld", req.vgId, req.subKey, @@ -579,7 +607,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList); taosArrayDestroy(tbUidList); } - taosHashPut(pTq->handles, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); + taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); tqDebug("try to persist handle %s consumer %" PRId64, req.subKey, pHandle->consumerId); if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { // TODO @@ -668,34 +696,9 @@ FAIL: return code; } -int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) { +int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { // - return streamMetaAddSerializedTask(pTq->pStreamMeta, msg, msgLen); -#if 0 - SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); - if (pTask == NULL) { - return -1; - } - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)msg, msgLen); - if (tDecodeSStreamTask(&decoder, pTask) < 0) { - ASSERT(0); - goto FAIL; - } - tDecoderClear(&decoder); - - if (tqExpandTask(pTq, pTask) < 0) { - goto FAIL; - } - - taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)); - - return 0; - -FAIL: - if (pTask) taosMemoryFree(pTask); - return -1; -#endif + return streamMetaAddSerializedTask(pTq->pStreamMeta, version, msg, msgLen); } int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) { @@ -817,7 +820,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { } } -int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { +int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; return streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId); diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 5709ad7c85ff142bbe43cce6f5e70c6953d72459..405bc669bd23c27b2b234d2b60be4ef6def8bc80 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -43,86 +43,116 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) { return 0; } -int32_t tqMetaRestoreHandle(STQ* pTq) { - TBC* pCur = NULL; - if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) { +int32_t tqMetaOpen(STQ* pTq) { + if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB) < 0) { ASSERT(0); return -1; } - void* pKey = NULL; - int kLen = 0; - void* pVal = NULL; - int vLen = 0; - SDecoder decoder; + if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore) < 0) { + ASSERT(0); + return -1; + } - tdbTbcMoveToFirst(pCur); + if (tdbTbOpen("tq.check.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pCheckStore) < 0) { + ASSERT(0); + return -1; + } - while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { - STqHandle handle; - tDecoderInit(&decoder, (uint8_t*)pVal, vLen); - tDecodeSTqHandle(&decoder, &handle); + if (tqMetaRestoreHandle(pTq) < 0) { + return -1; + } - handle.pRef = walOpenRef(pTq->pVnode->pWal); - if (handle.pRef == NULL) { - ASSERT(0); - return -1; - } - walRefVer(handle.pRef, handle.snapshotVer); + if (tqMetaRestoreCheckInfo(pTq) < 0) { + return -1; + } - if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - SReadHandle reader = { - .meta = pTq->pVnode->pMeta, - .vnode = pTq->pVnode, - .initTableReader = true, - .initTqReader = true, - .version = handle.snapshotVer, - }; + return 0; +} - handle.execHandle.execCol.task = qCreateQueueExecTaskInfo( - handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, &handle.execHandle.pSchemaWrapper); - ASSERT(handle.execHandle.execCol.task); - void* scanner = NULL; - qExtractStreamScanner(handle.execHandle.execCol.task, &scanner); - ASSERT(scanner); - handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner); - ASSERT(handle.execHandle.pExecReader); - } else { - handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); - handle.execHandle.execDb.pFilterOutTbUid = - taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); - } - tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, TD_VID(pTq->pVnode)); - taosHashPut(pTq->handles, pKey, kLen, &handle, sizeof(STqHandle)); +int32_t tqMetaClose(STQ* pTq) { + if (pTq->pExecStore) { + tdbTbClose(pTq->pExecStore); } - - tdbTbcClose(pCur); + if (pTq->pCheckStore) { + tdbTbClose(pTq->pCheckStore); + } + tdbClose(pTq->pMetaDB); return 0; } -int32_t tqMetaOpen(STQ* pTq) { - if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaStore) < 0) { - ASSERT(0); +int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen) { + TXN txn; + if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { return -1; } - if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaStore, &pTq->pExecStore) < 0) { - ASSERT(0); + if (tdbBegin(pTq->pMetaDB, &txn) < 0) { return -1; } - if (tqMetaRestoreHandle(pTq) < 0) { + if (tdbTbUpsert(pTq->pExecStore, key, strlen(key), value, vLen, &txn) < 0) { + return -1; + } + + if (tdbCommit(pTq->pMetaDB, &txn) < 0) { return -1; } return 0; } -int32_t tqMetaClose(STQ* pTq) { - if (pTq->pExecStore) { - tdbTbClose(pTq->pExecStore); +int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key) { + TXN txn; + + if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { + ASSERT(0); + } + + if (tdbBegin(pTq->pMetaDB, &txn) < 0) { + ASSERT(0); + } + + if (tdbTbDelete(pTq->pCheckStore, key, (int)strlen(key), &txn) < 0) { + /*ASSERT(0);*/ + } + + if (tdbCommit(pTq->pMetaDB, &txn) < 0) { + ASSERT(0); + } + + return 0; +} + +int32_t tqMetaRestoreCheckInfo(STQ* pTq) { + TBC* pCur = NULL; + if (tdbTbcOpen(pTq->pCheckStore, &pCur, NULL) < 0) { + ASSERT(0); + return -1; + } + + void* pKey = NULL; + int kLen = 0; + void* pVal = NULL; + int vLen = 0; + SDecoder decoder; + + tdbTbcMoveToFirst(pCur); + + while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { + STqCheckInfo info; + tDecoderInit(&decoder, (uint8_t*)pVal, vLen); + if (tDecodeSTqCheckInfo(&decoder, &info) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + tDecoderClear(&decoder); + if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } } - tdbClose(pTq->pMetaStore); + tdbTbcClose(pCur); return 0; } @@ -153,7 +183,7 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) { ASSERT(0); } - if (tdbBegin(pTq->pMetaStore, &txn) < 0) { + if (tdbBegin(pTq->pMetaDB, &txn) < 0) { ASSERT(0); } @@ -161,7 +191,7 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) { ASSERT(0); } - if (tdbCommit(pTq->pMetaStore, &txn) < 0) { + if (tdbCommit(pTq->pMetaDB, &txn) < 0) { ASSERT(0); } @@ -177,7 +207,7 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) { ASSERT(0); } - if (tdbBegin(pTq->pMetaStore, &txn) < 0) { + if (tdbBegin(pTq->pMetaDB, &txn) < 0) { ASSERT(0); } @@ -185,9 +215,67 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) { /*ASSERT(0);*/ } - if (tdbCommit(pTq->pMetaStore, &txn) < 0) { + if (tdbCommit(pTq->pMetaDB, &txn) < 0) { ASSERT(0); } return 0; } + +int32_t tqMetaRestoreHandle(STQ* pTq) { + TBC* pCur = NULL; + if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) { + ASSERT(0); + return -1; + } + + void* pKey = NULL; + int kLen = 0; + void* pVal = NULL; + int vLen = 0; + SDecoder decoder; + + tdbTbcMoveToFirst(pCur); + + while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { + STqHandle handle; + tDecoderInit(&decoder, (uint8_t*)pVal, vLen); + tDecodeSTqHandle(&decoder, &handle); + + handle.pRef = walOpenRef(pTq->pVnode->pWal); + if (handle.pRef == NULL) { + ASSERT(0); + return -1; + } + walRefVer(handle.pRef, handle.snapshotVer); + + if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { + SReadHandle reader = { + .meta = pTq->pVnode->pMeta, + .vnode = pTq->pVnode, + .initTableReader = true, + .initTqReader = true, + .version = handle.snapshotVer, + }; + + handle.execHandle.execCol.task = qCreateQueueExecTaskInfo( + handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, &handle.execHandle.pSchemaWrapper); + ASSERT(handle.execHandle.execCol.task); + void* scanner = NULL; + qExtractStreamScanner(handle.execHandle.execCol.task, &scanner); + ASSERT(scanner); + handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner); + ASSERT(handle.execHandle.pExecReader); + } else { + handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); + handle.execHandle.execDb.pFilterOutTbUid = + taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + } + tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, TD_VID(pTq->pVnode)); + taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle)); + } + + tdbTbcClose(pCur); + return 0; +} + diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 5d7814a045733994b823114dbd4dce6de151cb28..b8803b20fc21e68b2af71c1d80e2475ef06aae63 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -394,7 +394,7 @@ int tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) { int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { void* pIter = NULL; while (1) { - pIter = taosHashIterate(pTq->handles, pIter); + pIter = taosHashIterate(pTq->pHandle, pIter); if (pIter == NULL) break; STqHandle* pExec = (STqHandle*)pIter; if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { diff --git a/source/dnode/vnode/src/tq/tqSnapshot.c b/source/dnode/vnode/src/tq/tqSnapshot.c index b4a7ce7737e5f599dac051f33e275918e7709c0d..c52e0e2c098478c0fc055306ad2eda9ced09dfea 100644 --- a/source/dnode/vnode/src/tq/tqSnapshot.c +++ b/source/dnode/vnode/src/tq/tqSnapshot.c @@ -165,9 +165,9 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) { STQ* pTq = pWriter->pTq; if (rollback) { - ASSERT(0); + tdbAbort(pWriter->pTq->pMetaDB, &pWriter->txn); } else { - code = tdbCommit(pWriter->pTq->pMetaStore, &pWriter->txn); + code = tdbCommit(pWriter->pTq->pMetaDB, &pWriter->txn); if (code) goto _err; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index ded357274694cf2e294bf928f7c26d47f96d3c30..54a5aacbd2737fddccb1b5c640e396f2da1142f8 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -196,36 +196,42 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp break; /* TQ */ case TDMT_VND_MQ_VG_CHANGE: - if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), + if (tqProcessVgChangeReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead)) < 0) { goto _err; } break; case TDMT_VND_MQ_VG_DELETE: - if (tqProcessVgDeleteReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) { + if (tqProcessVgDeleteReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) { goto _err; } break; case TDMT_VND_MQ_COMMIT_OFFSET: - if (tqProcessOffsetCommitReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), - pMsg->contLen - sizeof(SMsgHead), version) < 0) { + if (tqProcessOffsetCommitReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), + pMsg->contLen - sizeof(SMsgHead)) < 0) { goto _err; } break; - case TDMT_VND_CHECK_ALTER_INFO: - if (tqProcessCheckAlterInfoReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), - pMsg->contLen - sizeof(SMsgHead)) < 0) { + case TDMT_VND_ADD_CHECK_INFO: + if (tqProcessAddCheckInfoReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), + pMsg->contLen - sizeof(SMsgHead)) < 0) { + goto _err; + } + break; + case TDMT_VND_DELETE_CHECK_INFO: + if (tqProcessDelCheckInfoReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), + pMsg->contLen - sizeof(SMsgHead)) < 0) { goto _err; } break; case TDMT_STREAM_TASK_DEPLOY: { - if (tqProcessTaskDeployReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), + if (tqProcessTaskDeployReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead)) < 0) { goto _err; } } break; case TDMT_STREAM_TASK_DROP: { - if (tqProcessTaskDropReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) { + if (tqProcessTaskDropReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) { goto _err; } } break; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index b74e8386280a9a3a90da87dd12d37806a588cfa5..64a9537e6c43df3d5748e05908bb725c759e0631 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -81,7 +81,7 @@ void streamMetaClose(SStreamMeta* pMeta) { taosMemoryFree(pMeta); } -int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, char* msg, int32_t msgLen) { +int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen) { SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); if (pTask == NULL) { return -1;