From 7616a283e3f1376bc9350399fbac55c649d893b2 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 18 Mar 2023 18:23:38 +0800 Subject: [PATCH] fix:error in TD-23218 & remove useless logic --- include/common/tcommon.h | 1 - include/libs/executor/executor.h | 4 +- include/libs/wal/wal.h | 4 +- include/util/taoserror.h | 1 + source/dnode/vnode/inc/vnode.h | 12 +- source/dnode/vnode/src/tq/tq.c | 183 +++++++++- source/dnode/vnode/src/tq/tqExec.c | 63 +--- source/dnode/vnode/src/tq/tqRead.c | 432 ++---------------------- source/libs/executor/inc/executorimpl.h | 6 +- source/libs/executor/src/executor.c | 18 +- source/libs/executor/src/executorimpl.c | 9 + source/libs/executor/src/scanoperator.c | 98 ++---- source/libs/wal/src/walRead.c | 17 +- source/util/src/terror.c | 1 + 14 files changed, 283 insertions(+), 566 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 2a40976a8b..2614864b16 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -216,7 +216,6 @@ typedef struct SSDataBlock { enum { FETCH_TYPE__DATA = 1, - FETCH_TYPE__META, FETCH_TYPE__SEP, FETCH_TYPE__NONE, }; diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index c3d2010351..dcc3c86171 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -196,12 +196,10 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT // int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit); -int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset); +void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset); SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo); -int64_t qStreamExtractPrepareUid(qTaskInfo_t tinfo); - const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo); const char* qExtractTbnameFromTask(qTaskInfo_t tinfo); diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 014ed518a3..ccbc53fa5d 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -146,8 +146,8 @@ typedef struct { int64_t curFileFirstVer; int64_t curVersion; int64_t capacity; - int8_t curInvalid; - int8_t curStopped; +// int8_t curInvalid; +// int8_t curStopped; TdThreadMutex mutex; SWalFilterCond cond; // TODO remove it diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 3dc3aa69b7..87eb3b3deb 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -756,6 +756,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TMQ_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x4000) #define TSDB_CODE_TMQ_CONSUMER_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x4001) #define TSDB_CODE_TMQ_CONSUMER_CLOSED TAOS_DEF_ERROR_CODE(0, 0x4002) +#define TSDB_CODE_TMQ_CONSUMER_ERROR TAOS_DEF_ERROR_CODE(0, 0x4003) // stream #define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 1d14829891..f3d983ae46 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -230,12 +230,7 @@ typedef struct SSnapContext { } SSnapContext; typedef struct STqReader { - // const SSubmitReq *pMsg; - // SSubmitBlk *pBlock; - // SSubmitMsgIter msgIter; - // SSubmitBlkIter blkIter; - - int64_t ver; +// int64_t ver; SPackedData msg2; int8_t setMsg; @@ -264,8 +259,13 @@ int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList); int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *tbUidList); int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList); +<<<<<<< Updated upstream int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char* id); int32_t tqNextBlock(STqReader *pReader, SFetchRet *ret); +======= +int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id); +void tqNextBlock(STqReader *pReader, SFetchRet *ret); +>>>>>>> Stashed changes int32_t tqReaderSetSubmitReq2(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); // int32_t tqReaderSetDataMsg(STqReader *pReader, const SSubmitReq *pMsg, int64_t ver); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index cd67dc23ad..334508eecf 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -211,6 +211,7 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) { return 0; } +<<<<<<< Updated upstream int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp) { int32_t len = 0; int32_t code = 0; @@ -242,6 +243,12 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con .code = 0, }; tmsgSendRsp(&rsp); +======= +int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) { + SMqDataRsp* pRsp = pPushEntry->pDataRsp; + SMqRspHead* pHeader = &pPushEntry->pDataRsp->head; + doSendDataRsp(&pPushEntry->info, pRsp, pHeader->epoch, pHeader->consumerId, pHeader->mqMsgType); +>>>>>>> Stashed changes char buf1[80] = {0}; char buf2[80] = {0}; @@ -253,6 +260,7 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con return 0; } +<<<<<<< Updated upstream int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const STaosxRsp* pRsp) { int32_t len = 0; int32_t code = 0; @@ -284,6 +292,10 @@ int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, co .code = 0, }; tmsgSendRsp(&rsp); +======= +int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type) { + doSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type); +>>>>>>> Stashed changes char buf1[80] = {0}; char buf2[80] = {0}; @@ -435,6 +447,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { return -1; } +<<<<<<< Updated upstream // update epoch if need int32_t savedEpoch = atomic_load_32(&pHandle->epoch); while (savedEpoch < reqEpoch) { @@ -552,16 +565,62 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } // for taosx +======= +static int32_t processSubColumn(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal *offset) { + int32_t vgId = TD_VID(pTq->pVnode); + + SMqDataRsp dataRsp = {0}; + tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType); + + // lock + taosWLockLatch(&pTq->pushLock); + int code = tqScanData(pTq, pHandle, &dataRsp, offset); + if(code != 0) { + tDeleteSMqDataRsp(&dataRsp); + return TSDB_CODE_TMQ_CONSUMER_ERROR; + } + // till now, all data has been transferred to consumer, new data needs to push client once arrived. + if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && + dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { + code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); + taosWUnLockLatch(&pTq->pushLock); + return code; + } + + taosWUnLockLatch(&pTq->pushLock); + code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP); + + // NOTE: this pHandle->consumerId may have been changed already. + tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64 + ", ts:%" PRId64, + pRequest->consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid, + dataRsp.rspOffset.ts); + + tDeleteSMqDataRsp(&dataRsp); + return code; +} + +static int32_t processSubDbOrTable(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal *offset) { + int code = 0; + int32_t vgId = TD_VID(pTq->pVnode); + SWalCkHead* pCkHead = NULL; +>>>>>>> Stashed changes SMqMetaRsp metaRsp = {0}; STaosxRsp taosxRsp = {0}; tqInitTaosxRsp(&taosxRsp, &req); +<<<<<<< Updated upstream if (fetchOffsetNew.type != TMQ_OFFSET__LOG) { if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, &fetchOffsetNew) < 0) { +======= + if (offset->type != TMQ_OFFSET__LOG) { + if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) { +>>>>>>> Stashed changes return -1; } if (metaRsp.metaRspLen > 0) { +<<<<<<< Updated upstream if (tqSendMetaPollRsp(pTq, pMsg, &req, &metaRsp) < 0) { code = -1; } @@ -569,17 +628,29 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ",version:%" PRId64, consumerId, pHandle->subKey, TD_VID(pTq->pVnode), metaRsp.rspOffset.type, metaRsp.rspOffset.uid, metaRsp.rspOffset.version); +======= + code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp); + tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 + ",ts:%" PRId64, + pRequest->consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, + metaRsp.rspOffset.ts); +>>>>>>> Stashed changes taosMemoryFree(metaRsp.metaRsp); tDeleteSTaosxRsp(&taosxRsp); return code; } + tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64 + ",version:%" PRId64, + pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid, + taosxRsp.rspOffset.version); if (taosxRsp.blockNum > 0) { if (tqSendTaosxRsp(pTq, pMsg, &req, &taosxRsp) < 0) { code = -1; } tDeleteSTaosxRsp(&taosxRsp); return code; +<<<<<<< Updated upstream } else { fetchOffsetNew = taosxRsp.rspOffset; } @@ -592,6 +663,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { if (fetchOffsetNew.type == TMQ_OFFSET__LOG) { int64_t fetchVer = fetchOffsetNew.version + 1; +======= + } + } else { + int64_t fetchVer = offset->version + 1; +>>>>>>> Stashed changes pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); if (pCkHead == NULL) { tDeleteSTaosxRsp(&taosxRsp); @@ -601,28 +677,45 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { walSetReaderCapacity(pHandle->pWalReader, 2048); while (1) { +<<<<<<< Updated upstream savedEpoch = atomic_load_32(&pHandle->epoch); if (savedEpoch > reqEpoch) { tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, vg %d offset %" PRId64 ", found new consumer epoch %d, discard req epoch %d", consumerId, req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, savedEpoch, reqEpoch); +======= + int32_t savedEpoch = atomic_load_32(&pHandle->epoch); + if (savedEpoch > pRequest->epoch) { + tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey:%s vgId:%d offset %" PRId64 + ", found new consumer epoch %d, discard req epoch %d", + pRequest->consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch); +>>>>>>> Stashed changes break; } if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); +<<<<<<< Updated upstream if (tqSendTaosxRsp(pTq, pMsg, &req, &taosxRsp) < 0) { code = -1; } +======= + code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); +>>>>>>> Stashed changes tDeleteSTaosxRsp(&taosxRsp); taosMemoryFreeClear(pCkHead); return code; } SWalCont* pHead = &pCkHead->head; +<<<<<<< Updated upstream tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId, req.epoch, TD_VID(pTq->pVnode), fetchVer, pHead->msgType); +======= + tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", pRequest->consumerId, + pRequest->epoch, vgId, fetchVer, pHead->msgType); +>>>>>>> Stashed changes if (pHead->msgType == TDMT_VND_SUBMIT) { SPackedData submit = { @@ -631,8 +724,13 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { .ver = pHead->version, }; if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) { +<<<<<<< Updated upstream tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, TD_VID(pTq->pVnode), req.subKey); +======= + tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId, + pRequest->subKey); +>>>>>>> Stashed changes return -1; } if (taosxRsp.blockNum > 0 /* threshold */) { @@ -648,8 +746,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } } else { - /*A(pHandle->fetchMeta);*/ - /*A(IS_META_MSG(pHead->msgType));*/ tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType)); tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer); metaRsp.resMsgType = pHead->msgType; @@ -674,6 +770,89 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { return 0; } +<<<<<<< Updated upstream +======= +static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) { + int32_t code = -1; + STqOffsetVal offset = {0}; + STqOffsetVal reqOffset = pRequest->reqOffset; + + // 1. reset the offset if needed + if (reqOffset.type > 0) { + offset = reqOffset; + } else { // handle the reset offset cases, according to the consumer's choice. + bool blockReturned = false; + code = extractResetOffsetVal(&offset, pTq, pHandle, pRequest, pMsg, &blockReturned); + if (code != 0) { + return code; + } + + // empty block returned, quit + if (blockReturned) { + return 0; + } + } + + // this is a normal subscription requirement + if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { + return processSubColumn(pTq, pHandle, pRequest, pMsg, &offset); + } + + // todo handle the case where re-balance occurs. + // for taosx + return processSubDbOrTable(pTq, pHandle, pRequest, pMsg, &offset); +} + +int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { + SMqPollReq req = {0}; + if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) { + tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen); + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + int64_t consumerId = req.consumerId; + int32_t reqEpoch = req.epoch; + STqOffsetVal reqOffset = req.reqOffset; + int32_t vgId = TD_VID(pTq->pVnode); + + // 1. find handle + STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); + if (pHandle == NULL) { + tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey); + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + // 2. check re-balance status + taosRLockLatch(&pTq->pushLock); + if (pHandle->consumerId != consumerId) { + tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, + consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId); + terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; + taosRUnLockLatch(&pTq->pushLock); + return -1; + } + taosRUnLockLatch(&pTq->pushLock); + + taosWLockLatch(&pTq->pushLock); + // 3. update the epoch value + int32_t savedEpoch = pHandle->epoch; + if (savedEpoch < reqEpoch) { + tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, reqEpoch); + pHandle->epoch = reqEpoch; + } + taosWUnLockLatch(&pTq->pushLock); + + char buf[80]; + tFormatOffset(buf, 80, &reqOffset); + tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64, + consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId); + + return extractDataForMq(pTq, pHandle, &req, pMsg); +} + +>>>>>>> Stashed changes int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index f7eba3fbc1..36f4d448c8 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -65,18 +65,8 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs qTaskInfo_t task = pExec->task; if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) { - tqDebug("prepare scan failed, return"); - if (pOffset->type == TMQ_OFFSET__LOG) { - pRsp->rspOffset = *pOffset; - return 0; - } else { - tqOffsetResetToLog(pOffset, pHandle->snapshotVer); - if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) { - tqDebug("prepare scan failed, return"); - pRsp->rspOffset = *pOffset; - return 0; - } - } + tqError("prepare scan failed, return"); + return -1; } int32_t rowCnt = 0; @@ -103,20 +93,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs } } - if (qStreamExtractOffset(task, &pRsp->rspOffset) < 0) { - return -1; - } - - if (pRsp->rspOffset.type == 0) { - tqError("expected rsp offset: type %d %" PRId64 " %" PRId64 " %" PRId64, pRsp->rspOffset.type, pRsp->rspOffset.ts, - pRsp->rspOffset.uid, pRsp->rspOffset.version); - return -1; - } - - if(pRsp->withTbName || pRsp->withSchema){ - tqError("get column should not with meta:%d,%d", pRsp->withTbName, pRsp->withSchema); - return -1; - } + qStreamExtractOffset(task, &pRsp->rspOffset); return 0; } @@ -125,18 +102,8 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta qTaskInfo_t task = pExec->task; if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) { - tqDebug("prepare scan failed, return"); - if (pOffset->type == TMQ_OFFSET__LOG) { - pRsp->rspOffset = *pOffset; - return 0; - } else { - tqOffsetResetToLog(pOffset, pHandle->snapshotVer); - if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) { - tqDebug("prepare scan failed, return"); - pRsp->rspOffset = *pOffset; - return 0; - } - } + tqDebug("tqScanTaosx prepare scan failed, return"); + return -1; } int32_t rowCnt = 0; @@ -183,9 +150,6 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta } if (pDataBlock == NULL && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { - if (qStreamExtractPrepareUid(task) != 0) { - continue; - } tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode), pHandle->snapshotVer + 1); break; @@ -198,28 +162,17 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta SMqMetaRsp* tmp = qStreamExtractMetaMsg(task); if (tmp->rspOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) { - tqOffsetResetToData(pOffset, tmp->rspOffset.uid, tmp->rspOffset.ts); - qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType); - tmp->rspOffset.type = TMQ_OFFSET__SNAPSHOT_META; tqDebug("tmqsnap task exec change to get data"); continue; } *pMetaRsp = *tmp; tqDebug("tmqsnap task exec exited, get meta"); - - tqDebug("task exec exited"); break; } qStreamExtractOffset(task, &pRsp->rspOffset); - if (pRsp->rspOffset.type == 0) { - tqError("expected rsp offset: type %d %" PRId64 " %" PRId64 " %" PRId64, pRsp->rspOffset.type, pRsp->rspOffset.ts, - pRsp->rspOffset.uid, pRsp->rspOffset.version); - return -1; - } - return 0; } @@ -232,14 +185,8 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { STqReader* pReader = pExec->pExecReader; - /*tqReaderSetDataMsg(pReader, pReq, 0);*/ tqReaderSetSubmitReq2(pReader, submit.msgStr, submit.msgLen, submit.ver); while (tqNextDataBlock2(pReader)) { - /*SSDataBlock block = {0};*/ - /*if (tqRetrieveDataBlock(&block, pReader) < 0) {*/ - /*if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;*/ - /*}*/ - taosArrayClear(pBlocks); taosArrayClear(pSchemas); SSubmitTbData* pSubmitTbDataRet = NULL; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index bf73cca925..14623730e1 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -260,7 +260,7 @@ STqReader* tqOpenReader(SVnode* pVnode) { pReader->pVnodeMeta = pVnode->pMeta; /*pReader->pMsg = NULL;*/ - pReader->ver = -1; +// pReader->ver = -1; pReader->pColIdList = NULL; pReader->cachedSchemaVer = 0; pReader->cachedSchemaSuid = 0; @@ -291,35 +291,24 @@ void tqCloseReader(STqReader* pReader) { } int32_t tqSeekVer(STqReader* pReader, int64_t ver, const char* id) { - // todo set the correct vgId - tqDebug("tmq poll: wal seek to version:%"PRId64" %s", ver, id); if (walReadSeekVer(pReader->pWalReader, ver) < 0) { tqError("tmq poll: wal reader failed to seek to ver:%"PRId64" code:%s, %s", ver, tstrerror(terrno), id); return -1; - } else { - tqDebug("tmq poll: wal reader seek to ver:%"PRId64" %s", ver, id); - return 0; } + tqDebug("tmq poll: wal reader seek to ver:%"PRId64" %s", ver, id); + return 0; } -int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { - bool fromProcessedMsg = pReader->msg2.msgStr != NULL; - +void tqNextBlock(STqReader* pReader, SFetchRet* ret) { while (1) { - if (!fromProcessedMsg) { + if (pReader->msg2.msgStr == NULL) { if (walNextValidMsg(pReader->pWalReader) < 0) { -// pReader->ver = pReader->pWalReader->curVersion - pReader->pWalReader->curStopped; - if(pReader->pWalReader->curInvalid == 0){ - pReader->ver = pReader->pWalReader->curVersion - pReader->pWalReader->curStopped; - }else{ - pReader->ver = walGetLastVer(pReader->pWalReader->pWal); - } +// pReader->ver = pReader->pWalReader->curVersion; ret->offset.type = TMQ_OFFSET__LOG; - - ret->offset.version = pReader->ver; + ret->offset.version = pReader->pWalReader->curVersion; ret->fetchType = FETCH_TYPE__NONE; - tqDebug("return offset %" PRId64 ", no more valid msg in wal", ret->offset.version); - return -1; + tqInfo("return offset %" PRId64 ", no more valid msg in wal", ret->offset.version); + return; } void* body = POINTER_SHIFT(pReader->pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg)); @@ -330,7 +319,6 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { } while (tqNextDataBlock2(pReader)) { - // TODO mem free memset(&ret->data, 0, sizeof(SSDataBlock)); int32_t code = tqRetrieveDataBlock2(&ret->data, pReader, NULL); if (code != 0 || ret->data.info.rows == 0) { @@ -338,45 +326,18 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { } ret->fetchType = FETCH_TYPE__DATA; tqDebug("return data rows %d", ret->data.info.rows); - return 0; - } - - if (fromProcessedMsg) { - ret->offset.type = TMQ_OFFSET__LOG; - ret->offset.version = pReader->ver; - ret->fetchType = FETCH_TYPE__SEP; - tqDebug("return offset %" PRId64 ", processed finish", ret->offset.version); - return 0; + return; } } } -#if 0 -int32_t tqReaderSetDataMsg(STqReader* pReader, const SSubmitReq* pMsg, int64_t ver) { - pReader->pMsg = pMsg; - -// if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1; -// while (true) { -// if (tGetSubmitMsgNext(&pReader->msgIter, &pReader->pBlock) < 0) return -1; -// tqDebug("submitnext vgId:%d, block:%p, dataLen:%d, len:%d, uid:%"PRId64, pWalReader->pWal->cfg.vgId, pReader->pBlock, pReader->msgIter.dataLen, -// pReader->msgIter.len, pReader->msgIter.uid); -// if (pReader->pBlock == NULL) break; -// } - - if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1; - pReader->ver = ver; - memset(&pReader->blkIter, 0, sizeof(SSubmitBlkIter)); - return 0; -} -#endif - int32_t tqReaderSetSubmitReq2(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) { - ASSERT(pReader->msg2.msgStr == NULL && msgStr && msgLen && (ver >= 0)); +// ASSERT(pReader->msg2.msgStr == NULL && msgStr && msgLen && (ver >= 0)); pReader->msg2.msgStr = msgStr; pReader->msg2.msgLen = msgLen; pReader->msg2.ver = ver; - pReader->ver = ver; +// pReader->ver = ver; tqDebug("tq reader set msg %p %d", msgStr, msgLen); @@ -384,7 +345,9 @@ int32_t tqReaderSetSubmitReq2(STqReader* pReader, void* msgStr, int32_t msgLen, SDecoder decoder; tDecoderInit(&decoder, pReader->msg2.msgStr, pReader->msg2.msgLen); if (tDecodeSSubmitReq2(&decoder, &pReader->submit) < 0) { - ASSERT(0); + tDecoderClear(&decoder); + tqError("DecodeSSubmitReq2 error, msgLen:%d, ver:%"PRId64, msgLen, ver); + return -1; } tDecoderClear(&decoder); pReader->setMsg = 1; @@ -392,47 +355,17 @@ int32_t tqReaderSetSubmitReq2(STqReader* pReader, void* msgStr, int32_t msgLen, return 0; } -#if 0 -bool tqNextDataBlock(STqReader* pReader) { - if (pReader->pMsg == NULL) return false; - while (1) { - if (tGetSubmitMsgNext(&pReader->msgIter, &pReader->pBlock) < 0) { - return false; - } - if (pReader->pBlock == NULL) { - pReader->pMsg = NULL; - return false; - } - - if (pReader->tbIdHash == NULL) { - return true; - } - void* ret = taosHashGet(pReader->tbIdHash, &pReader->msgIter.uid, sizeof(int64_t)); - /*tqDebug("search uid %" PRId64, pHandle->msgIter.uid);*/ - if (ret != NULL) { - /*tqDebug("find uid %" PRId64, pHandle->msgIter.uid);*/ - return true; - } - } - return false; -} -#endif - bool tqNextDataBlock2(STqReader* pReader) { - if (pReader->msg2.msgStr == NULL) { + if (pReader->msg2.msgStr == NULL || pReader->setMsg != 1) { return false; } - ASSERT(pReader->setMsg == 1); - tqDebug("tq reader next data block %p, %d %" PRId64 " %d", pReader->msg2.msgStr, pReader->msg2.msgLen, pReader->msg2.ver, pReader->nextBlk); int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData); while (pReader->nextBlk < blockSz) { SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); - ASSERT(pSubmitTbData->uid); - if (pReader->tbIdHash == NULL) return true; void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)); @@ -503,63 +436,8 @@ int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrap return 0; } -#if 0 -bool tqNextDataBlockFilterOut(STqReader* pHandle, SHashObj* filterOutUids) { - while (1) { - if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) { - return false; - } - if (pHandle->pBlock == NULL) return false; - - void* ret = taosHashGet(filterOutUids, &pHandle->msgIter.uid, sizeof(int64_t)); - if (ret == NULL) { - return true; - } - } - return false; -} - -int32_t tqScanSubmitSplit(SArray* pBlocks, SArray* schemas, STqReader* pReader) { - // - int32_t sversion = htonl(pReader->pBlock->sversion); - if (pReader->cachedSchemaSuid == 0 || pReader->cachedSchemaVer != sversion || - pReader->cachedSchemaSuid != pReader->msgIter.suid) { - taosMemoryFree(pReader->pSchema); - pReader->pSchema = metaGetTbTSchema(pReader->pVnodeMeta, pReader->msgIter.uid, sversion, 1); - if (pReader->pSchema == NULL) { - tqWarn("vgId:%d, cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64 - "), version %d, possibly dropped table", - pWalReader->pWal->cfg.vgId, pReader->msgIter.uid, pReader->msgIter.suid, sversion); - pReader->cachedSchemaSuid = 0; - terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; - return -1; - } - - tDeleteSSchemaWrapper(pReader->pSchemaWrapper); - pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, pReader->msgIter.uid, sversion, 1); - if (pReader->pSchemaWrapper == NULL) { - tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table", - pWalReader->pWal->cfg.vgId, pReader->msgIter.uid, pReader->cachedSchemaVer); - pReader->cachedSchemaSuid = 0; - terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; - return -1; - } - - STSchema* pTschema = pReader->pSchema; - SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper; - - int32_t colNumNeed = taosArrayGetSize(pReader->pColIdList); - } - return 0; -} -#endif - int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbData** pSubmitTbDataRet) { - int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData); - ASSERT(pReader->nextBlk < blockSz); - - tqDebug("tq reader retrieve data block %p, %d", pReader->msg2.msgStr, pReader->nextBlk); - + tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg2.msgStr, pReader->nextBlk); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); pReader->nextBlk++; @@ -663,33 +541,27 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbD int32_t targetIdx = 0; int32_t sourceIdx = 0; while (targetIdx < colActual) { - ASSERT(sourceIdx < numOfCols); - + if(sourceIdx >= numOfCols){ + tqError("tqRetrieveDataBlock2 sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols); + goto FAIL; + } SColData* pCol = taosArrayGet(pCols, sourceIdx); SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx); SColVal colVal; - ASSERT(pCol->nVal == numOfRows); + if(pCol->nVal != numOfRows){ + tqError("tqRetrieveDataBlock2 pCol->nVal:%d != numOfRows:%d", pCol->nVal, numOfRows); + goto FAIL; + } if (pCol->cid < pColData->info.colId) { sourceIdx++; } else if (pCol->cid == pColData->info.colId) { for (int32_t i = 0; i < pCol->nVal; i++) { tColDataGetValue(pCol, i, &colVal); -#if 0 - void* val = NULL; - if (IS_STR_DATA_TYPE(colVal.type)) { - val = colVal.value.pData; - } else { - val = &colVal.value.val; - } - if (colDataAppend(pColData, i, val, !COL_VAL_IS_VALUE(&colVal)) < 0) { - goto FAIL; - } -#endif if (IS_STR_DATA_TYPE(colVal.type)) { if (colVal.value.pData != NULL) { - char val[65535 + 2]; + char val[65535 + 2] = {0}; memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData); varDataSetLen(val, colVal.value.nData); if (colDataAppend(pColData, i, val, !COL_VAL_IS_VALUE(&colVal)) < 0) { @@ -720,8 +592,6 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbD for (int32_t j = 0; j < colActual; j++) { SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j); while (1) { - ASSERT(sourceIdx < pTschema->numOfCols); - SColVal colVal; tRowGet(pRow, pTschema, sourceIdx, &colVal); if (colVal.cid < pColData->info.colId) { @@ -730,7 +600,7 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbD } else if (colVal.cid == pColData->info.colId) { if (IS_STR_DATA_TYPE(colVal.type)) { if (colVal.value.pData != NULL) { - char val[65535 + 2]; + char val[65535 + 2] = {0}; memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData); varDataSetLen(val, colVal.value.nData); if (colDataAppend(pColData, i, val, !COL_VAL_IS_VALUE(&colVal)) < 0) { @@ -739,7 +609,6 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbD } else { colDataSetNULL(pColData, i); } - /*val = colVal.value.pData;*/ } else { if (colDataAppend(pColData, i, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) { goto FAIL; @@ -764,253 +633,6 @@ FAIL: return -1; } -#if 0 -int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) { - // TODO: cache multiple schema - int32_t sversion = htonl(pReader->pBlock->sversion); - if (pReader->cachedSchemaSuid == 0 || pReader->cachedSchemaVer != sversion || - pReader->cachedSchemaSuid != pReader->msgIter.suid) { - if (pReader->pSchema) taosMemoryFree(pReader->pSchema); - pReader->pSchema = metaGetTbTSchema(pReader->pVnodeMeta, pReader->msgIter.uid, sversion, 1); - if (pReader->pSchema == NULL) { - tqWarn("cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64 "), version %d, possibly dropped table", - pReader->msgIter.uid, pReader->msgIter.suid, pReader->cachedSchemaVer); - pReader->cachedSchemaSuid = 0; - terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; - return -1; - } - - if (pReader->pSchemaWrapper) tDeleteSSchemaWrapper(pReader->pSchemaWrapper); - pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, pReader->msgIter.uid, sversion, 1); - if (pReader->pSchemaWrapper == NULL) { - tqWarn("cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table", - pReader->msgIter.uid, pReader->cachedSchemaVer); - pReader->cachedSchemaSuid = 0; - terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; - return -1; - } - pReader->cachedSchemaVer = sversion; - pReader->cachedSchemaSuid = pReader->msgIter.suid; - } - - STSchema* pTschema = pReader->pSchema; - SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper; - - int32_t colNumNeed = taosArrayGetSize(pReader->pColIdList); - - if (colNumNeed == 0) { - int32_t colMeta = 0; - while (colMeta < pSchemaWrapper->nCols) { - SSchema* pColSchema = &pSchemaWrapper->pSchema[colMeta]; - SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId); - int32_t code = blockDataAppendColInfo(pBlock, &colInfo); - if (code != TSDB_CODE_SUCCESS) { - goto FAIL; - } - colMeta++; - } - } else { - if (colNumNeed > pSchemaWrapper->nCols) { - colNumNeed = pSchemaWrapper->nCols; - } - - int32_t colMeta = 0; - int32_t colNeed = 0; - while (colMeta < pSchemaWrapper->nCols && colNeed < colNumNeed) { - SSchema* pColSchema = &pSchemaWrapper->pSchema[colMeta]; - col_id_t colIdSchema = pColSchema->colId; - col_id_t colIdNeed = *(col_id_t*)taosArrayGet(pReader->pColIdList, colNeed); - if (colIdSchema < colIdNeed) { - colMeta++; - } else if (colIdSchema > colIdNeed) { - colNeed++; - } else { - SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId); - int32_t code = blockDataAppendColInfo(pBlock, &colInfo); - if (code != TSDB_CODE_SUCCESS) { - goto FAIL; - } - colMeta++; - colNeed++; - } - } - } - - if (blockDataEnsureCapacity(pBlock, pReader->msgIter.numOfRows) < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto FAIL; - } - - int32_t colActual = blockDataGetNumOfCols(pBlock); - - STSRowIter iter = {0}; - tdSTSRowIterInit(&iter, pTschema); - STSRow* row; - int32_t curRow = 0; - - tInitSubmitBlkIter(&pReader->msgIter, pReader->pBlock, &pReader->blkIter); - - pBlock->info.id.uid = pReader->msgIter.uid; - pBlock->info.rows = pReader->msgIter.numOfRows; - pBlock->info.version = pReader->pMsg->version; - pBlock->info.dataLoad = 1; - - while ((row = tGetSubmitBlkNext(&pReader->blkIter)) != NULL) { - tdSTSRowIterReset(&iter, row); - // get all wanted col of that block - for (int32_t i = 0; i < colActual; i++) { - SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i); - SCellVal sVal = {0}; - if (!tdSTSRowIterFetch(&iter, pColData->info.colId, pColData->info.type, &sVal)) { - break; - } - if (colDataSetVal(pColData, curRow, sVal.val, sVal.valType != TD_VTYPE_NORM) < 0) { - goto FAIL; - } - } - curRow++; - } - return 0; - -FAIL: - blockDataFreeRes(pBlock); - return -1; -} - -int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas) { - int32_t sversion = htonl(pReader->pBlock->sversion); - - if (pReader->cachedSchemaSuid == 0 || pReader->cachedSchemaVer != sversion || - pReader->cachedSchemaSuid != pReader->msgIter.suid) { - if (pReader->pSchema) taosMemoryFree(pReader->pSchema); - pReader->pSchema = metaGetTbTSchema(pReader->pVnodeMeta, pReader->msgIter.uid, sversion, 1); - if (pReader->pSchema == NULL) { - tqWarn("cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64 "), version %d, possibly dropped table", - pReader->msgIter.uid, pReader->msgIter.suid, pReader->cachedSchemaVer); - pReader->cachedSchemaSuid = 0; - terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; - return -1; - } - - if (pReader->pSchemaWrapper) tDeleteSSchemaWrapper(pReader->pSchemaWrapper); - pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, pReader->msgIter.uid, sversion, 1); - if (pReader->pSchemaWrapper == NULL) { - tqWarn("cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table", - pReader->msgIter.uid, pReader->cachedSchemaVer); - pReader->cachedSchemaSuid = 0; - terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; - return -1; - } - pReader->cachedSchemaVer = sversion; - pReader->cachedSchemaSuid = pReader->msgIter.suid; - } - - STSchema* pTschema = pReader->pSchema; - SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper; - - int32_t colAtMost = pSchemaWrapper->nCols; - - int32_t curRow = 0; - int32_t lastRow = 0; - - char* assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols); - if (assigned == NULL) return -1; - - tInitSubmitBlkIter(&pReader->msgIter, pReader->pBlock, &pReader->blkIter); - STSRowIter iter = {0}; - tdSTSRowIterInit(&iter, pTschema); - STSRow* row; - - while ((row = tGetSubmitBlkNext(&pReader->blkIter)) != NULL) { - bool buildNew = false; - tdSTSRowIterReset(&iter, row); - - tqDebug("vgId:%d, row of block %d", pReader->pWalReader->pWal->cfg.vgId, curRow); - for (int32_t i = 0; i < colAtMost; i++) { - SCellVal sVal = {0}; - if (!tdSTSRowIterFetch(&iter, pSchemaWrapper->pSchema[i].colId, pSchemaWrapper->pSchema[i].type, &sVal)) { - break; - } - tqDebug("vgId:%d, %d col, type %d", pReader->pWalReader->pWal->cfg.vgId, i, sVal.valType); - if (curRow == 0) { - assigned[i] = sVal.valType != TD_VTYPE_NONE; - buildNew = true; - } else { - bool currentRowAssigned = sVal.valType != TD_VTYPE_NONE; - if (currentRowAssigned != assigned[i]) { - assigned[i] = currentRowAssigned; - buildNew = true; - } - } - } - - if (buildNew) { - if (taosArrayGetSize(blocks) > 0) { - SSDataBlock* pLastBlock = taosArrayGetLast(blocks); - pLastBlock->info.rows = curRow - lastRow; - lastRow = curRow; - } - SSDataBlock* pBlock = createDataBlock(); - SSchemaWrapper* pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); - if (tqMaskBlock(pSW, pBlock, pSchemaWrapper, assigned) < 0) { - blockDataDestroy(pBlock); - goto FAIL; - } - SSDataBlock block = {0}; - assignOneDataBlock(&block, pBlock); - blockDataDestroy(pBlock); - - tqDebug("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId, - (int32_t)taosArrayGetSize(block.pDataBlock)); - - taosArrayPush(blocks, &block); - taosArrayPush(schemas, &pSW); - } - - SSDataBlock* pBlock = taosArrayGetLast(blocks); - pBlock->info.id.uid = pReader->msgIter.uid; - pBlock->info.rows = 0; - pBlock->info.version = pReader->pMsg->version; - - tqDebug("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId, - (int32_t)taosArrayGetSize(blocks)); - - if (blockDataEnsureCapacity(pBlock, pReader->msgIter.numOfRows - curRow) < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto FAIL; - } - - tdSTSRowIterReset(&iter, row); - for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) { - SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i); - SCellVal sVal = {0}; - - if (!tdSTSRowIterFetch(&iter, pColData->info.colId, pColData->info.type, &sVal)) { - break; - } - - ASSERT(sVal.valType != TD_VTYPE_NONE); - - if (colDataSetVal(pColData, curRow, sVal.val, sVal.valType == TD_VTYPE_NULL) < 0) { - goto FAIL; - } - tqDebug("vgId:%d, row %d col %d append %d", pReader->pWalReader->pWal->cfg.vgId, curRow, i, - sVal.valType == TD_VTYPE_NULL); - } - curRow++; - } - SSDataBlock* pLastBlock = taosArrayGetLast(blocks); - pLastBlock->info.rows = curRow - lastRow; - - taosMemoryFree(assigned); - return 0; - -FAIL: - taosMemoryFree(assigned); - return -1; -} -#endif - int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet) { tqDebug("tq reader retrieve data block %p, %d", pReader->msg2.msgStr, pReader->nextBlk); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 55bf8b37d0..27355b5893 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -127,10 +127,10 @@ enum { typedef struct { // TODO remove prepareStatus - STqOffsetVal prepareStatus; // for tmq - STqOffsetVal lastStatus; // for tmq +// STqOffsetVal prepareStatus; // for tmq + STqOffsetVal currentOffset; // for tmq SMqMetaRsp metaRsp; // for tmq fetching meta - int8_t returned; +// int8_t returned; int64_t snapshotVer; // const SSubmitReq* pReq; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index af625be0b1..af7fad8a59 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -995,15 +995,9 @@ SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) { return &pTaskInfo->streamInfo.metaRsp; } -int64_t qStreamExtractPrepareUid(qTaskInfo_t tinfo) { +void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - return pTaskInfo->streamInfo.prepareStatus.uid; -} - -int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - memcpy(pOffset, &pTaskInfo->streamInfo.lastStatus, sizeof(STqOffsetVal)); - return 0; + memcpy(pOffset, &pTaskInfo->streamInfo.currentOffset, sizeof(STqOffsetVal)); } int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* sContext, SMetaTableInfo* pMtInfo) { @@ -1052,21 +1046,19 @@ int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) { int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SOperatorInfo* pOperator = pTaskInfo->pRoot; - pTaskInfo->streamInfo.prepareStatus = *pOffset; - pTaskInfo->streamInfo.returned = 0; - if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus)) { + if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.currentOffset)) { return 0; } + pTaskInfo->streamInfo.currentOffset = *pOffset; if (subType == TOPIC_SUB_TYPE__COLUMN) { pOperator->status = OP_OPENED; - // TODO add more check if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if(pOperator->numOfDownstream != 1){ qError("pOperator->numOfDownstream != 1:%d", pOperator->numOfDownstream); - return -1; + return TSDB_CODE_TMQ_CONSUMER_ERROR; } pOperator = pOperator->pDownstream[0]; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 0dcefec93d..fdfb94a073 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2774,9 +2774,18 @@ void qStreamCloseTsdbReader(void* task) { if (task == NULL) return; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)task; SOperatorInfo* pOp = pTaskInfo->pRoot; +<<<<<<< Updated upstream qDebug("stream close tsdb reader, reset status uid %" PRId64 " ts %" PRId64, pTaskInfo->streamInfo.lastStatus.uid, pTaskInfo->streamInfo.lastStatus.ts); pTaskInfo->streamInfo.lastStatus = (STqOffsetVal){0}; +======= + + qDebug("stream close tsdb reader, reset status uid:%" PRId64 " ts:%" PRId64, pTaskInfo->streamInfo.currentOffset.uid, + pTaskInfo->streamInfo.currentOffset.ts); + + // todo refactor, other thread may already use this read to extract data. + pTaskInfo->streamInfo.currentOffset = (STqOffsetVal){0}; +>>>>>>> Stashed changes while (pOp->numOfDownstream == 1 && pOp->pDownstream[0]) { SOperatorInfo* pDownstreamOp = pOp->pDownstream[0]; if (pDownstreamOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 40b9597643..98386b5496 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -673,9 +673,9 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { // todo refactor /*pTableScanInfo->lastStatus.uid = pBlock->info.id.uid;*/ /*pTableScanInfo->lastStatus.ts = pBlock->info.window.ekey;*/ - pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA; - pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.id.uid; - pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey; +// pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA; +// pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.id.uid; +// pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey; ASSERT(pBlock->info.id.uid != 0); return pBlock; @@ -853,9 +853,11 @@ static void destroyTableScanOperatorInfo(void* param) { SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { + int32_t code = 0; STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; goto _error; } @@ -863,8 +865,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc; int32_t numOfCols = 0; - int32_t code = - extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->base.matchInfo); + code = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->base.matchInfo); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -1567,17 +1568,10 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { if (pTaskInfo->streamInfo.submit.msgStr != NULL) { if (pInfo->tqReader->msg2.msgStr == NULL) { - /*pInfo->tqReader->pMsg = pTaskInfo->streamInfo.pReq;*/ - - /*const SSubmitReq* pSubmit = pInfo->tqReader->pMsg;*/ - /*if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {*/ - /*void* msgStr = pTaskInfo->streamInfo.*/ SPackedData submit = pTaskInfo->streamInfo.submit; if (tqReaderSetSubmitReq2(pInfo->tqReader, submit.msgStr, submit.msgLen, submit.ver) < 0) { qError("submit msg messed up when initing stream submit block %p", submit.msgStr); - pInfo->tqReader->msg2 = (SPackedData){0}; - pInfo->tqReader->setMsg = 0; - ASSERT(0); + return NULL; } } @@ -1605,13 +1599,14 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { return NULL; } - if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { + if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) { SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); if (pResult && pResult->info.rows > 0) { qDebug("queue scan tsdb return %d rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64, pResult->info.rows, pResult->info.window.skey, pResult->info.window.ekey, pInfo->tqReader->pWalReader->curVersion); - pTaskInfo->streamInfo.returned = 1; + tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, pResult->info.id.uid, pResult->info.window.ekey); return pResult; +<<<<<<< Updated upstream } else { if (!pTaskInfo->streamInfo.returned) { STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; @@ -1626,45 +1621,35 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { } else { return NULL; } +======= + } + STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; + tsdbReaderClose(pTSInfo->base.dataReader); + pTSInfo->base.dataReader = NULL; + tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pTaskInfo->streamInfo.snapshotVer); + qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1); + if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1, pTaskInfo->id.str) < 0) { + return NULL; +>>>>>>> Stashed changes } } - if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) { + if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) { while (1) { SFetchRet ret = {0}; - if (tqNextBlock(pInfo->tqReader, &ret) < 0) { - // if the end is reached, terrno is 0 - if (terrno != 0) { - qError("failed to get next log block since %s", terrstr()); - } - } + tqNextBlock(pInfo->tqReader, &ret); + pTaskInfo->streamInfo.currentOffset = ret.offset; if (ret.fetchType == FETCH_TYPE__DATA) { blockDataCleanup(pInfo->pRes); setBlockIntoRes(pInfo, &ret.data, true); - if (pInfo->pRes->info.rows > 0) { - pOperator->status = OP_EXEC_RECV; - qDebug("queue scan log return %d rows", pInfo->pRes->info.rows); - return pInfo->pRes; - } - } else if (ret.fetchType == FETCH_TYPE__META) { - qError("unexpected ret.fetchType:%d", ret.fetchType); - continue; - // pTaskInfo->streamInfo.lastStatus = ret.offset; - // pTaskInfo->streamInfo.metaBlk = ret.meta; - // return NULL; - } else if (ret.fetchType == FETCH_TYPE__NONE || - (ret.fetchType == FETCH_TYPE__SEP && pOperator->status == OP_EXEC_RECV)) { - pTaskInfo->streamInfo.lastStatus = ret.offset; - char formatBuf[80]; - tFormatOffset(formatBuf, 80, &ret.offset); - qDebug("queue scan log return null, offset %s", formatBuf); - pOperator->status = OP_OPENED; - return NULL; + qDebug("queue scan log return %d rows", pInfo->pRes->info.rows); + return pInfo->pRes; } + return NULL; } } else { - qError("unexpected streamInfo prepare type: %d", pTaskInfo->streamInfo.prepareStatus.type); + qError("unexpected streamInfo prepare type: %d", pTaskInfo->streamInfo.currentOffset.type); return NULL; } } @@ -2094,7 +2079,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { pTaskInfo->streamInfo.metaRsp.metaRsp = NULL; qDebug("tmqsnap doRawScan called"); - if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { + if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) { if (pInfo->dataReader && tsdbNextDataBlock(pInfo->dataReader)) { if (isTaskKilled(pTaskInfo)) { tsdbReleaseDataBlock(pInfo->dataReader); @@ -2107,28 +2092,22 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { } qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pBlock->info.id.uid); - pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA; - pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.id.uid; - pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey; + tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, pBlock->info.id.uid, pBlock->info.window.ekey); return pBlock; } SMetaTableInfo mtInfo = getUidfromSnapShot(pInfo->sContext); if (mtInfo.uid == 0) { // read snapshot done, change to get data from wal qDebug("tmqsnap read snapshot done, change to get data from wal"); - pTaskInfo->streamInfo.prepareStatus.uid = mtInfo.uid; - pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG; - pTaskInfo->streamInfo.lastStatus.version = pInfo->sContext->snapVersion; + tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pInfo->sContext->snapVersion); } else { - pTaskInfo->streamInfo.prepareStatus.uid = mtInfo.uid; - pTaskInfo->streamInfo.prepareStatus.ts = INT64_MIN; + tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, mtInfo.uid, INT64_MIN); qDebug("tmqsnap change get data uid:%" PRId64 "", mtInfo.uid); - qStreamPrepareScan(pTaskInfo, &pTaskInfo->streamInfo.prepareStatus, pInfo->sContext->subType); } tDeleteSSchemaWrapper(mtInfo.schema); qDebug("tmqsnap stream scan tsdb return null"); return NULL; - } else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_META) { + } else if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_META) { SSnapContext* sContext = pInfo->sContext; void* data = NULL; int32_t dataLen = 0; @@ -2141,15 +2120,11 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { } if (!sContext->queryMetaOrData) { // change to get data next poll request - pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_META; - pTaskInfo->streamInfo.lastStatus.uid = uid; - pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__SNAPSHOT_DATA; - pTaskInfo->streamInfo.metaRsp.rspOffset.uid = 0; - pTaskInfo->streamInfo.metaRsp.rspOffset.ts = INT64_MIN; + tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, 0, INT64_MIN); + pTaskInfo->streamInfo.metaRsp.rspOffset = pTaskInfo->streamInfo.currentOffset; } else { - pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_META; - pTaskInfo->streamInfo.lastStatus.uid = uid; - pTaskInfo->streamInfo.metaRsp.rspOffset = pTaskInfo->streamInfo.lastStatus; + tqOffsetResetToMeta(&pTaskInfo->streamInfo.currentOffset, uid); + pTaskInfo->streamInfo.metaRsp.rspOffset = pTaskInfo->streamInfo.currentOffset; pTaskInfo->streamInfo.metaRsp.resMsgType = type; pTaskInfo->streamInfo.metaRsp.metaRspLen = dataLen; pTaskInfo->streamInfo.metaRsp.metaRsp = data; @@ -2351,7 +2326,6 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys if (pHandle->initTableReader) { pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER; pTSInfo->base.dataReader = NULL; - pTaskInfo->streamInfo.lastStatus.uid = -1; } if (pHandle->initTqReader) { diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 3e1e36ccc1..ad6127ead2 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -33,7 +33,6 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) { pReader->pLogFile = NULL; pReader->curVersion = -1; pReader->curFileFirstVer = -1; - pReader->curInvalid = 1; pReader->capacity = 0; if (cond) { pReader->cond = *cond; @@ -81,7 +80,6 @@ int32_t walNextValidMsg(SWalReader *pReader) { wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 ", applied index:%" PRId64 ", end index:%" PRId64, pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer); - pReader->curStopped = 0; while (fetchVer <= endVer) { if (walFetchHeadNew(pReader, fetchVer) < 0) { return -1; @@ -99,7 +97,6 @@ int32_t walNextValidMsg(SWalReader *pReader) { fetchVer = pReader->curVersion; } } - pReader->curStopped = 1; return -1; } @@ -196,17 +193,16 @@ int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) { return -1; } - wDebug("vgId:%d, wal version reset from %" PRId64 "(invalid:%d) to %" PRId64, pReader->pWal->cfg.vgId, - pReader->curVersion, pReader->curInvalid, ver); + wDebug("vgId:%d, wal version reset from %" PRId64 " to %" PRId64, pReader->pWal->cfg.vgId, + pReader->curVersion, ver); pReader->curVersion = ver; - pReader->curInvalid = 0; return 0; } int32_t walReadSeekVer(SWalReader *pReader, int64_t ver) { SWal *pWal = pReader->pWal; - if (!pReader->curInvalid && ver == pReader->curVersion) { + if (ver == pReader->curVersion) { wDebug("vgId:%d, wal index:%" PRId64 " match, no need to reset", pReader->pWal->cfg.vgId, ver); return 0; } @@ -238,7 +234,7 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { wDebug("vgId:%d, wal starts to fetch head, index:%" PRId64, pRead->pWal->cfg.vgId, fetchVer); - if (pRead->curInvalid || pRead->curVersion != fetchVer) { + if (pRead->curVersion != fetchVer) { if (walReadSeekVer(pRead, fetchVer) < 0) { // pRead->curVersion = fetchVer; // pRead->curInvalid = 1; @@ -344,7 +340,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { return -1; } - if (pRead->curInvalid || pRead->curVersion != ver) { + if (pRead->curVersion != ver) { code = walReadSeekVer(pRead, ver); if (code < 0) { // pRead->curVersion = ver; @@ -479,7 +475,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { taosThreadMutexLock(&pReader->mutex); - if (pReader->curInvalid || pReader->curVersion != ver) { + if (pReader->curVersion != ver) { if (walReadSeekVer(pReader, ver) < 0) { wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, ver, terrstr()); taosThreadMutexUnlock(&pReader->mutex); @@ -575,7 +571,6 @@ void walReadReset(SWalReader *pReader) { taosThreadMutexLock(&pReader->mutex); taosCloseFile(&pReader->pIdxFile); taosCloseFile(&pReader->pLogFile); - pReader->curInvalid = 1; pReader->curFileFirstVer = -1; pReader->curVersion = -1; taosThreadMutexUnlock(&pReader->mutex); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index cfaeca8038..0c3da80bff 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -627,6 +627,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_INVALID_FILE, "Index file is inval TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_MISMATCH, "Consumer mismatch") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_CLOSED, "Consumer closed") +TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_ERROR, "Consumer error, to see log") // stream TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist") -- GitLab