From af5b304f19d560c4e0f585ca74bd6c72c629ae99 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Tue, 24 Mar 2020 11:22:39 +0800 Subject: [PATCH] [td-32] fix bugs in returning data to client --- src/client/src/tscServer.c | 14 +++++------ src/dnode/src/dnodeRead.c | 44 ++++++++++++----------------------- src/query/src/queryExecutor.c | 11 ++++----- src/vnode/tsdb/src/tsdbRead.c | 3 +++ 4 files changed, 30 insertions(+), 42 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 3da374f152..605c7a22cc 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -273,8 +273,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { } pSql->retry = 0; - pRes->rspLen = 0; + if (pRes->code != TSDB_CODE_QUERY_CANCELLED) { pRes->code = (rpcMsg->code != TSDB_CODE_SUCCESS) ? rpcMsg->code : TSDB_CODE_NETWORK_UNAVAIL; } else { @@ -283,9 +283,9 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { if (pRes->code != TSDB_CODE_QUERY_CANCELLED) { assert(rpcMsg->msgType == pCmd->msgType + 1); - pRes->code = (int32_t)rpcMsg->code; + pRes->code = rpcMsg->code; pRes->rspType = rpcMsg->msgType; - pRes->rspLen = rpcMsg->contLen; + pRes->rspLen = rpcMsg->contLen; char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen); if (tmp == NULL) { @@ -389,7 +389,7 @@ int tscProcessSql(SSqlObj *pSql) { SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); STableMetaInfo *pTableMetaInfo = NULL; - uint16_t type = 0; + uint16_t type = 0; if (pQueryInfo != NULL) { pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); @@ -2302,9 +2302,9 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) { pRes->numOfRows = htonl(pRetrieve->numOfRows); pRes->precision = htons(pRetrieve->precision); - pRes->offset = htobe64(pRetrieve->offset); - pRes->useconds = htobe64(pRetrieve->useconds); - pRes->data = pRetrieve->data; + pRes->offset = htobe64(pRetrieve->offset); + pRes->useconds = htobe64(pRetrieve->useconds); + pRes->data = pRetrieve->data; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); tscSetResultPointer(pQueryInfo, pRes); diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index bfe717c367..9e50804583 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -254,7 +254,7 @@ static void dnodeProcessQueryMsg(SReadMsg *pMsg) { static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) { SRetrieveTableMsg *pRetrieve = pMsg->pCont; - void *pQInfo = htobe64(pRetrieve->qhandle); + void *pQInfo = (void*) htobe64(pRetrieve->qhandle); dTrace("QInfo:%p vgId:%d, retrieve msg is received", pQInfo, pRetrieve->header.vgId); @@ -263,40 +263,26 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) { int32_t contLen = 0; SRpcMsg rpcRsp = {0}; + SRetrieveTableRsp *pRsp = NULL; int32_t code = qRetrieveQueryResultInfo(pQInfo, &numOfRows, &rowSize); if (code != TSDB_CODE_SUCCESS) { contLen = sizeof(SRetrieveTableRsp); - - SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(contLen); - pRsp->numOfRows = 0; - pRsp->precision = 0; - pRsp->offset = 0; - pRsp->useconds = 0; - - rpcRsp = (SRpcMsg) { - .handle = pMsg->rpcMsg.handle, - .pCont = pRsp, - .contLen = contLen, - .code = code, - .msgType = 0 - }; - - //todo free qinfo + + pRsp = (SRetrieveTableRsp *)rpcMallocCont(contLen); + memset(pRsp, 0, sizeof(SRetrieveTableRsp)); } else { - SRetrieveTableRsp* pRsp = NULL; - - int32_t code = qDumpRetrieveResult(pQInfo, &pRsp, &contLen); - //todo check code - - rpcRsp = (SRpcMsg) { - .handle = pMsg->rpcMsg.handle, - .pCont = pRsp, - .contLen = contLen, - .code = code, - .msgType = 0 - }; + // todo check code and handle error in build result set + code = qDumpRetrieveResult(pQInfo, &pRsp, &contLen); } + rpcRsp = (SRpcMsg) { + .handle = pMsg->rpcMsg.handle, + .pCont = pRsp, + .contLen = contLen, + .code = code, + .msgType = 0 + }; + rpcSendResponse(&rpcRsp); } diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index af7d6f9ab0..696d8c37c9 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -2698,10 +2698,8 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { int32_t forwardStep = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, &numOfRes, &pRuntimeEnv->windowResInfo, pDataBlock); - // dTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", fileId:%d, slot:%d, pos:%d, rows:%d, - // checked:%d", - // GET_QINFO_ADDR(pQuery), blockInfo.window.skey, blockInfo.window.ekey, pQueryHandle->cur.fileId, - // pQueryHandle->cur.slot, pQuery->pos, blockInfo.size, forwardStep); + dTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d", + GET_QINFO_ADDR(pRuntimeEnv), blockInfo.window.skey, blockInfo.window.ekey, blockInfo.size); // save last access position cnt += forwardStep; @@ -6189,12 +6187,13 @@ int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo, int32_t *numOfRows, int32_t *ro } sem_wait(&pQInfo->dataReady); - + SQuery *pQuery = pQInfo->runtimeEnv.pQuery; + *numOfRows = pQInfo->rec.pointsRead; *rowsize = pQuery->rowSize; - dTrace("QInfo:%p, retrieve res info, rowsize:%d, rows:%d, code:%d", pQInfo, *rowsize, *numOfRows, pQInfo->code); + dTrace("QInfo:%p retrieve result info, rowsize:%d, rows:%d, code:%d", pQInfo, *rowsize, *numOfRows, pQInfo->code); if (pQInfo->code < 0) { // less than 0 means there are error existed. return -pQInfo->code; diff --git a/src/vnode/tsdb/src/tsdbRead.c b/src/vnode/tsdb/src/tsdbRead.c index 1c4f09f4c9..31cdd70f36 100644 --- a/src/vnode/tsdb/src/tsdbRead.c +++ b/src/vnode/tsdb/src/tsdbRead.c @@ -379,6 +379,9 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) { .window = {.skey = skey, .ekey = ekey} }; + // update the last key value + pHandle->pTableQueryInfo->lastKey = ekey + 1; + return blockInfo; } -- GitLab