From b46098793d13e867ae9ff559c107dbe01fce39d0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 26 Feb 2023 12:16:45 +0800 Subject: [PATCH] refactor: do some internal refactor and add some logs for tmq. --- include/os/osMemory.h | 3 + source/dnode/mnode/impl/src/mndConsumer.c | 69 ++++++++++------------ source/dnode/mnode/impl/src/mndSubscribe.c | 8 --- source/dnode/vnode/src/tq/tq.c | 6 +- source/dnode/vnode/src/tq/tqExec.c | 4 +- source/dnode/vnode/src/tq/tqMeta.c | 2 +- source/dnode/vnode/src/tq/tqPush.c | 22 ++++--- source/dnode/vnode/src/tq/tqRead.c | 45 ++++++++------ source/dnode/vnode/src/vnd/vnodeSvr.c | 10 ++-- source/libs/executor/inc/executorimpl.h | 3 +- source/libs/executor/src/executor.c | 6 +- source/libs/executor/src/scanoperator.c | 3 +- 12 files changed, 92 insertions(+), 89 deletions(-) diff --git a/include/os/osMemory.h b/include/os/osMemory.h index 3f57c72933..cd2d4a3494 100644 --- a/include/os/osMemory.h +++ b/include/os/osMemory.h @@ -29,7 +29,10 @@ extern "C" { #define calloc CALLOC_FUNC_TAOS_FORBID #define realloc REALLOC_FUNC_TAOS_FORBID #define free FREE_FUNC_TAOS_FORBID +#ifdef strdup +#undef strdup #define strdup STRDUP_FUNC_TAOS_FORBID +#endif #endif // ifndef ALLOW_FORBID_FUNC #endif // if !defined(WINDOWS) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 6621b6a3f9..768fe45e3e 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -15,17 +15,11 @@ #define _DEFAULT_SOURCE #include "mndConsumer.h" -#include "mndDb.h" -#include "mndDnode.h" -#include "mndMnode.h" #include "mndPrivilege.h" #include "mndShow.h" -#include "mndStb.h" #include "mndSubscribe.h" #include "mndTopic.h" #include "mndTrans.h" -#include "mndUser.h" -#include "mndVgroup.h" #include "tcompare.h" #include "tname.h" @@ -209,6 +203,7 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { taosMemoryFree(pConsumerNew); mndTransDrop(pTrans); return 0; + FAIL: tDeleteSMqConsumerObj(pConsumerNew); taosMemoryFree(pConsumerNew); @@ -580,6 +575,10 @@ static int32_t validateTopics(const SArray* pTopicList, SMnode* pMnode, const ch return 0; } +static void* topicNameDup(void* p){ + return taosStrdup((char*) p); +} + int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; char *msgStr = pMsg->pCont; @@ -616,16 +615,16 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256); + + // set the update type pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY; + pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup); + + // all subscribed topics should re-balance. taosArrayDestroy(pConsumerNew->rebNewTopics); - pConsumerNew->rebNewTopics = pTopicList; // all subscribe topics should re-balance. + pConsumerNew->rebNewTopics = pTopicList; subscribe.topicNames = NULL; - for (int32_t i = 0; i < newTopicNum; i++) { - char *newTopicCopy = taosStrdup(taosArrayGetP(pTopicList, i)); - taosArrayPush(pConsumerNew->assignedTopics, &newTopicCopy); - } - if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over; if (mndTransPrepare(pMnode, pTrans) != 0) goto _over; @@ -646,17 +645,11 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { goto _over; } + // set the update type pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY; + pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup); - for (int32_t i = 0; i < newTopicNum; i++) { - char *newTopicCopy = taosStrdup(taosArrayGetP(pTopicList, i)); - taosArrayPush(pConsumerNew->assignedTopics, &newTopicCopy); - } - - int32_t oldTopicNum = 0; - if (pExistedConsumer->currentTopics) { - oldTopicNum = taosArrayGetSize(pExistedConsumer->currentTopics); - } + int32_t oldTopicNum = (pExistedConsumer->currentTopics)? taosArrayGetSize(pExistedConsumer->currentTopics):0; int32_t i = 0, j = 0; while (i < oldTopicNum || j < newTopicNum) { @@ -692,11 +685,8 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { } } - if (pExistedConsumer && taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && - taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) { - /*if (taosArrayGetSize(pConsumerNew->assignedTopics) == 0) {*/ - /*pConsumerNew->updateType = */ - /*}*/ + // no topics need to be rebalanced + if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) { goto _over; } @@ -718,8 +708,9 @@ _over: tDeleteSMqConsumerObj(pConsumerNew); taosMemoryFree(pConsumerNew); } + // TODO: replace with destroy subscribe msg - if (subscribe.topicNames) taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree); + taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree); return code; } @@ -750,12 +741,12 @@ SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) { CM_ENCODE_OVER: taosMemoryFreeClear(buf); if (terrno != 0) { - mError("consumer:%" PRId64 ", failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr()); + mError("consumer:0x%" PRIx64 " failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr()); sdbFreeRaw(pRaw); return NULL; } - mTrace("consumer:%" PRId64 ", encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer); + mTrace("consumer:0x%" PRIx64 ", encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer); return pRaw; } @@ -823,8 +814,8 @@ static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) { } static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) { - mDebug("consumer:0x%" PRIx64 " perform delete action, status:%s", pConsumer->consumerId, - mndConsumerStatusName(pConsumer->status)); + mDebug("consumer:0x%" PRIx64 " perform delete action, status:(%d)%s", pConsumer->consumerId, + pConsumer->status, mndConsumerStatusName(pConsumer->status)); tDeleteSMqConsumerObj(pConsumer); return 0; } @@ -1075,22 +1066,23 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * // consumer group char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; - tstrncpy(varDataVal(cgroup), pConsumer->cgroup, TSDB_CGROUP_LEN); - varDataSetLen(cgroup, strlen(varDataVal(cgroup))); + STR_TO_VARSTR(cgroup, pConsumer->cgroup); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false); // client id char clientId[256 + VARSTR_HEADER_SIZE] = {0}; - tstrncpy(varDataVal(clientId), pConsumer->clientId, 256); - varDataSetLen(clientId, strlen(varDataVal(clientId))); + STR_TO_VARSTR(clientId, pConsumer->clientId); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false); // status char status[20 + VARSTR_HEADER_SIZE] = {0}; - tstrncpy(varDataVal(status), mndConsumerStatusName(pConsumer->status), 20); - varDataSetLen(status, strlen(varDataVal(status))); + const char* pStatusName = mndConsumerStatusName(pConsumer->status); + STR_TO_VARSTR(status, pStatusName); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)status, false); @@ -1123,8 +1115,11 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * numOfRows++; } + taosRUnLockLatch(&pConsumer->lock); sdbRelease(pSdb, pConsumer); + + pBlock->info.rows = numOfRows; } pShow->numOfRows += numOfRows; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 8d8fd1b9b5..8544994c3e 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -16,15 +16,10 @@ #define _DEFAULT_SOURCE #include "mndSubscribe.h" #include "mndConsumer.h" -#include "mndDb.h" -#include "mndDnode.h" -#include "mndMnode.h" #include "mndScheduler.h" #include "mndShow.h" -#include "mndStb.h" #include "mndTopic.h" #include "mndTrans.h" -#include "mndUser.h" #include "mndVgroup.h" #include "tcompare.h" #include "tname.h" @@ -1041,7 +1036,6 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock } // do not show for cleared subscription -#if 1 int32_t sz = taosArrayGetSize(pSub->unassignedVgs); for (int32_t i = 0; i < sz; i++) { SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i); @@ -1087,8 +1081,6 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock numOfRows++; } -#endif - pBlock->info.rows = numOfRows; taosRUnLockLatch(&pSub->lock); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c9bbb30380..c465617975 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -217,7 +217,7 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) { char buf2[80] = {0}; tFormatOffset(buf1, tListLen(buf1), &pRsp->reqOffset); tFormatOffset(buf2, tListLen(buf2), &pRsp->rspOffset); - tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, reqOffset:%s, rspOffset:%s", + tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2); return 0; @@ -275,7 +275,7 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con char buf2[80] = {0}; tFormatOffset(buf1, 80, &pRsp->reqOffset); tFormatOffset(buf2, 80, &pRsp->rspOffset); - tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d), block num:%d, reqOffset:%s, rspOffset:%s", + tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d), block num:%d, req:%s, rsp:%s", TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2); return 0; @@ -604,7 +604,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { code = -1; } - tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp data block:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "", + tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "", consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid, dataRsp.rspOffset.ts); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index e92da7668a..5638228641 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -113,9 +113,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs return -1; } - ASSERT(pRsp->withTbName == false); - ASSERT(pRsp->withSchema == false); - + ASSERT(!(pRsp->withTbName || pRsp->withSchema)); return 0; } diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 12fd15c63a..a85e6e0a70 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -197,7 +197,7 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) { return -1; } - tqDebug("tq save %s(%d) handle consumer:0x%" PRIx64 "epoch:%d vgId:%d", pHandle->subKey, + tqDebug("tq save %s(%d) handle consumer:0x%" PRIx64 " epoch:%d vgId:%d", pHandle->subKey, (int32_t)strlen(pHandle->subKey), pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode)); void* buf = taosMemoryCalloc(1, vlen); diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index e747262277..5a25d7e894 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -193,7 +193,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ memset(&pHandle->pushHandle.rpcInfo, 0, sizeof(SRpcHandleInfo)); taosWUnLockLatch(&pHandle->pushHandle.lock); - tqDebug("vgId:%d offset %" PRId64 " from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, reqOffset:%" PRId64 ", rspOffset:%" PRId64, + tqDebug("vgId:%d offset %" PRId64 " from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, req:%" PRId64 ", rsp:%" PRId64, TD_VID(pTq->pVnode), fetchOffset, pHandle->pushHandle.consumerId, pHandle->pushHandle.epoch, rsp.blockNum, rsp.reqOffset, rsp.rspOffset); @@ -210,25 +210,30 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg)); int32_t len = msgLen - sizeof(SSubmitReq2Msg); - tqDebug("vgId:%d tq push msg version:%" PRId64 " type: %s, p head %p, p body %p, len %d", pTq->pVnode->config.vgId, ver, - TMSG_INFO(msgType), msg, pReq, len); - if (msgType == TDMT_VND_SUBMIT) { // lock push mgr to avoid potential msg lost taosWLockLatch(&pTq->pushLock); - if (taosHashGetSize(pTq->pPushMgr) != 0) { - tqDebug("vgId:%d, push handle num %d", pTq->pVnode->config.vgId, taosHashGetSize(pTq->pPushMgr)); + int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr); + if (numOfRegisteredPush > 0) { + tqDebug("vgId:%d tq push msg version:%" PRId64 " type:%s, head:%p, body:%p len:%d, numOfPushed consumers:%d", + pTq->pVnode->config.vgId, ver, TMSG_INFO(msgType), msg, pReq, len, numOfRegisteredPush); + SArray* cachedKeys = taosArrayInit(0, sizeof(void*)); SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t)); + void* data = taosMemoryMalloc(len); if (data == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; tqError("failed to copy data for stream since out of memory"); taosArrayDestroyP(cachedKeys, (FDelete)taosMemoryFree); taosArrayDestroy(cachedKeyLens); + + // unlock + taosWUnLockLatch(&pTq->pushLock); return -1; } + memcpy(data, pReq, len); void* pIter = NULL; @@ -262,7 +267,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) }; qStreamSetScanMemData(task, submit); - // exec + // here start to scan submit block to extract the subscribed data while (1) { SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; @@ -278,7 +283,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) pRsp->blockNum++; } - tqDebug("vgId:%d, tq handle push, subkey: %s, block num: %d", pTq->pVnode->config.vgId, pPushEntry->subKey, + tqDebug("vgId:%d, tq handle push, subkey:%s, block num:%d", pTq->pVnode->config.vgId, pPushEntry->subKey, pRsp->blockNum); if (pRsp->blockNum > 0) { // set offset @@ -295,6 +300,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) tqPushDataRsp(pTq, pPushEntry); } } + // delete entry for (int32_t i = 0; i < taosArrayGetSize(cachedKeys); i++) { void* key = taosArrayGetP(cachedKeys, i); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 2c8f20d8cd..4bf6320c32 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -292,9 +292,12 @@ void tqCloseReader(STqReader* pReader) { int32_t tqSeekVer(STqReader* pReader, int64_t ver) { if (walReadSeekVer(pReader->pWalReader, ver) < 0) { + tqError("tmq poll: wal reader failed to seek to ver:%"PRId64, ver); return -1; + } else { + tqDebug("tmq poll: wal reader seek to ver:%"PRId64, ver); + return 0; } - return 0; } int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { @@ -302,28 +305,33 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { while (1) { if (!fromProcessedMsg) { - if (walNextValidMsg(pReader->pWalReader) < 0) { - pReader->ver = - pReader->pWalReader->curVersion - (pReader->pWalReader->curInvalid | pReader->pWalReader->curStopped); + SWalReader* pWalReader = pReader->pWalReader; + + if (walNextValidMsg(pWalReader) < 0) { + pReader->ver = pWalReader->curVersion - (pWalReader->curInvalid | pWalReader->curStopped); ret->offset.type = TMQ_OFFSET__LOG; ret->offset.version = pReader->ver; ret->fetchType = FETCH_TYPE__NONE; tqDebug("return offset %" PRId64 ", no more valid", ret->offset.version); return -1; } - void* body = POINTER_SHIFT(pReader->pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg)); - int32_t bodyLen = pReader->pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg); - int64_t ver = pReader->pWalReader->pHead->head.version; + + void* body = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg)); + int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg); + int64_t ver = pWalReader->pHead->head.version; + + tqDebug("tmq poll: extract submit msg from wal, version:%"PRId64" len:%d", ver, bodyLen); + #if 0 - if (pReader->pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) { + if (pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) { // TODO do filter ret->fetchType = FETCH_TYPE__META; - ret->meta = pReader->pWalReader->pHead->head.body; + ret->meta = pWalReader->pHead->head.body; return 0; } else { #endif tqReaderSetSubmitReq2(pReader, body, bodyLen, ver); - /*tqReaderSetDataMsg(pReader, body, pReader->pWalReader->pHead->head.version);*/ + /*tqReaderSetDataMsg(pReader, body, pWalReader->pHead->head.version);*/ #if 0 } #endif @@ -358,7 +366,7 @@ int32_t tqReaderSetDataMsg(STqReader* pReader, const SSubmitReq* pMsg, int64_t v // if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1; // while (true) { // if (tGetSubmitMsgNext(&pReader->msgIter, &pReader->pBlock) < 0) return -1; -// tqDebug("submitnext vgId:%d, block:%p, dataLen:%d, len:%d, uid:%"PRId64, pReader->pWalReader->pWal->cfg.vgId, pReader->pBlock, pReader->msgIter.dataLen, +// tqDebug("submitnext vgId:%d, block:%p, dataLen:%d, len:%d, uid:%"PRId64, pWalReader->pWal->cfg.vgId, pReader->pBlock, pReader->msgIter.dataLen, // pReader->msgIter.len, pReader->msgIter.uid); // if (pReader->pBlock == NULL) break; // } @@ -371,10 +379,8 @@ int32_t tqReaderSetDataMsg(STqReader* pReader, const SSubmitReq* pMsg, int64_t v #endif int32_t tqReaderSetSubmitReq2(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) { - ASSERT(pReader->msg2.msgStr == NULL); - ASSERT(msgStr); - ASSERT(msgLen); - ASSERT(ver >= 0); + ASSERT(pReader->msg2.msgStr == NULL && msgStr && msgLen && (ver >= 0)); + pReader->msg2.msgStr = msgStr; pReader->msg2.msgLen = msgLen; pReader->msg2.ver = ver; @@ -421,7 +427,10 @@ bool tqNextDataBlock(STqReader* pReader) { #endif bool tqNextDataBlock2(STqReader* pReader) { - if (pReader->msg2.msgStr == NULL) return false; + if (pReader->msg2.msgStr == NULL) { + return false; + } + ASSERT(pReader->setMsg == 1); tqDebug("tq reader next data block %p, %d %" PRId64 " %d", pReader->msg2.msgStr, pReader->msg2.msgLen, @@ -528,7 +537,7 @@ int32_t tqScanSubmitSplit(SArray* pBlocks, SArray* schemas, STqReader* pReader) if (pReader->pSchema == NULL) { tqWarn("vgId:%d, cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64 "), version %d, possibly dropped table", - pReader->pWalReader->pWal->cfg.vgId, pReader->msgIter.uid, pReader->msgIter.suid, sversion); + pWalReader->pWal->cfg.vgId, pReader->msgIter.uid, pReader->msgIter.suid, sversion); pReader->cachedSchemaSuid = 0; terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; return -1; @@ -538,7 +547,7 @@ int32_t tqScanSubmitSplit(SArray* pBlocks, SArray* schemas, STqReader* pReader) pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, pReader->msgIter.uid, sversion, 1); if (pReader->pSchemaWrapper == NULL) { tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table", - pReader->pWalReader->pWal->cfg.vgId, pReader->msgIter.uid, pReader->cachedSchemaVer); + pWalReader->pWal->cfg.vgId, pReader->msgIter.uid, pReader->cachedSchemaVer); pReader->cachedSchemaSuid = 0; terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; return -1; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 8651478afa..a3438b611e 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -1264,8 +1264,8 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq goto _exit; } - for (int32_t i = 1; i < nColData; i++) { - if (aColData[i].nVal != aColData[0].nVal) { + for (int32_t j = 1; j < nColData; j++) { + if (aColData[j].nVal != aColData[0].nVal) { code = TSDB_CODE_INVALID_MSG; goto _exit; } @@ -1299,8 +1299,8 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq SVCreateTbRsp *pCreateTbRsp = taosArrayReserve(pSubmitRsp->aCreateTbRsp, 1); // create table - if (metaCreateTable(pVnode->pMeta, version, pSubmitTbData->pCreateTbReq, &pCreateTbRsp->pMeta) == - 0) { // create table success + if (metaCreateTable(pVnode->pMeta, version, pSubmitTbData->pCreateTbReq, &pCreateTbRsp->pMeta) == 0) { + // create table success if (newTbUids == NULL && (newTbUids = taosArrayInit(TARRAY_SIZE(pSubmitReq->aSubmitTbData), sizeof(int64_t))) == NULL) { @@ -1330,7 +1330,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq pSubmitRsp->affectedRows += affectedRows; } - // update table uid list + // update the affected table uid list if (taosArrayGetSize(newTbUids) > 0) { vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode), (int32_t)taosArrayGetSize(newTbUids)); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 1712cba0f5..5df3b14a5b 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -133,8 +133,7 @@ typedef struct { int64_t snapshotVer; // const SSubmitReq* pReq; - SPackedData submit; - + SPackedData submit; SSchemaWrapper* schema; char tbName[TSDB_TABLE_NAME_LEN]; int8_t recoverStep; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 347ac369d8..b3d3e8aab7 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1035,8 +1035,9 @@ int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq, int64_t sc int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE); - ASSERT(pTaskInfo->streamInfo.submit.msgStr == NULL); + ASSERT((pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE )&& (pTaskInfo->streamInfo.submit.msgStr == NULL)); + qDebug("set the submit block for future scan"); + pTaskInfo->streamInfo.submit = submit; return 0; } @@ -1050,6 +1051,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus)) { return 0; } + if (subType == TOPIC_SUB_TYPE__COLUMN) { uint16_t type = pOperator->operatorType; pOperator->status = OP_OPENED; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a238b84993..e1594b13a8 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1562,7 +1562,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamScanInfo* pInfo = pOperator->info; - qDebug("queue scan called"); + qDebug("start to exec queue scan"); if (pTaskInfo->streamInfo.submit.msgStr != NULL) { if (pInfo->tqReader->msg2.msgStr == NULL) { @@ -1587,7 +1587,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { SSDataBlock block = {0}; int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader, NULL); - if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) { continue; } -- GitLab