From 29fcd1b83aec652621cb27d2cb4af008ad187d44 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 6 May 2023 10:08:05 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/dnode/vnode/inc/vnode.h | 7 +- source/dnode/vnode/src/inc/vnodeInt.h | 1 - source/dnode/vnode/src/tq/tq.c | 85 ++------ source/dnode/vnode/src/tq/tqPush.c | 258 +----------------------- source/dnode/vnode/src/tq/tqRead.c | 53 ++--- source/dnode/vnode/src/tq/tqScan.c | 7 +- source/dnode/vnode/src/tq/tqUtil.c | 9 +- source/dnode/vnode/src/vnd/vnodeSvr.c | 1 - source/libs/executor/inc/querytask.h | 2 +- source/libs/executor/src/executor.c | 1 + source/libs/executor/src/scanoperator.c | 9 +- source/libs/stream/src/streamExec.c | 3 +- source/libs/wal/src/walRead.c | 1 + 13 files changed, 57 insertions(+), 380 deletions(-) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index c7424cd233..88460cd3ca 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -255,14 +255,13 @@ int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList); int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList); int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id); -int32_t tqNextBlock(STqReader *pReader, SSDataBlock* pBlock); int32_t tqNextBlockInWal(STqReader* pReader); -int32_t extractSubmitMsgFromWal(SWalReader *pReader, SPackedData *pPackedData); +bool tqNextBlockImpl(STqReader *pReader); +int32_t extractSubmitMsgFromWal(SWalReader *pReader, SPackedData *pPackedData); int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); -bool tqNextBlockImpl(STqReader *pReader); bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); -int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader, SSubmitTbData **pSubmitTbDataRet); +int32_t tqRetrieveDataBlock(STqReader *pReader, SSubmitTbData **pSubmitTbDataRet); int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet); int32_t vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index eb2787595b..1aea479511 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -212,7 +212,6 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msg int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen); -int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit); int32_t tqProcessSubmitReqForSubscribe(STQ* pTq); int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver); int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 12b81b6c3f..4997db684f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1069,12 +1069,15 @@ int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) { int32_t vgId = TD_VID(pTq->pVnode); taosWLockLatch(&pTq->lock); - if(taosHashGetSize(pTq->pPushMgr) > 0){ - void *pIter = taosHashIterate(pTq->pPushMgr, NULL); - while(pIter){ + + if (taosHashGetSize(pTq->pPushMgr) > 0) { + void* pIter = taosHashIterate(pTq->pPushMgr, NULL); + + while (pIter) { STqHandle* pHandle = *(STqHandle**)pIter; - tqDebug("vgId:%d start set submit for pHandle:%p, consume id:0x%"PRIx64, vgId, pHandle, pHandle->consumerId); - if(ASSERT(pHandle->msg != NULL)){ + tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId); + + if (ASSERT(pHandle->msg != NULL)) { tqError("pHandle->msg should not be null"); break; }else{ @@ -1083,77 +1086,15 @@ int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) { taosMemoryFree(pHandle->msg); pHandle->msg = NULL; } + pIter = taosHashIterate(pTq->pPushMgr, pIter); } + taosHashClear(pTq->pPushMgr); } + // unlock taosWUnLockLatch(&pTq->lock); - - return 0; -} - -int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) { -#if 0 - void* pIter = NULL; - SStreamDataSubmit2* pSubmit = streamDataSubmitNew(submit, STREAM_INPUT__DATA_SUBMIT); - if (pSubmit == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("failed to create data submit for stream since out of memory"); - saveOffsetForAllTasks(pTq, submit.ver); - return -1; - } - - SArray* pInputQueueFullTasks = taosArrayInit(4, POINTER_BYTES); - - while (1) { - pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter); - if (pIter == NULL) { - break; - } - - SStreamTask* pTask = *(SStreamTask**)pIter; - if (pTask->taskLevel != TASK_LEVEL__SOURCE) { - continue; - } - - if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { - tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->id.taskId, - pTask->status.taskStatus); - continue; - } - - // check if offset value exists - char key[128] = {0}; - createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId); - - if (tInputQueueIsFull(pTask)) { - STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key); - - int64_t ver = submit.ver; - if (pOffset == NULL) { - doSaveTaskOffset(pTq->pOffsetStore, key, submit.ver); - } else { - ver = pOffset->val.version; - } - - tqDebug("s-task:%s input queue is full, discard submit block, ver:%" PRId64, pTask->id.idStr, ver); - taosArrayPush(pInputQueueFullTasks, &pTask); - continue; - } - - // check if offset value exists - STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key); - ASSERT(pOffset == NULL); - - addSubmitBlockNLaunchTask(pTq->pOffsetStore, pTask, pSubmit, key, submit.ver); - } - - streamDataSubmitDestroy(pSubmit); - taosFreeQitem(pSubmit); -#endif - - tqStartStreamTasks(pTq); return 0; } @@ -1323,9 +1264,9 @@ FAIL: int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) { return sversion <= pTq->walLogLastVer; } int32_t tqStartStreamTasks(STQ* pTq) { - int32_t vgId = TD_VID(pTq->pVnode); - + int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; + taosWLockLatch(&pMeta->lock); int32_t numOfTasks = taosHashGetSize(pTq->pStreamMeta->pTasks); if (numOfTasks == 0) { diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index a914517645..c8195f72a9 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -16,250 +16,10 @@ #include "tq.h" #include "vnd.h" -#if 0 -void tqTmrRspFunc(void* param, void* tmrId) { - STqHandle* pHandle = (STqHandle*)param; - atomic_store_8(&pHandle->pushHandle.tmrStopped, 1); -} - -static int32_t tqLoopExecFromQueue(STQ* pTq, STqHandle* pHandle, SStreamDataSubmit** ppSubmit, SMqDataRsp* pRsp) { - SStreamDataSubmit* pSubmit = *ppSubmit; - while (pSubmit != NULL) { - if (tqLogScanExec(pTq, &pHandle->execHandle, pSubmit->data, pRsp, 0) < 0) { - } - // update processed - atomic_store_64(&pHandle->pushHandle.processedVer, pSubmit->ver); - streamQueueProcessSuccess(&pHandle->pushHandle.inputQ); - streamDataSubmitDestroy(pSubmit); - if (pRsp->blockNum > 0) { - *ppSubmit = pSubmit; - return 0; - } else { - pSubmit = streamQueueNextItem(&pHandle->pushHandle.inputQ); - } - } - *ppSubmit = pSubmit; - return -1; -} - -int32_t tqExecFromInputQ(STQ* pTq, STqHandle* pHandle) { - SMqDataRsp rsp = {0}; - // 1. guard and set status executing - int8_t execStatus = atomic_val_compare_exchange_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__IDLE, - TASK_EXEC_STATUS__EXECUTING); - if (execStatus == TASK_EXEC_STATUS__IDLE) { - SStreamDataSubmit* pSubmit = NULL; - // 2. check processedVer - // 2.1. if not missed, get msg from queue - // 2.2. if missed, scan wal - pSubmit = streamQueueNextItem(&pHandle->pushHandle.inputQ); - while (pHandle->pushHandle.processedVer <= pSubmit->ver) { - // read from wal - } - while (pHandle->pushHandle.processedVer > pSubmit->ver + 1) { - streamQueueProcessSuccess(&pHandle->pushHandle.inputQ); - streamDataSubmitDestroy(pSubmit); - pSubmit = streamQueueNextItem(&pHandle->pushHandle.inputQ); - if (pSubmit == NULL) break; - } - // 3. exec, after each success, update processed ver - // first run - if (tqLoopExecFromQueue(pTq, pHandle, &pSubmit, &rsp) == 0) { - goto SEND_RSP; - } - // set exec status closing - atomic_store_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__CLOSING); - // second run - if (tqLoopExecFromQueue(pTq, pHandle, &pSubmit, &rsp) == 0) { - goto SEND_RSP; - } - // set exec status idle - atomic_store_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__IDLE); - } -SEND_RSP: - // 4. if get result - // 4.1 set exec input status blocked and exec status idle - atomic_store_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__IDLE); - // 4.2 rpc send - rsp.rspOffset = pHandle->pushHandle.processedVer; - /*if (tqSendPollRsp(pTq, pMsg, pReq, &rsp) < 0) {*/ - /*return -1;*/ - /*}*/ - // 4.3 clear rpc info - memset(&pHandle->pushHandle.rpcInfo, 0, sizeof(SRpcHandleInfo)); - return 0; -} - -int32_t tqOpenPushHandle(STQ* pTq, STqHandle* pHandle) { - memset(&pHandle->pushHandle, 0, sizeof(STqPushHandle)); - pHandle->pushHandle.inputQ.queue = taosOpenQueue(); - pHandle->pushHandle.inputQ.qall = taosAllocateQall(); - if (pHandle->pushHandle.inputQ.queue == NULL || pHandle->pushHandle.inputQ.qall == NULL) { - if (pHandle->pushHandle.inputQ.queue) { - taosCloseQueue(pHandle->pushHandle.inputQ.queue); - } - if (pHandle->pushHandle.inputQ.qall) { - taosFreeQall(pHandle->pushHandle.inputQ.qall); - } - return -1; - } - return 0; -} - -int32_t tqPreparePush(STQ* pTq, STqHandle* pHandle, int64_t reqId, const SRpcHandleInfo* pInfo, int64_t processedVer, - int64_t timeout) { - memcpy(&pHandle->pushHandle.rpcInfo, pInfo, sizeof(SRpcHandleInfo)); - atomic_store_64(&pHandle->pushHandle.reqId, reqId); - atomic_store_64(&pHandle->pushHandle.processedVer, processedVer); - atomic_store_8(&pHandle->pushHandle.inputStatus, TASK_INPUT_STATUS__NORMAL); - atomic_store_8(&pHandle->pushHandle.tmrStopped, 0); - taosTmrReset(tqTmrRspFunc, (int32_t)timeout, pHandle, tqMgmt.timer, &pHandle->pushHandle.timerId); - return 0; -} - -int32_t tqEnqueue(STqHandle* pHandle, SStreamDataSubmit* pSubmit) { - int8_t inputStatus = atomic_load_8(&pHandle->pushHandle.inputStatus); - if (inputStatus == TASK_INPUT_STATUS__NORMAL) { - SStreamDataSubmit* pSubmitClone = streamSubmitBlockClone(pSubmit); - if (pSubmitClone == NULL) { - return -1; - } - taosWriteQitem(pHandle->pushHandle.inputQ.queue, pSubmitClone); - return 0; - } - return -1; -} - -int32_t tqSendExecReq(STQ* pTq, STqHandle* pHandle) { - // - return 0; -} - -int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver, SRpcHandleInfo handleInfo) { - if (msgType != TDMT_VND_SUBMIT) return 0; - void* pIter = NULL; - STqHandle* pHandle = NULL; - SSubmitReq* pReq = (SSubmitReq*)msg; - int32_t workerId = 4; - int64_t fetchOffset = ver; - - while (1) { - pIter = taosHashIterate(pTq->pushMgr, pIter); - if (pIter == NULL) break; - pHandle = *(STqHandle**)pIter; - - taosWLockLatch(&pHandle->pushHandle.lock); - - SMqDataRsp rsp = {0}; - rsp.reqOffset = pHandle->pushHandle.reqOffset; - rsp.blockData = taosArrayInit(0, sizeof(void*)); - rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t)); - - if (msgType == TDMT_VND_SUBMIT) { - tqLogScanExec(pTq, &pHandle->execHandle, pReq, &rsp, workerId); - } else { - tqError("tq push unexpected msg type %d", msgType); - } - - if (rsp.blockNum == 0) { - taosWUnLockLatch(&pHandle->pushHandle.lock); - continue; - } - - rsp.rspOffset = fetchOffset; - - int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, &rsp); - void* buf = rpcMallocCont(tlen); - if (buf == NULL) { - // todo free - return -1; - } - - ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP; - ((SMqRspHead*)buf)->epoch = pHandle->pushHandle.epoch; - ((SMqRspHead*)buf)->consumerId = pHandle->pushHandle.consumerId; - - void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); - tEncodeSMqDataBlkRsp(&abuf, &rsp); - - SRpcMsg resp = { - .info = pHandle->pushHandle.rpcInfo, - .pCont = buf, - .contLen = tlen, - .code = 0, - }; - tmsgSendRsp(&resp); - - memset(&pHandle->pushHandle.rpcInfo, 0, sizeof(SRpcHandleInfo)); - taosWUnLockLatch(&pHandle->pushHandle.lock); - - tqDebug("vgId:%d offset %" PRId64 " from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, req:%" PRId64 ", rsp:%" PRId64, - TD_VID(pTq->pVnode), fetchOffset, pHandle->pushHandle.consumerId, pHandle->pushHandle.epoch, rsp.blockNum, - rsp.reqOffset, rsp.rspOffset); - - // TODO destroy - taosArrayDestroy(rsp.blockData); - taosArrayDestroy(rsp.blockDataLen); - } - - return 0; -} -#endif - int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { -// void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg)); -// int32_t len = msgLen - sizeof(SSubmitReq2Msg); -// int32_t vgId = TD_VID(pTq->pVnode); if (msgType == TDMT_VND_SUBMIT) { tqProcessSubmitReqForSubscribe(pTq); - // lock push mgr to avoid potential msg lost -// taosWLockLatch(&pTq->lock); -// -// int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr); -// if (numOfRegisteredPush > 0) { -// tqDebug("vgId:%d tq push msg version:%" PRId64 " type:%s, head:%p, body:%p len:%d, numOfPushed consumers:%d", -// vgId, ver, TMSG_INFO(msgType), msg, pReq, len, numOfRegisteredPush); -// -// void* data = taosMemoryMalloc(len); -// if (data == NULL) { -// terrno = TSDB_CODE_OUT_OF_MEMORY; -// tqError("failed to copy data for stream since out of memory, vgId:%d", vgId); -// taosWUnLockLatch(&pTq->lock); -// return -1; -// } -// -// memcpy(data, pReq, len); -// -// SArray* cachedKey = taosArrayInit(0, sizeof(SItem)); -// void* pIter = NULL; -// -// while (1) { -// pIter = taosHashIterate(pTq->pPushMgr, pIter); -// if (pIter == NULL) { -// break; -// } -// -// STqPushEntry* pPushEntry = *(STqPushEntry**)pIter; -// -// STqHandle* pHandle = taosHashGet(pTq->pHandle, pPushEntry->subKey, strlen(pPushEntry->subKey)); -// if (pHandle == NULL) { -// tqDebug("vgId:%d, failed to find handle %s in pushing data to consumer, ignore", pTq->pVnode->config.vgId, -// pPushEntry->subKey); -// continue; -// } -// -// STqExecHandle* pExec = &pHandle->execHandle; -// doPushDataForEntry(pIter, pExec, pTq, ver, vgId, data, len, cachedKey); -// } -// -// doRemovePushedEntry(cachedKey, pTq); -// taosArrayDestroyEx(cachedKey, freeItem); -// taosMemoryFree(data); -// } -// -// // unlock -// taosWUnLockLatch(&pTq->lock); } tqDebug("handle submit, restore:%d, size:%d", pTq->pVnode->restored, (int)taosHashGetSize(pTq->pStreamMeta->pTasks)); @@ -274,8 +34,7 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v } if (msgType == TDMT_VND_SUBMIT) { - SPackedData submit = {0}; - tqProcessSubmitReq(pTq, submit); + tqStartStreamTasks(pTq); } if (msgType == TDMT_VND_DELETE) { @@ -286,16 +45,16 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v return 0; } - int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) { int32_t vgId = TD_VID(pTq->pVnode); - STqHandle* pHandle = (STqHandle*) handle; - if(pHandle->msg == NULL){ + STqHandle* pHandle = (STqHandle*)handle; + + if (pHandle->msg == NULL) { pHandle->msg = taosMemoryCalloc(1, sizeof(SRpcMsg)); memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); pHandle->msg->pCont = rpcMallocCont(pMsg->contLen); - }else{ - void *tmp = pHandle->msg->pCont; + } else { + void* tmp = pHandle->msg->pCont; memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); pHandle->msg->pCont = tmp; } @@ -303,7 +62,8 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) { memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen); pHandle->msg->contLen = pMsg->contLen; int32_t ret = taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey), &pHandle, POINTER_BYTES); - tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64", register to pHandle:%p, pCont:%p, len:%d", vgId, ret, pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen); + tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64 ", register to pHandle:%p, pCont:%p, len:%d", vgId, ret, + pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen); return 0; } @@ -313,6 +73,7 @@ int32_t tqUnregisterPushHandle(STQ* pTq, void *handle) { int32_t ret = taosHashRemove(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey)); tqError("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId); + if(pHandle->msg != NULL) { tqPushDataRsp(pTq, pHandle); @@ -320,5 +81,6 @@ int32_t tqUnregisterPushHandle(STQ* pTq, void *handle) { taosMemoryFree(pHandle->msg); pHandle->msg = NULL; } + return 0; } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 082e31ea91..7ed77edd5b 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -332,6 +332,7 @@ int32_t tqNextBlockInWal(STqReader* pReader) { if (pBlockList == NULL || pReader->nextBlk >= taosArrayGetSize(pBlockList)) { // try next message in wal file + // todo always retry to avoid read failure caused by wal file deletion if (walNextValidMsg(pWalReader) < 0) { return FETCH_TYPE__NONE; } @@ -374,7 +375,7 @@ int32_t tqNextBlockInWal(STqReader* pReader) { SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); if (pReader->tbIdHash == NULL) { - int32_t code = tqRetrieveDataBlock(pReader->pResBlock, pReader, NULL); + int32_t code = tqRetrieveDataBlock(pReader, NULL); if (code == TSDB_CODE_SUCCESS && pReader->pResBlock->info.rows > 0) { return FETCH_TYPE__DATA; } @@ -384,7 +385,7 @@ int32_t tqNextBlockInWal(STqReader* pReader) { if (ret != NULL) { tqDebug("tq reader return submit block, uid:%"PRId64", ver:%"PRId64, pSubmitTbData->uid, pReader->msg.ver); - int32_t code = tqRetrieveDataBlock(pReader->pResBlock, pReader, NULL); + int32_t code = tqRetrieveDataBlock(pReader, NULL); if (code == TSDB_CODE_SUCCESS && pReader->pResBlock->info.rows > 0) { return FETCH_TYPE__DATA; } @@ -399,31 +400,6 @@ int32_t tqNextBlockInWal(STqReader* pReader) { } } -int32_t tqNextBlock(STqReader* pReader, SSDataBlock* pBlock) { - while (1) { - if (pReader->msg.msgStr == NULL) { - if (walNextValidMsg(pReader->pWalReader) < 0) { - return FETCH_TYPE__NONE; - } - - void* pBody = POINTER_SHIFT(pReader->pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg)); - int32_t bodyLen = pReader->pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg); - int64_t ver = pReader->pWalReader->pHead->head.version; - - tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver); - } - - while (tqNextBlockImpl(pReader)) { - int32_t code = tqRetrieveDataBlock(pReader->pResBlock, pReader, NULL); - if (code != TSDB_CODE_SUCCESS || pBlock->info.rows == 0) { - continue; - } - - return FETCH_TYPE__DATA; - } - } -} - int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) { pReader->msg.msgStr = msgStr; pReader->msg.msgLen = msgLen; @@ -527,7 +503,7 @@ int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrap return 0; } -int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbData** pSubmitTbDataRet) { +int32_t tqRetrieveDataBlock(STqReader* pReader, SSubmitTbData** pSubmitTbDataRet) { tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++); @@ -535,6 +511,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa *pSubmitTbDataRet = pSubmitTbData; } + SSDataBlock* pBlock = pReader->pResBlock; blockDataCleanup(pBlock); int32_t sversion = pSubmitTbData->sver; @@ -603,7 +580,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId); int32_t code = blockDataAppendColInfo(pBlock, &colInfo); if (code != TSDB_CODE_SUCCESS) { - goto FAIL; + return -1; } i++; j++; @@ -622,7 +599,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa if (blockDataEnsureCapacity(pBlock, numOfRows) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; - goto FAIL; + return -1; } pBlock->info.rows = numOfRows; @@ -638,7 +615,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa while (targetIdx < colActual) { if (sourceIdx >= numOfCols) { tqError("tqRetrieveDataBlock sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols); - goto FAIL; + return -1; } SColData* pCol = taosArrayGet(pCols, sourceIdx); @@ -647,7 +624,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa if (pCol->nVal != numOfRows) { tqError("tqRetrieveDataBlock pCol->nVal:%d != numOfRows:%d", pCol->nVal, numOfRows); - goto FAIL; + return -1; } if (pCol->cid < pColData->info.colId) { @@ -661,14 +638,14 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData); varDataSetLen(val, colVal.value.nData); if (colDataAppend(pColData, i, val, !COL_VAL_IS_VALUE(&colVal)) < 0) { - goto FAIL; + return -1; } } else { colDataSetNULL(pColData, i); } } else { if (colDataAppend(pColData, i, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) { - goto FAIL; + return -1; } } } @@ -710,14 +687,14 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData); varDataSetLen(val, colVal.value.nData); if (colDataAppend(pColData, i, val, !COL_VAL_IS_VALUE(&colVal)) < 0) { - goto FAIL; + return -1; } } else { colDataSetNULL(pColData, i); } } else { if (colDataAppend(pColData, i, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) { - goto FAIL; + return -1; } } @@ -735,10 +712,6 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa } return 0; - -FAIL: - blockDataFreeRes(pBlock); - return -1; } int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet) { diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 3d9cea54ba..800bcc8b71 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -66,9 +66,10 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, in int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) { const int32_t MAX_ROWS_TO_RETURN = 4096; - int32_t vgId = TD_VID(pTq->pVnode); - int32_t code = 0; - int32_t totalRows = 0; + + int32_t vgId = TD_VID(pTq->pVnode); + int32_t code = 0; + int32_t totalRows = 0; const STqExecHandle* pExec = &pHandle->execHandle; qTaskInfo_t task = pExec->task; diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 133c51a8dc..d83345ad59 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -175,7 +175,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, goto end; } -// till now, all data has been transferred to consumer, new data needs to push client once arrived. + // till now, all data has been transferred to consumer, new data needs to push client once arrived. if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { // lock @@ -361,11 +361,10 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ // this is a normal subscribe requirement if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &offset); + } else { // todo handle the case where re-balance occurs. + // for taosx + return extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &offset); } - - // todo handle the case where re-balance occurs. - // for taosx - return extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &offset); } int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index f8161427db..c608403456 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -448,7 +448,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp walApplyVer(pVnode->pWal, version); if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) { -// /*vInfo("vgId:%d, push msg end", pVnode->config.vgId);*/ vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno)); return -1; } diff --git a/source/libs/executor/inc/querytask.h b/source/libs/executor/inc/querytask.h index 8852265da0..37c93fef5c 100644 --- a/source/libs/executor/inc/querytask.h +++ b/source/libs/executor/inc/querytask.h @@ -59,7 +59,7 @@ typedef struct { STqOffsetVal currentOffset; // for tmq SMqMetaRsp metaRsp; // for tmq fetching meta int64_t snapshotVer; - SPackedData submit; // todo remove it +// SPackedData submit; // todo remove it SSchemaWrapper* schema; char tbName[TSDB_TABLE_NAME_LEN]; // this is the current scan table: todo refactor int8_t recoverStep; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 5fc079b7c1..2d5830e4a9 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1080,6 +1080,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT STableListInfo* pTableListInfo = pScanBaseInfo->pTableListInfo; if (pOffset->type == TMQ_OFFSET__LOG) { + // todo refactor: move away tsdbReaderClose(pScanBaseInfo->dataReader); pScanBaseInfo->dataReader = NULL; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 9b1b5235cc..7cb3c00c1a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1636,6 +1636,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { qDebug("start to exec queue scan, %s", id); +#if 0 if (pTaskInfo->streamInfo.submit.msgStr != NULL) { if (pInfo->tqReader->msg.msgStr == NULL) { SPackedData submit = pTaskInfo->streamInfo.submit; @@ -1649,7 +1650,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; while (tqNextBlockImpl(pInfo->tqReader)) { - int32_t code = tqRetrieveDataBlock(pInfo->tqReader->pResBlock, pInfo->tqReader, NULL); + int32_t code = tqRetrieveDataBlock(pInfo->tqReader, NULL); if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) { continue; } @@ -1665,6 +1666,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { pTaskInfo->streamInfo.submit = (SPackedData){0}; return NULL; } +#endif if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) { SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); @@ -1682,10 +1684,12 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1, pTaskInfo->id.str) < 0) { return NULL; } + tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pTaskInfo->streamInfo.snapshotVer); } if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) { + while (1) { int32_t type = tqNextBlockInWal(pInfo->tqReader); SSDataBlock* pRes = pInfo->tqReader->pResBlock; @@ -2071,7 +2075,7 @@ FETCH_NEXT_BLOCK: blockDataCleanup(pInfo->pRes); while (tqNextBlockImpl(pInfo->tqReader)) { - int32_t code = tqRetrieveDataBlock(pInfo->tqReader->pResBlock, pInfo->tqReader, NULL); + int32_t code = tqRetrieveDataBlock(pInfo->tqReader, NULL); if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) { continue; } @@ -2109,7 +2113,6 @@ FETCH_NEXT_BLOCK: // record the scan action. pInfo->numOfExec++; pOperator->resultInfo.totalRows += pBlockInfo->rows; - // printDataBlock(pInfo->pRes, "stream scan"); qDebug("scan rows: %" PRId64, pBlockInfo->rows); if (pBlockInfo->rows > 0) { diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index f33e126068..f79d84c371 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -253,7 +253,6 @@ int32_t streamExecForAll(SStreamTask* pTask) { while (1) { SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); if (qItem == NULL) { -// qDebug("s-task:%s extract data from input queue, queue is empty, abort", pTask->id.idStr); break; } @@ -298,7 +297,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { } SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); - qDebug("s-task:%s exec begin, numOfBlocks:%d", pTask->id.idStr, batchSize); + qDebug("s-task:%s start to execute, numOfBlocks:%d", pTask->id.idStr, batchSize); streamTaskExecImpl(pTask, pInput, pRes); diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 6154e30938..4cc43a19a0 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -237,6 +237,7 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { } seeked = true; } + while (1) { contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead)); if (contLen == sizeof(SWalCkHead)) { -- GitLab