提交 9fb4fa0a 编写于 作者: A Alex Duan

feat(api): add int64 affected rows modify tmsg.h struct

上级 693a602c
...@@ -1403,8 +1403,8 @@ typedef struct { ...@@ -1403,8 +1403,8 @@ typedef struct {
int8_t streamBlockType; int8_t streamBlockType;
int32_t compLen; int32_t compLen;
int32_t numOfBlocks; int32_t numOfBlocks;
int32_t numOfRows; int64_t numOfRows; // from int32_t change to int64_t
int32_t numOfCols; int64_t numOfCols;
int64_t skey; int64_t skey;
int64_t ekey; int64_t ekey;
int64_t version; // for stream int64_t version; // for stream
......
...@@ -171,7 +171,7 @@ typedef struct SReqResultInfo { ...@@ -171,7 +171,7 @@ typedef struct SReqResultInfo {
char** convertBuf; char** convertBuf;
TAOS_ROW row; TAOS_ROW row;
SResultColumn* pCol; SResultColumn* pCol;
uint64_t numOfRows; uint64_t numOfRows; // from int32_t change to int64_t
uint64_t totalRows; uint64_t totalRows;
uint64_t current; uint64_t current;
bool localResultFetched; bool localResultFetched;
......
...@@ -1941,7 +1941,7 @@ int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableR ...@@ -1941,7 +1941,7 @@ int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableR
pResultInfo->pRspMsg = (const char*)pRsp; pResultInfo->pRspMsg = (const char*)pRsp;
pResultInfo->pData = (void*)pRsp->data; pResultInfo->pData = (void*)pRsp->data;
pResultInfo->numOfRows = htonl(pRsp->numOfRows); pResultInfo->numOfRows = htonll(pRsp->numOfRows);
pResultInfo->current = 0; pResultInfo->current = 0;
pResultInfo->completed = (pRsp->completed == 1); pResultInfo->completed = (pRsp->completed == 1);
pResultInfo->payloadLen = htonl(pRsp->compLen); pResultInfo->payloadLen = htonl(pRsp->compLen);
......
...@@ -684,7 +684,7 @@ void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pC ...@@ -684,7 +684,7 @@ void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pC
int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput); int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput);
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart); int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart);
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int32_t numOfRows, int32_t dataLen, int64_t startTs, void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs,
SOperatorInfo* pOperator); SOperatorInfo* pOperator);
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order); STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order);
......
...@@ -115,14 +115,14 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn ...@@ -115,14 +115,14 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
if (pRsp->completed == 1) { if (pRsp->completed == 1) {
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
" execId:%d index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " execId:%d index:%d completed, blocks:%d, numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
", total:%.2f Kb, try next %d/%" PRIzu, ", total:%.2f Kb, try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks,
pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, i + 1, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, i + 1,
totalSources); totalSources);
} else { } else {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
" execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb", " execId:%d blocks:%d, numOfRows:%" PRId64 ", totalRows:%" PRIu64 ", total:%.2f Kb",
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfBlocks, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfBlocks,
pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0); pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
} }
...@@ -367,14 +367,14 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -367,14 +367,14 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
pSourceDataInfo->pRsp = pMsg->pData; pSourceDataInfo->pRsp = pMsg->pData;
SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp; SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
pRsp->numOfRows = htonl(pRsp->numOfRows); pRsp->numOfRows = htonll(pRsp->numOfRows);
pRsp->compLen = htonl(pRsp->compLen); pRsp->compLen = htonl(pRsp->compLen);
pRsp->numOfCols = htonl(pRsp->numOfCols); pRsp->numOfCols = htonl(pRsp->numOfCols);
pRsp->useconds = htobe64(pRsp->useconds); pRsp->useconds = htobe64(pRsp->useconds);
pRsp->numOfBlocks = htonl(pRsp->numOfBlocks); pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);
ASSERT(pRsp != NULL); ASSERT(pRsp != NULL);
qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%d, %p", pSourceDataInfo->taskId, index, pRsp->numOfBlocks, qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%" PRId64 ", %p", pSourceDataInfo->taskId, index, pRsp->numOfBlocks,
pRsp->numOfRows, pExchangeInfo); pRsp->numOfRows, pExchangeInfo);
} else { } else {
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
...@@ -472,7 +472,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas ...@@ -472,7 +472,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int32_t numOfRows, int32_t dataLen, int64_t startTs, void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs,
SOperatorInfo* pOperator) { SOperatorInfo* pOperator) {
pInfo->totalRows += numOfRows; pInfo->totalRows += numOfRows;
pInfo->totalSize += dataLen; pInfo->totalSize += dataLen;
......
...@@ -297,7 +297,7 @@ typedef struct SSchJob { ...@@ -297,7 +297,7 @@ typedef struct SSchJob {
SExecResult execRes; SExecResult execRes;
void *fetchRes; // TODO free it or not void *fetchRes; // TODO free it or not
bool fetched; bool fetched;
int32_t resNumOfRows; int64_t resNumOfRows; // from int32_t to int64_t
SSchResInfo userRes; SSchResInfo userRes;
char *sql; char *sql;
SQueryProfileSummary summary; SQueryProfileSummary summary;
......
...@@ -412,7 +412,7 @@ int32_t schDumpJobFetchRes(SSchJob *pJob, void **pData) { ...@@ -412,7 +412,7 @@ int32_t schDumpJobFetchRes(SSchJob *pJob, void **pData) {
SCH_JOB_DLOG("empty res and set query complete, code:%x", code); SCH_JOB_DLOG("empty res and set query complete, code:%x", code);
} }
SCH_JOB_DLOG("fetch done, totalRows:%d", pJob->resNumOfRows); SCH_JOB_DLOG("fetch done, totalRows:%" PRId64, pJob->resNumOfRows);
_return: _return:
...@@ -528,7 +528,7 @@ void schProcessOnDataFetched(SSchJob *pJob) { schPostJobRes(pJob, SCH_OP_FETCH); ...@@ -528,7 +528,7 @@ void schProcessOnDataFetched(SSchJob *pJob) { schPostJobRes(pJob, SCH_OP_FETCH);
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) { int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) {
SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", htonl(pRsp->numOfRows), pRsp->completed); SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", htonl(pRsp->numOfRows), pRsp->completed);
atomic_store_32(&pJob->resNumOfRows, htonl(pRsp->numOfRows)); atomic_store_64(&pJob->resNumOfRows, htonll(pRsp->numOfRows));
atomic_store_ptr(&pJob->fetchRes, pRsp); atomic_store_ptr(&pJob->fetchRes, pRsp);
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC);
......
...@@ -108,7 +108,7 @@ int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rs ...@@ -108,7 +108,7 @@ int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rs
} }
atomic_store_ptr(&pJob->fetchRes, rsp); atomic_store_ptr(&pJob->fetchRes, rsp);
atomic_add_fetch_32(&pJob->resNumOfRows, htonl(rsp->numOfRows)); atomic_add_fetch_64(&pJob->resNumOfRows, htonll(rsp->numOfRows));
if (rsp->completed) { if (rsp->completed) {
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC);
...@@ -279,7 +279,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa ...@@ -279,7 +279,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
} }
} }
atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows); atomic_add_fetch_64(&pJob->resNumOfRows, rsp->affectedRows);
SCH_TASK_DLOG("submit succeed, affectedRows:%d, blocks:%d", rsp->affectedRows, rsp->nBlocks); SCH_TASK_DLOG("submit succeed, affectedRows:%d, blocks:%d", rsp->affectedRows, rsp->nBlocks);
SCH_LOCK(SCH_WRITE, &pJob->resLock); SCH_LOCK(SCH_WRITE, &pJob->resLock);
...@@ -317,7 +317,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa ...@@ -317,7 +317,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
tDecodeSVDeleteRsp(&coder, &rsp); tDecodeSVDeleteRsp(&coder, &rsp);
tDecoderClear(&coder); tDecoderClear(&coder);
atomic_add_fetch_32(&pJob->resNumOfRows, rsp.affectedRows); atomic_add_fetch_64(&pJob->resNumOfRows, rsp.affectedRows);
SCH_TASK_DLOG("delete succeed, affectedRows:%" PRId64, rsp.affectedRows); SCH_TASK_DLOG("delete succeed, affectedRows:%" PRId64, rsp.affectedRows);
} }
...@@ -344,7 +344,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa ...@@ -344,7 +344,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
SCH_ERR_JRET(schSaveJobExecRes(pJob, &rsp)); SCH_ERR_JRET(schSaveJobExecRes(pJob, &rsp));
atomic_add_fetch_32(&pJob->resNumOfRows, rsp.affectedRows); atomic_add_fetch_64(&pJob->resNumOfRows, rsp.affectedRows);
taosMemoryFreeClear(msg); taosMemoryFreeClear(msg);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册