提交 af5b304f 编写于 作者: H hjxilinx

[td-32] fix bugs in returning data to client

上级 e49bd8a7
...@@ -273,8 +273,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { ...@@ -273,8 +273,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
} }
pSql->retry = 0; pSql->retry = 0;
pRes->rspLen = 0; pRes->rspLen = 0;
if (pRes->code != TSDB_CODE_QUERY_CANCELLED) { if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
pRes->code = (rpcMsg->code != TSDB_CODE_SUCCESS) ? rpcMsg->code : TSDB_CODE_NETWORK_UNAVAIL; pRes->code = (rpcMsg->code != TSDB_CODE_SUCCESS) ? rpcMsg->code : TSDB_CODE_NETWORK_UNAVAIL;
} else { } else {
...@@ -283,9 +283,9 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { ...@@ -283,9 +283,9 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
if (pRes->code != TSDB_CODE_QUERY_CANCELLED) { if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
assert(rpcMsg->msgType == pCmd->msgType + 1); assert(rpcMsg->msgType == pCmd->msgType + 1);
pRes->code = (int32_t)rpcMsg->code; pRes->code = rpcMsg->code;
pRes->rspType = rpcMsg->msgType; pRes->rspType = rpcMsg->msgType;
pRes->rspLen = rpcMsg->contLen; pRes->rspLen = rpcMsg->contLen;
char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen); char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
if (tmp == NULL) { if (tmp == NULL) {
...@@ -389,7 +389,7 @@ int tscProcessSql(SSqlObj *pSql) { ...@@ -389,7 +389,7 @@ int tscProcessSql(SSqlObj *pSql) {
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
STableMetaInfo *pTableMetaInfo = NULL; STableMetaInfo *pTableMetaInfo = NULL;
uint16_t type = 0; uint16_t type = 0;
if (pQueryInfo != NULL) { if (pQueryInfo != NULL) {
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
...@@ -2302,9 +2302,9 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) { ...@@ -2302,9 +2302,9 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
pRes->numOfRows = htonl(pRetrieve->numOfRows); pRes->numOfRows = htonl(pRetrieve->numOfRows);
pRes->precision = htons(pRetrieve->precision); pRes->precision = htons(pRetrieve->precision);
pRes->offset = htobe64(pRetrieve->offset); pRes->offset = htobe64(pRetrieve->offset);
pRes->useconds = htobe64(pRetrieve->useconds); pRes->useconds = htobe64(pRetrieve->useconds);
pRes->data = pRetrieve->data; pRes->data = pRetrieve->data;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
tscSetResultPointer(pQueryInfo, pRes); tscSetResultPointer(pQueryInfo, pRes);
......
...@@ -254,7 +254,7 @@ static void dnodeProcessQueryMsg(SReadMsg *pMsg) { ...@@ -254,7 +254,7 @@ static void dnodeProcessQueryMsg(SReadMsg *pMsg) {
static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) { static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) {
SRetrieveTableMsg *pRetrieve = pMsg->pCont; SRetrieveTableMsg *pRetrieve = pMsg->pCont;
void *pQInfo = htobe64(pRetrieve->qhandle); void *pQInfo = (void*) htobe64(pRetrieve->qhandle);
dTrace("QInfo:%p vgId:%d, retrieve msg is received", pQInfo, pRetrieve->header.vgId); dTrace("QInfo:%p vgId:%d, retrieve msg is received", pQInfo, pRetrieve->header.vgId);
...@@ -263,40 +263,26 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) { ...@@ -263,40 +263,26 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) {
int32_t contLen = 0; int32_t contLen = 0;
SRpcMsg rpcRsp = {0}; SRpcMsg rpcRsp = {0};
SRetrieveTableRsp *pRsp = NULL;
int32_t code = qRetrieveQueryResultInfo(pQInfo, &numOfRows, &rowSize); int32_t code = qRetrieveQueryResultInfo(pQInfo, &numOfRows, &rowSize);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
contLen = sizeof(SRetrieveTableRsp); contLen = sizeof(SRetrieveTableRsp);
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(contLen); pRsp = (SRetrieveTableRsp *)rpcMallocCont(contLen);
pRsp->numOfRows = 0; memset(pRsp, 0, sizeof(SRetrieveTableRsp));
pRsp->precision = 0;
pRsp->offset = 0;
pRsp->useconds = 0;
rpcRsp = (SRpcMsg) {
.handle = pMsg->rpcMsg.handle,
.pCont = pRsp,
.contLen = contLen,
.code = code,
.msgType = 0
};
//todo free qinfo
} else { } else {
SRetrieveTableRsp* pRsp = NULL; // todo check code and handle error in build result set
code = qDumpRetrieveResult(pQInfo, &pRsp, &contLen);
int32_t code = qDumpRetrieveResult(pQInfo, &pRsp, &contLen);
//todo check code
rpcRsp = (SRpcMsg) {
.handle = pMsg->rpcMsg.handle,
.pCont = pRsp,
.contLen = contLen,
.code = code,
.msgType = 0
};
} }
rpcRsp = (SRpcMsg) {
.handle = pMsg->rpcMsg.handle,
.pCont = pRsp,
.contLen = contLen,
.code = code,
.msgType = 0
};
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} }
...@@ -2698,10 +2698,8 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -2698,10 +2698,8 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
int32_t forwardStep = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, &numOfRes, int32_t forwardStep = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, &numOfRes,
&pRuntimeEnv->windowResInfo, pDataBlock); &pRuntimeEnv->windowResInfo, pDataBlock);
// dTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", fileId:%d, slot:%d, pos:%d, rows:%d, dTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d",
// checked:%d", GET_QINFO_ADDR(pRuntimeEnv), blockInfo.window.skey, blockInfo.window.ekey, blockInfo.size);
// GET_QINFO_ADDR(pQuery), blockInfo.window.skey, blockInfo.window.ekey, pQueryHandle->cur.fileId,
// pQueryHandle->cur.slot, pQuery->pos, blockInfo.size, forwardStep);
// save last access position // save last access position
cnt += forwardStep; cnt += forwardStep;
...@@ -6189,12 +6187,13 @@ int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo, int32_t *numOfRows, int32_t *ro ...@@ -6189,12 +6187,13 @@ int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo, int32_t *numOfRows, int32_t *ro
} }
sem_wait(&pQInfo->dataReady); sem_wait(&pQInfo->dataReady);
SQuery *pQuery = pQInfo->runtimeEnv.pQuery; SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
*numOfRows = pQInfo->rec.pointsRead; *numOfRows = pQInfo->rec.pointsRead;
*rowsize = pQuery->rowSize; *rowsize = pQuery->rowSize;
dTrace("QInfo:%p, retrieve res info, rowsize:%d, rows:%d, code:%d", pQInfo, *rowsize, *numOfRows, pQInfo->code); dTrace("QInfo:%p retrieve result info, rowsize:%d, rows:%d, code:%d", pQInfo, *rowsize, *numOfRows, pQInfo->code);
if (pQInfo->code < 0) { // less than 0 means there are error existed. if (pQInfo->code < 0) { // less than 0 means there are error existed.
return -pQInfo->code; return -pQInfo->code;
......
...@@ -379,6 +379,9 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) { ...@@ -379,6 +379,9 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) {
.window = {.skey = skey, .ekey = ekey} .window = {.skey = skey, .ekey = ekey}
}; };
// update the last key value
pHandle->pTableQueryInfo->lastKey = ekey + 1;
return blockInfo; return blockInfo;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册