From 9fb4fa0a7f8443848b0171158d994fe164e10fa8 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Fri, 2 Dec 2022 20:48:39 +0800 Subject: [PATCH] feat(api): add int64 affected rows modify tmsg.h struct --- include/common/tmsg.h | 4 ++-- source/client/inc/clientInt.h | 2 +- source/client/src/clientImpl.c | 2 +- source/libs/executor/inc/executorimpl.h | 2 +- source/libs/executor/src/exchangeoperator.c | 10 +++++----- source/libs/scheduler/inc/schInt.h | 2 +- source/libs/scheduler/src/schJob.c | 4 ++-- source/libs/scheduler/src/schRemote.c | 8 ++++---- 8 files changed, 17 insertions(+), 17 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 0a082a37e4..9ad2e4375b 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1403,8 +1403,8 @@ typedef struct { int8_t streamBlockType; int32_t compLen; int32_t numOfBlocks; - int32_t numOfRows; - int32_t numOfCols; + int64_t numOfRows; // from int32_t change to int64_t + int64_t numOfCols; int64_t skey; int64_t ekey; int64_t version; // for stream diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 6c2a7d75a0..ea76f726ea 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -171,7 +171,7 @@ typedef struct SReqResultInfo { char** convertBuf; TAOS_ROW row; SResultColumn* pCol; - uint64_t numOfRows; + uint64_t numOfRows; // from int32_t change to int64_t uint64_t totalRows; uint64_t current; bool localResultFetched; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 77699e89c2..6444150263 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1941,7 +1941,7 @@ int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableR pResultInfo->pRspMsg = (const char*)pRsp; pResultInfo->pData = (void*)pRsp->data; - pResultInfo->numOfRows = htonl(pRsp->numOfRows); + pResultInfo->numOfRows = htonll(pRsp->numOfRows); pResultInfo->current = 0; pResultInfo->completed = (pRsp->completed == 1); pResultInfo->payloadLen = htonl(pRsp->compLen); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 7100be58e3..60f8171bac 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -684,7 +684,7 @@ void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pC int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput); int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart); -void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int32_t numOfRows, int32_t dataLen, int64_t startTs, +void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs, SOperatorInfo* pOperator); STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order); diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 963a273290..03e9e411bb 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -115,14 +115,14 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn if (pRsp->completed == 1) { pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 - " execId:%d index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 + " execId:%d index:%d completed, blocks:%d, numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", total:%.2f Kb, try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, i + 1, totalSources); } else { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 - " execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb", + " execId:%d blocks:%d, numOfRows:%" PRId64 ", totalRows:%" PRIu64 ", total:%.2f Kb", GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfBlocks, pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0); } @@ -367,14 +367,14 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { pSourceDataInfo->pRsp = pMsg->pData; SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp; - pRsp->numOfRows = htonl(pRsp->numOfRows); + pRsp->numOfRows = htonll(pRsp->numOfRows); pRsp->compLen = htonl(pRsp->compLen); pRsp->numOfCols = htonl(pRsp->numOfCols); pRsp->useconds = htobe64(pRsp->useconds); pRsp->numOfBlocks = htonl(pRsp->numOfBlocks); ASSERT(pRsp != NULL); - qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%d, %p", pSourceDataInfo->taskId, index, pRsp->numOfBlocks, + qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%" PRId64 ", %p", pSourceDataInfo->taskId, index, pRsp->numOfBlocks, pRsp->numOfRows, pExchangeInfo); } else { taosMemoryFree(pMsg->pData); @@ -472,7 +472,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas return TSDB_CODE_SUCCESS; } -void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int32_t numOfRows, int32_t dataLen, int64_t startTs, +void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs, SOperatorInfo* pOperator) { pInfo->totalRows += numOfRows; pInfo->totalSize += dataLen; diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index dfc48e7d9f..77aafa9a27 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -297,7 +297,7 @@ typedef struct SSchJob { SExecResult execRes; void *fetchRes; // TODO free it or not bool fetched; - int32_t resNumOfRows; + int64_t resNumOfRows; // from int32_t to int64_t SSchResInfo userRes; char *sql; SQueryProfileSummary summary; diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 0eb29a3667..37a3cc9286 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -412,7 +412,7 @@ int32_t schDumpJobFetchRes(SSchJob *pJob, void **pData) { SCH_JOB_DLOG("empty res and set query complete, code:%x", code); } - SCH_JOB_DLOG("fetch done, totalRows:%d", pJob->resNumOfRows); + SCH_JOB_DLOG("fetch done, totalRows:%" PRId64, pJob->resNumOfRows); _return: @@ -528,7 +528,7 @@ void schProcessOnDataFetched(SSchJob *pJob) { schPostJobRes(pJob, SCH_OP_FETCH); int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) { SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", htonl(pRsp->numOfRows), pRsp->completed); - atomic_store_32(&pJob->resNumOfRows, htonl(pRsp->numOfRows)); + atomic_store_64(&pJob->resNumOfRows, htonll(pRsp->numOfRows)); atomic_store_ptr(&pJob->fetchRes, pRsp); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 1a6d7df349..101fdfe052 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -108,7 +108,7 @@ int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rs } atomic_store_ptr(&pJob->fetchRes, rsp); - atomic_add_fetch_32(&pJob->resNumOfRows, htonl(rsp->numOfRows)); + atomic_add_fetch_64(&pJob->resNumOfRows, htonll(rsp->numOfRows)); if (rsp->completed) { SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC); @@ -279,7 +279,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa } } - atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows); + atomic_add_fetch_64(&pJob->resNumOfRows, rsp->affectedRows); SCH_TASK_DLOG("submit succeed, affectedRows:%d, blocks:%d", rsp->affectedRows, rsp->nBlocks); SCH_LOCK(SCH_WRITE, &pJob->resLock); @@ -317,7 +317,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa tDecodeSVDeleteRsp(&coder, &rsp); tDecoderClear(&coder); - atomic_add_fetch_32(&pJob->resNumOfRows, rsp.affectedRows); + atomic_add_fetch_64(&pJob->resNumOfRows, rsp.affectedRows); SCH_TASK_DLOG("delete succeed, affectedRows:%" PRId64, rsp.affectedRows); } @@ -344,7 +344,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa SCH_ERR_JRET(schSaveJobExecRes(pJob, &rsp)); - atomic_add_fetch_32(&pJob->resNumOfRows, rsp.affectedRows); + atomic_add_fetch_64(&pJob->resNumOfRows, rsp.affectedRows); taosMemoryFreeClear(msg); -- GitLab