From 271822ebc4aac95813988e33cf09ae9c322601b5 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 8 Dec 2022 01:55:47 +0800 Subject: [PATCH] refactor: support submitreq2 --- source/dnode/vnode/src/tq/tqRead.c | 33 ++++++++++++++++----- source/dnode/vnode/src/vnd/vnodeSvr.c | 2 -- source/libs/wal/src/walRead.c | 41 ++++++++++++++------------- 3 files changed, 47 insertions(+), 29 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index d0f938563b..fdd9f5f284 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -389,7 +389,7 @@ int32_t tqReaderSetSubmitReq2(STqReader* pReader, void* msgStr, int32_t msgLen, pReader->msg2.ver = ver; pReader->ver = ver; - tqDebug("tq reader set msg %p", msgStr); + tqDebug("tq reader set msg %p %d", msgStr, msgLen); if (pReader->setMsg == 0) { SDecoder decoder; @@ -683,6 +683,7 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader) { } else if (pCol->cid == pColData->info.colId) { for (int32_t i = 0; i < pCol->nVal; i++) { tColDataGetValue(pCol, sourceIdx, &colVal); +#if 0 void* val = NULL; if (IS_STR_DATA_TYPE(colVal.type)) { val = colVal.value.pData; @@ -692,6 +693,20 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader) { if (colDataAppend(pColData, i, val, !COL_VAL_IS_VALUE(&colVal)) < 0) { goto FAIL; } +#endif + char val[65535 + 2]; + if (IS_STR_DATA_TYPE(colVal.type)) { + memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData); + varDataSetLen(val, colVal.value.nData); + if (colDataAppend(pColData, i, val, !COL_VAL_IS_VALUE(&colVal)) < 0) { + goto FAIL; + } + /*val = colVal.value.pData;*/ + } else { + if (colDataAppend(pColData, i, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) { + goto FAIL; + } + } } sourceIdx++; targetIdx++; @@ -718,14 +733,18 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader) { sourceIdx++; continue; } else if (colVal.cid == pColData->info.colId) { - void* val = NULL; + char val[65535 + 2]; if (IS_STR_DATA_TYPE(colVal.type)) { - val = colVal.value.pData; + memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData); + varDataSetLen(val, colVal.value.nData); + if (colDataAppend(pColData, i, val, !COL_VAL_IS_VALUE(&colVal)) < 0) { + goto FAIL; + } + /*val = colVal.value.pData;*/ } else { - val = &colVal.value.val; - } - if (colDataAppend(pColData, i, val, !COL_VAL_IS_VALUE(&colVal)) < 0) { - goto FAIL; + if (colDataAppend(pColData, i, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) { + goto FAIL; + } } sourceIdx++; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index dde5a06c9d..f8ef39b516 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -881,8 +881,6 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq pRsp->code = TSDB_CODE_SUCCESS; - vDebug("vvvvvvvvvvvv %p, %d", pReq, len); - // decode SDecoder dc = {0}; tDecoderInit(&dc, pReq, len); diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index ed02d29e3b..4c3fb3c0ba 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -271,57 +271,58 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { return 0; } -static int32_t walFetchBodyNew(SWalReader *pRead) { - SWalCont *pReadHead = &pRead->pHead->head; +static int32_t walFetchBodyNew(SWalReader *pReader) { + SWalCont *pReadHead = &pReader->pHead->head; int64_t ver = pReadHead->version; - wDebug("vgId:%d, wal starts to fetch body, index:%" PRId64, pRead->pWal->cfg.vgId, ver); + wDebug("vgId:%d, wal starts to fetch body, ver:%" PRId64 " ,len:%d", pReader->pWal->cfg.vgId, ver, + pReadHead->bodyLen); - if (pRead->capacity < pReadHead->bodyLen) { - SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen); + if (pReader->capacity < pReadHead->bodyLen) { + SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen); if (ptr == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - pRead->pHead = ptr; - pReadHead = &pRead->pHead->head; - pRead->capacity = pReadHead->bodyLen; + pReader->pHead = ptr; + pReadHead = &pReader->pHead->head; + pReader->capacity = pReadHead->bodyLen; } - if (pReadHead->bodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen)) { + if (pReadHead->bodyLen != taosReadFile(pReader->pLogFile, pReadHead->body, pReadHead->bodyLen)) { if (pReadHead->bodyLen < 0) { terrno = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s", - pRead->pWal->cfg.vgId, pRead->pHead->head.version, ver, tstrerror(terrno)); + pReader->pWal->cfg.vgId, pReader->pHead->head.version, ver, tstrerror(terrno)); } else { wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since file corrupted", - pRead->pWal->cfg.vgId, pRead->pHead->head.version, ver); + pReader->pWal->cfg.vgId, pReader->pHead->head.version, ver); terrno = TSDB_CODE_WAL_FILE_CORRUPTED; } - pRead->curInvalid = 1; + pReader->curInvalid = 1; ASSERT(0); return -1; } if (pReadHead->version != ver) { - wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId, - pRead->pHead->head.version, ver); - pRead->curInvalid = 1; + wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64, pReader->pWal->cfg.vgId, + pReader->pHead->head.version, ver); + pReader->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; ASSERT(0); return -1; } - if (walValidBodyCksum(pRead->pHead) != 0) { - wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver); - pRead->curInvalid = 1; + if (walValidBodyCksum(pReader->pHead) != 0) { + wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pReader->pWal->cfg.vgId, ver); + pReader->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; ASSERT(0); return -1; } - wDebug("vgId:%d, index:%" PRId64 " is fetched, cursor advance", pRead->pWal->cfg.vgId, ver); - pRead->curVersion = ver + 1; + wDebug("vgId:%d, index:%" PRId64 " is fetched, cursor advance", pReader->pWal->cfg.vgId, ver); + pReader->curVersion = ver + 1; return 0; } -- GitLab