diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index b19a0d783d2df9e70b9b21d0b5321ec98df35880..1f7323a06ad2134643f4eb2db8d91e132bea8c88 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -153,7 +153,6 @@ struct SWalReader { int64_t capacity; TdThreadMutex mutex; SWalFilterCond cond; - // TODO remove it SWalCkHead *pHead; }; @@ -208,9 +207,9 @@ void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal* pOffset) // only for tq usage void walSetReaderCapacity(SWalReader *pRead, int32_t capacity); -int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead); -int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead); -int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead); +int32_t walFetchHead(SWalReader *pRead, int64_t ver); +int32_t walFetchBody(SWalReader *pRead); +int32_t walSkipFetchBody(SWalReader *pRead); void walRefFirstVer(SWal *, SWalRef *); void walRefLastVer(SWal *, SWalRef *); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index a6a84075b59081bc245f2b9bac763dd386fc36c7..f08c308185cf2d5dd6c5bea7615f19e58b4074b7 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -127,7 +127,7 @@ void tqDestroyTqHandle(void* data); // tqRead int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset); int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset); -int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum, uint64_t reqId); +int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId); // tqExec int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 9b8f1781cb47182156d7834adb1cbb2c5fbf2050..6c091fa4cb179e2cf83d8e4e2d0cdf5e13958f44 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -184,50 +184,42 @@ end: return tbSuid == realTbSuid; } -int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead, uint64_t reqId) { - int32_t code = 0; +int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId) { + int32_t code = -1; int32_t vgId = TD_VID(pTq->pVnode); - taosThreadMutexLock(&pHandle->pWalReader->mutex); int64_t offset = *fetchOffset; + int64_t lastVer = walGetLastVer(pHandle->pWalReader->pWal); + int64_t committedVer = walGetCommittedVer(pHandle->pWalReader->pWal); + int64_t appliedVer = walGetAppliedVer(pHandle->pWalReader->pWal); - while (1) { - if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) { + wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 ", applied index:%" PRId64, + vgId, offset, lastVer, committedVer, appliedVer); + + while (offset <= appliedVer) { + if (walFetchHead(pHandle->pWalReader, offset) < 0) { tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return, reqId:0x%" PRIx64, pHandle->consumerId, pHandle->epoch, vgId, offset, reqId); - *fetchOffset = offset; - code = -1; goto END; } tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s, reqId:0x%" PRIx64, vgId, - pHandle->consumerId, offset, TMSG_INFO((*ppCkHead)->head.msgType), reqId); - - if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) { - code = walFetchBody(pHandle->pWalReader, ppCkHead); + pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId); - if (code < 0) { - *fetchOffset = offset; - code = -1; - goto END; - } - *fetchOffset = offset; - code = 0; + if (pHandle->pWalReader->pHead->head.msgType == TDMT_VND_SUBMIT) { + code = walFetchBody(pHandle->pWalReader); goto END; } else { if (pHandle->fetchMeta != WITH_DATA) { - SWalCont* pHead = &((*ppCkHead)->head); + SWalCont* pHead = &(pHandle->pWalReader->pHead->head); if (IS_META_MSG(pHead->msgType) && !(pHead->msgType == TDMT_VND_DELETE && pHandle->fetchMeta == ONLY_META)) { - code = walFetchBody(pHandle->pWalReader, ppCkHead); + code = walFetchBody(pHandle->pWalReader); if (code < 0) { - *fetchOffset = offset; - code = -1; goto END; } if (isValValidForTable(pHandle, pHead)) { - *fetchOffset = offset; code = 0; goto END; } else { @@ -236,10 +228,8 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea } } } - code = walSkipFetchBody(pHandle->pWalReader, *ppCkHead); + code = walSkipFetchBody(pHandle->pWalReader); if (code < 0) { - *fetchOffset = offset; - code = -1; goto END; } offset++; @@ -247,7 +237,7 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea } END: - taosThreadMutexUnlock(&pHandle->pWalReader->mutex); + *fetchOffset = offset; return code; } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 5cbca6e0f2dbe7a60231cafd1fc223f9c39eff05..42aac52c63e42523155860ad1deb3d7044b0235c 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -179,7 +179,6 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, SRpcMsg* pMsg, STqOffsetVal* offset) { int code = 0; int32_t vgId = TD_VID(pTq->pVnode); - SWalCkHead* pCkHead = NULL; SMqMetaRsp metaRsp = {0}; STaosxRsp taosxRsp = {0}; tqInitTaosxRsp(&taosxRsp, *offset); @@ -216,12 +215,6 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (offset->type == TMQ_OFFSET__LOG) { walReaderVerifyOffset(pHandle->pWalReader, offset); int64_t fetchVer = offset->version; - pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); - if (pCkHead == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - code = -1; - goto end; - } walSetReaderCapacity(pHandle->pWalReader, 2048); int totalRows = 0; @@ -234,14 +227,14 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, break; } - if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) { + if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); // setRequestVersion(&taosxRsp.reqOffset, offset->version); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); goto end; } - SWalCont* pHead = &pCkHead->head; + SWalCont* pHead = &pHandle->pWalReader->pHead->head; tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType); @@ -291,7 +284,6 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, end: tDeleteSTaosxRsp(&taosxRsp); - taosMemoryFreeClear(pCkHead); return code; } diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 54b9576eb1dae585b4bf2cdbe8bc026b9e050322..01404494e34604c78752271eed39749aae036b95 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -16,10 +16,6 @@ #include "taoserror.h" #include "walInt.h" -static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer); -static int32_t walFetchBodyNew(SWalReader *pRead); -static int32_t walSkipFetchBodyNew(SWalReader *pRead); - SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) { SWalReader *pReader = taosMemoryCalloc(1, sizeof(SWalReader)); if (pReader == NULL) { @@ -80,19 +76,19 @@ int32_t walNextValidMsg(SWalReader *pReader) { return -1; } while (fetchVer <= appliedVer) { - if (walFetchHeadNew(pReader, fetchVer) < 0) { + if (walFetchHead(pReader, fetchVer) < 0) { return -1; } int32_t type = pReader->pHead->head.msgType; if (type == TDMT_VND_SUBMIT || ((type == TDMT_VND_DELETE) && (pReader->cond.deleteMsg == 1)) || (IS_META_MSG(type) && pReader->cond.scanMeta)) { - if (walFetchBodyNew(pReader) < 0) { + if (walFetchBody(pReader) < 0) { return -1; } return 0; } else { - if (walSkipFetchBodyNew(pReader) < 0) { + if (walSkipFetchBody(pReader) < 0) { return -1; } @@ -256,102 +252,7 @@ int32_t walReaderSeekVer(SWalReader *pReader, int64_t ver) { void walSetReaderCapacity(SWalReader *pRead, int32_t capacity) { pRead->capacity = capacity; } -static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { - int64_t contLen; - bool seeked = false; - - wDebug("vgId:%d, wal starts to fetch head, index:%" PRId64, pRead->pWal->cfg.vgId, fetchVer); - - if (pRead->curVersion != fetchVer) { - if (walReaderSeekVer(pRead, fetchVer) < 0) { - return -1; - } - seeked = true; - } - - while (1) { - contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead)); - if (contLen == sizeof(SWalCkHead)) { - break; - } else if (contLen == 0 && !seeked) { - if(walReadSeekVerImpl(pRead, fetchVer) < 0){ - return -1; - } - seeked = true; - continue; - } else { - if (contLen < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - } else { - terrno = TSDB_CODE_WAL_FILE_CORRUPTED; - } - return -1; - } - } -// pRead->curInvalid = 0; - return 0; -} - -static int32_t walFetchBodyNew(SWalReader *pReader) { - SWalCont *pReadHead = &pReader->pHead->head; - int64_t ver = pReadHead->version; - - wDebug("vgId:%d, wal starts to fetch body, ver:%" PRId64 " ,len:%d, total", pReader->pWal->cfg.vgId, ver, - pReadHead->bodyLen); - - if (pReader->capacity < pReadHead->bodyLen) { - SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen); - if (ptr == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - pReader->pHead = ptr; - pReadHead = &pReader->pHead->head; - pReader->capacity = pReadHead->bodyLen; - } - - if (pReadHead->bodyLen != taosReadFile(pReader->pLogFile, pReadHead->body, pReadHead->bodyLen)) { - if (pReadHead->bodyLen < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s", - pReader->pWal->cfg.vgId, pReader->pHead->head.version, ver, tstrerror(terrno)); - } else { - wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since file corrupted", - pReader->pWal->cfg.vgId, pReader->pHead->head.version, ver); - terrno = TSDB_CODE_WAL_FILE_CORRUPTED; - } - return -1; - } - - if (walValidBodyCksum(pReader->pHead) != 0) { - wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pReader->pWal->cfg.vgId, ver); - terrno = TSDB_CODE_WAL_FILE_CORRUPTED; - return -1; - } - - wDebug("vgId:%d, index:%" PRId64 " is fetched, type:%d, cursor advance", pReader->pWal->cfg.vgId, ver, pReader->pHead->head.msgType); - pReader->curVersion = ver + 1; - return 0; -} - -static int32_t walSkipFetchBodyNew(SWalReader *pRead) { - int64_t code; - - code = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR); - if (code < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); -// pRead->curInvalid = 1; - return -1; - } - - pRead->curVersion++; - wDebug("vgId:%d, version advance to %" PRId64 ", skip fetch", pRead->pWal->cfg.vgId, pRead->curVersion); - - return 0; -} - -int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { +int32_t walFetchHead(SWalReader *pRead, int64_t ver) { int64_t code; int64_t contLen; bool seeked = false; @@ -369,15 +270,13 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { if (pRead->curVersion != ver) { code = walReaderSeekVer(pRead, ver); if (code < 0) { -// pRead->curVersion = ver; -// pRead->curInvalid = 1; return -1; } seeked = true; } while (1) { - contLen = taosReadFile(pRead->pLogFile, pHead, sizeof(SWalCkHead)); + contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead)); if (contLen == sizeof(SWalCkHead)) { break; } else if (contLen == 0 && !seeked) { @@ -392,12 +291,11 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { } else { terrno = TSDB_CODE_WAL_FILE_CORRUPTED; } -// pRead->curInvalid = 1; return -1; } } - code = walValidHeadCksum(pHead); + code = walValidHeadCksum(pRead->pHead); if (code != 0) { wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId, ver); @@ -405,32 +303,27 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { return -1; } -// pRead->curInvalid = 0; return 0; } -int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) { - int64_t code; - +int32_t walSkipFetchBody(SWalReader *pRead) { wDebug("vgId:%d, skip fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64 ", applied ver:%" PRId64, - pRead->pWal->cfg.vgId, pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, + pRead->pWal->cfg.vgId, pRead->pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer); - code = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR); + int64_t code = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR); if (code < 0) { terrno = TAOS_SYSTEM_ERROR(errno); -// pRead->curInvalid = 1; return -1; } pRead->curVersion++; - return 0; } -int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) { - SWalCont *pReadHead = &((*ppHead)->head); +int32_t walFetchBody(SWalReader *pRead) { + SWalCont *pReadHead = &pRead->pHead->head; int64_t ver = pReadHead->version; wDebug("vgId:%d, fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64 @@ -439,13 +332,13 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) { pRead->pWal->vers.appliedVer); if (pRead->capacity < pReadHead->bodyLen) { - SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(*ppHead, sizeof(SWalCkHead) + pReadHead->bodyLen); + SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen); if (ptr == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - *ppHead = ptr; - pReadHead = &((*ppHead)->head); + pRead->pHead = ptr; + pReadHead = &pRead->pHead->head; pRead->capacity = pReadHead->bodyLen; } @@ -459,27 +352,24 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) { pRead->pWal->cfg.vgId, pReadHead->version, ver); terrno = TSDB_CODE_WAL_FILE_CORRUPTED; } -// pRead->curInvalid = 1; return -1; } if (pReadHead->version != ver) { wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId, pReadHead->version, ver); -// pRead->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } - if (walValidBodyCksum(*ppHead) != 0) { + if (walValidBodyCksum(pRead->pHead) != 0) { wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver); -// pRead->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } - pRead->curVersion = ver + 1; + pRead->curVersion++; return 0; }