diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 8c1d957381583f1e91a5c1620b90638ea3061ce1..1049599a561723d006b928e20280c08660316b6f 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -29,13 +29,13 @@ typedef void* DataSinkHandle; struct SRpcMsg; struct SSubplan; -typedef int32_t (*localFetchFp)(void *, uint64_t, uint64_t, uint64_t, int64_t, int32_t, void**, SArray*); +typedef int32_t (*localFetchFp)(void*, uint64_t, uint64_t, uint64_t, int64_t, int32_t, void**, SArray*); typedef struct { - void *handle; + void* handle; bool localExec; localFetchFp fp; - SArray *explainRes; + SArray* explainRes; } SLocalFetch; typedef struct { @@ -51,9 +51,9 @@ typedef struct { bool initTqReader; int32_t numOfVgroups; - void* sContext; // SSnapContext* + void* sContext; // SSnapContext* - void* pStateBackend; + void* pStateBackend; } SReadHandle; // in queue mode, data streams are seperated by msg @@ -136,7 +136,7 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table * @param handle * @return */ -int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SLocalFetch *pLocal); +int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SLocalFetch* pLocal); int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pBlock, uint64_t* useconds); /** @@ -195,6 +195,8 @@ int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts); int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType); +int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq); + int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset); SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 6ba10641f5e7054034f2008f1b317ec8121286c6..f0fb8d4b021da7ace48bf513ff2aabb5d3e920ba 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -217,7 +217,7 @@ int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList); int32_t tqSeekVer(STqReader *pReader, int64_t ver); int32_t tqNextBlock(STqReader *pReader, SFetchRet *ret); -int32_t tqReaderSetDataMsg(STqReader *pReader, SSubmitReq *pMsg, int64_t ver); +int32_t tqReaderSetDataMsg(STqReader *pReader, const SSubmitReq *pMsg, int64_t ver); bool tqNextDataBlock(STqReader *pReader); bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index c3441a43f0e736881fc8bc491dd5717223645ed4..04b081344558ac0ca7c6990e46ef2859c1795f89 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -113,10 +113,20 @@ typedef struct { } STqHandle; +typedef struct { + SMqDataRsp dataRsp; + SMqRspHead rspHead; + STqHandle* pHandle; + SRpcHandleInfo pInfo; +} STqPushEntry; + struct STQ { - SVnode* pVnode; - char* path; - SHashObj* pPushMgr; // consumerId -> STqHandle* + SVnode* pVnode; + char* path; + + SRWLatch pushLock; + + SHashObj* pPushMgr; // consumerId -> STqPushEntry SHashObj* pHandle; // subKey -> STqHandle SHashObj* pCheckInfo; // topic -> SAlterCheckInfo @@ -146,7 +156,9 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea // tqExec int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp* pRsp); +int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols); int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp); +int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry); // tqMeta int32_t tqMetaOpen(STQ* pTq); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 76f3c1b12d7e8e4ae27b38d3a3907ae933d7d904..21136405cb327afd765f2ad522ef617f430adc4f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -78,7 +78,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { taosHashSetFreeFp(pTq->pHandle, destroySTqHandle); - pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); + taosInitRWLatch(&pTq->pushLock); + pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); + taosHashSetFreeFp(pTq->pPushMgr, taosMemoryFree); pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); @@ -153,6 +155,65 @@ int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, return 0; } +int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) { + SMqDataRsp* pRsp = &pPushEntry->dataRsp; + + ASSERT(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum); + ASSERT(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum); + + ASSERT(!pRsp->withSchema); + ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0); + + if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) { + if (pRsp->blockNum > 0) { + ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version); + } else { + ASSERT(pRsp->rspOffset.version >= pRsp->reqOffset.version); + } + } + + int32_t len = 0; + int32_t code = 0; + tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code); + + if (code < 0) { + return -1; + } + + int32_t tlen = sizeof(SMqRspHead) + len; + void* buf = rpcMallocCont(tlen); + if (buf == NULL) { + return -1; + } + + memcpy(buf, &pPushEntry->rspHead, sizeof(SMqRspHead)); + + void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); + + SEncoder encoder = {0}; + tEncoderInit(&encoder, abuf, len); + tEncodeSMqDataRsp(&encoder, pRsp); + tEncoderClear(&encoder); + + SRpcMsg rsp = { + .info = pPushEntry->pInfo, + .pCont = buf, + .contLen = tlen, + .code = 0, + }; + + tmsgSendRsp(&rsp); + + char buf1[80] = {0}; + char buf2[80] = {0}; + tFormatOffset(buf1, 80, &pRsp->reqOffset); + tFormatOffset(buf2, 80, &pRsp->rspOffset); + tqDebug("vgId:%d, from consumer:%" PRId64 ", (epoch %d) push rsp, block num: %d, reqOffset:%s, rspOffset:%s", + TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2); + + return 0; +} + int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp) { ASSERT(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum); ASSERT(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum); @@ -477,11 +538,31 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { SMqDataRsp dataRsp = {0}; tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType); + // lock + taosWLockLatch(&pTq->pushLock); tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew); #if 1 - + if (dataRsp.blockNum == 0) { + STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry)); + if (pPushEntry != NULL) { + pPushEntry->pHandle = pHandle; + pPushEntry->pInfo = pMsg->info; + memcpy(&pPushEntry->dataRsp, &dataRsp, sizeof(SMqDataRsp)); + pPushEntry->rspHead.consumerId = consumerId; + pPushEntry->rspHead.epoch = reqEpoch; + pPushEntry->rspHead.mqMsgType = TMQ_MSG_TYPE__POLL_RSP; + taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey) + 1, &pPushEntry, sizeof(void*)); + tqDebug("tmq poll: consumer %ld, subkey %s, vg %d save handle to push mgr", consumerId, pHandle->subKey, + TD_VID(pTq->pVnode)); + // unlock + taosWUnLockLatch(&pTq->pushLock); + return 0; + } + } + taosWUnLockLatch(&pTq->pushLock); #endif + if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { code = -1; } @@ -615,9 +696,14 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; int32_t code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); - ASSERT(code == 0); + if (code != 0) { + tqError("cannot process tq delete req %s, since no such handle", pReq->subKey); + } - tqOffsetDelete(pTq->pOffsetStore, pReq->subKey); + code = tqOffsetDelete(pTq->pOffsetStore, pReq->subKey); + if (code != 0) { + tqError("cannot process tq delete req %s, since no such offset", pReq->subKey); + } if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) { ASSERT(0); @@ -756,7 +842,9 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe atomic_add_fetch_32(&pHandle->epoch, 1); if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { // TODO + ASSERT(0); } + // close handle } return 0; diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index a24f92023584710e642d931131d9f707a08c52da..58d051bec1aaa1d6d88309eb4ef17f2a9c736d4e 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -15,7 +15,7 @@ #include "tq.h" -static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols) { +int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols) { int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); void* buf = taosMemoryCalloc(1, dataStrLen); if (buf == NULL) return -1; @@ -243,7 +243,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp } if (pHandle->fetchMeta) { SSubmitBlk* pBlk = pReader->pBlock; - int32_t schemaLen = htonl(pBlk->schemaLen); + int32_t schemaLen = htonl(pBlk->schemaLen); if (schemaLen > 0) { if (pRsp->createTableNum == 0) { pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); @@ -278,7 +278,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp } if (pHandle->fetchMeta) { SSubmitBlk* pBlk = pReader->pBlock; - int32_t schemaLen = htonl(pBlk->schemaLen); + int32_t schemaLen = htonl(pBlk->schemaLen); if (schemaLen > 0) { if (pRsp->createTableNum == 0) { pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index a57e8174fee9f82fd35c425e9214e48fba91f709..c42cfeb7b879d02bdc82ea558371d99ab6b17085 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -213,6 +213,80 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ #endif int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { + if (msgType == TDMT_VND_SUBMIT) { + // lock push mgr to avoid potential msg lost + taosWLockLatch(&pTq->pushLock); + if (taosHashGetSize(pTq->pPushMgr) != 0) { + SArray* cachedKeys = taosArrayInit(0, sizeof(void*)); + SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t)); + void* data = taosMemoryMalloc(msgLen); + if (data == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tqError("failed to copy data for stream since out of memory"); + return -1; + } + memcpy(data, msg, msgLen); + SSubmitReq* pReq = (SSubmitReq*)data; + pReq->version = ver; + + void* pIter = NULL; + while (1) { + pIter = taosHashIterate(pTq->pPushMgr, pIter); + if (pIter == NULL) break; + STqPushEntry* pPushEntry = *(STqPushEntry**)pIter; + STqExecHandle* pExec = &pPushEntry->pHandle->execHandle; + qTaskInfo_t task = pExec->task; + SMqDataRsp* pRsp = &pPushEntry->dataRsp; + + // prepare scan mem data + qStreamScanMemData(task, pReq); + + // exec + while (1) { + SSDataBlock* pDataBlock = NULL; + uint64_t ts = 0; + if (qExecTask(task, NULL, &ts) < 0) { + ASSERT(0); + } + + if (pDataBlock == NULL) { + break; + } + + tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols); + pRsp->blockNum++; + } + if (pRsp->blockNum > 0) { + // set offset + tqOffsetResetToLog(&pRsp->rspOffset, ver); + // remove from hash + size_t kLen; + void* key = taosHashGetKey(pPushEntry, &kLen); + void* keyCopy = taosMemoryMalloc(kLen); + memcpy(keyCopy, key, kLen); + + taosArrayPush(cachedKeys, &keyCopy); + taosArrayPush(cachedKeyLens, &kLen); + + if (taosHashRemove(pTq->pPushMgr, key, kLen) != 0) { + ASSERT(0); + } + tqPushDataRsp(pTq, pPushEntry); + } + } + // delete entry + for (int32_t i = 0; i < taosArrayGetSize(cachedKeys); i++) { + void* key = taosArrayGetP(cachedKeys, i); + size_t kLen = *(size_t*)taosArrayGet(cachedKeyLens, i); + taosHashRemove(pTq->pPushMgr, key, kLen); + } + taosArrayDestroyP(cachedKeys, (FDelete)taosMemoryFree); + taosArrayDestroy(cachedKeyLens); + } + // unlock + taosWUnLockLatch(&pTq->pushLock); + } + if (vnodeIsRoleLeader(pTq->pVnode)) { if (msgType == TDMT_VND_SUBMIT) { if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) return 0; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 8e5f328efa14454f9d89de3f8e77bcb8a368dbd9..d93a8fe0301c78a85b40bc938e2fdb5a9e406a19 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -15,21 +15,20 @@ #include "tq.h" - -bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){ - if(pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE){ +bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) { + if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE) { return true; } - int16_t msgType = pHead->msgType; - char* body = pHead->body; - int32_t bodyLen = pHead->bodyLen; + int16_t msgType = pHead->msgType; + char* body = pHead->body; + int32_t bodyLen = pHead->bodyLen; - int64_t tbSuid = pHandle->execHandle.execTb.suid; - int64_t realTbSuid = 0; - SDecoder coder; - void* data = POINTER_SHIFT(body, sizeof(SMsgHead)); - int32_t len = bodyLen - sizeof(SMsgHead); + int64_t tbSuid = pHandle->execHandle.execTb.suid; + int64_t realTbSuid = 0; + SDecoder coder; + void* data = POINTER_SHIFT(body, sizeof(SMsgHead)); + int32_t len = bodyLen - sizeof(SMsgHead); tDecoderInit(&coder, data, len); if (msgType == TDMT_VND_CREATE_STB || msgType == TDMT_VND_ALTER_STB) { @@ -43,38 +42,38 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){ if (tDecodeSVDropStbReq(&coder, &req) < 0) { goto end; } - realTbSuid = req.suid; + realTbSuid = req.suid; } else if (msgType == TDMT_VND_CREATE_TABLE) { SVCreateTbBatchReq req = {0}; if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) { goto end; } - int32_t needRebuild = 0; + int32_t needRebuild = 0; SVCreateTbReq* pCreateReq = NULL; for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { pCreateReq = req.pReqs + iReq; - if(pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid){ + if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) { needRebuild++; } } - if(needRebuild == 0){ + if (needRebuild == 0) { // do nothing - }else if(needRebuild == req.nReqs){ + } else if (needRebuild == req.nReqs) { realTbSuid = tbSuid; - }else{ + } else { realTbSuid = tbSuid; SVCreateTbBatchReq reqNew = {0}; reqNew.pArray = taosArrayInit(req.nReqs, sizeof(struct SVCreateTbReq)); for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { pCreateReq = req.pReqs + iReq; - if(pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid){ + if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) { reqNew.nReqs++; taosArrayPush(reqNew.pArray, pCreateReq); } } - int tlen; + int tlen; int32_t ret = 0; tEncodeSize(tEncodeSVCreateTbBatchReq, &reqNew, tlen, ret); void* buf = taosMemoryMalloc(tlen); @@ -107,7 +106,7 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){ } } } else if (msgType == TDMT_VND_ALTER_TABLE) { - SVAlterTbReq req = {0}; + SVAlterTbReq req = {0}; if (tDecodeSVAlterTbReq(&coder, &req) < 0) { goto end; @@ -129,32 +128,32 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){ goto end; } - int32_t needRebuild = 0; + int32_t needRebuild = 0; SVDropTbReq* pDropReq = NULL; for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { pDropReq = req.pReqs + iReq; - if(pDropReq->suid == tbSuid){ + if (pDropReq->suid == tbSuid) { needRebuild++; } } - if(needRebuild == 0){ + if (needRebuild == 0) { // do nothing - }else if(needRebuild == req.nReqs){ + } else if (needRebuild == req.nReqs) { realTbSuid = tbSuid; - }else{ + } else { realTbSuid = tbSuid; SVDropTbBatchReq reqNew = {0}; reqNew.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbReq)); for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { pDropReq = req.pReqs + iReq; - if(pDropReq->suid == tbSuid){ + if (pDropReq->suid == tbSuid) { reqNew.nReqs++; taosArrayPush(reqNew.pArray, pDropReq); } } - int tlen; + int tlen; int32_t ret = 0; tEncodeSize(tEncodeSVDropTbBatchReq, &reqNew, tlen, ret); void* buf = taosMemoryMalloc(tlen); @@ -177,11 +176,11 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){ goto end; } realTbSuid = req.suid; - } else{ + } else { ASSERT(0); } - end: +end: tDecoderClear(&coder); return tbSuid == realTbSuid; } @@ -224,7 +223,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea code = -1; goto END; } - if(isValValidForTable(pHandle, pHead)){ + if (isValValidForTable(pHandle, pHead)) { *fetchOffset = offset; code = 0; goto END; @@ -241,7 +240,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea offset++; } } - END: +END: taosThreadMutexUnlock(&pHandle->pWalReader->mutex); return code; } @@ -348,7 +347,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { } } -int32_t tqReaderSetDataMsg(STqReader* pReader, SSubmitReq* pMsg, int64_t ver) { +int32_t tqReaderSetDataMsg(STqReader* pReader, const SSubmitReq* pMsg, int64_t ver) { pReader->pMsg = pMsg; if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 6e9eba306a68ad91c22e5adb16e8ce5048b23903..b62a4dfbdc8e4a207c673466768b8149c0d80ee1 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -808,7 +808,6 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq SSubmitRsp submitRsp = {0}; SSubmitMsgIter msgIter = {0}; SSubmitBlk *pBlock; - SSubmitRsp rsp = {0}; SVCreateTbReq createTbReq = {0}; SDecoder decoder = {0}; int32_t nRows; @@ -921,7 +920,8 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq } if (taosArrayGetSize(newTbUids) > 0) { - vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode), (int32_t)taosArrayGetSize(newTbUids)); + vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode), + (int32_t)taosArrayGetSize(newTbUids)); } tqUpdateTbUidList(pVnode->pTq, newTbUids, true); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index dc98b0547957ab910afb6b4d465c020cfbd60799..f47855c2d0366cd7aefc63346a88a6ec410e08c3 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -145,6 +145,7 @@ typedef struct { SMqMetaRsp metaRsp; // for tmq fetching meta int8_t returned; int64_t snapshotVer; + const SSubmitReq* pReq; SSchemaWrapper* schema; char tbName[TSDB_TABLE_NAME_LEN]; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 373cb451f4f73fc0d2f6245f9ea9af06617ea59f..4db9338bd9267f554a735941ee109cf6b960c519 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -486,7 +486,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SL if (pLocal) { memcpy(&pTaskInfo->localFetch, pLocal, sizeof(*pLocal)); } - + taosArrayClearEx(pResList, freeBlock); int64_t curOwner = 0; @@ -773,6 +773,14 @@ int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* s return TSDB_CODE_SUCCESS; } +int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE); + ASSERT(pTaskInfo->streamInfo.pReq == NULL); + pTaskInfo->streamInfo.pReq = pReq; + return 0; +} + int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SOperatorInfo* pOperator = pTaskInfo->pRoot; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 6c5c33ae29e034add9bb07cea6f98c66dbeec58b..5f54d56ca7bea482569fdc9a83cd24eab020e255 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1430,6 +1430,41 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { SStreamScanInfo* pInfo = pOperator->info; qDebug("queue scan called"); + + if (pTaskInfo->streamInfo.pReq != NULL) { + if (pInfo->tqReader->pMsg == NULL) { + pInfo->tqReader->pMsg = pTaskInfo->streamInfo.pReq; + const SSubmitReq* pSubmit = pInfo->tqReader->pMsg; + if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) { + qError("submit msg messed up when initing stream submit block %p", pSubmit); + pInfo->tqReader->pMsg = NULL; + ASSERT(0); + } + } + + blockDataCleanup(pInfo->pRes); + SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; + + while (tqNextDataBlock(pInfo->tqReader)) { + SSDataBlock block = {0}; + + int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader); + + if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) { + continue; + } + + setBlockIntoRes(pInfo, &block); + + if (pBlockInfo->rows > 0) { + return pInfo->pRes; + } + } + + pInfo->tqReader->pMsg = NULL; + pTaskInfo->streamInfo.pReq = NULL; + } + if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); if (pResult && pResult->info.rows > 0) {