Commit c0e31d6c, authored by H hjxilinx

[td-32] fix bugs in projection query

Parent: f7434783
@@ -226,47 +226,59 @@ static void dnodeProcessReadResult(SReadMsg *pRead) {
   rpcFreeCont(pRead->rpcMsg.pCont);  // free the received message
 }
 
+static void dnodeContinueExecuteQuery(void* qhandle, SReadMsg *pMsg) {
+  SReadMsg readMsg = {
+      .rpcMsg = {.msgType = TSDB_MSG_TYPE_QUERY},
+      .pCont = qhandle,
+      .contLen = 0,
+      .pRpcContext = pMsg->pRpcContext,
+      .pVnode = pMsg->pVnode,
+  };
+
+  taos_queue queue = dnodeGetVnodeRworker(pMsg->pVnode);
+  taosWriteQitem(queue, &readMsg);
+}
+
 static void dnodeProcessQueryMsg(SReadMsg *pMsg) {
   SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pMsg->pCont;
   SQInfo* pQInfo = NULL;
 
-  void* tsdb = dnodeGetVnodeTsdb(pMsg->pVnode);
-  int32_t code = qCreateQueryInfo(tsdb, pQueryTableMsg, &pQInfo);
+  if (pMsg->rpcMsg.contLen != 0) {
+    void* tsdb = dnodeGetVnodeTsdb(pMsg->pVnode);
+    int32_t code = qCreateQueryInfo(tsdb, pQueryTableMsg, pMsg, &pQInfo);
 
   SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
   pRsp->code = code;
   pRsp->qhandle = htobe64((uint64_t) (pQInfo));
 
-  SRpcMsg rpcRsp = {
-      .handle = pMsg->rpcMsg.handle,
-      .pCont = pRsp,
-      .contLen = sizeof(SQueryTableRsp),
-      .code = code,
-      .msgType = 0
-  };
-  rpcSendResponse(&rpcRsp);
+    SRpcMsg rpcRsp = {
+        .handle = pMsg->rpcMsg.handle,
+        .pCont = pRsp,
+        .contLen = sizeof(SQueryTableRsp),
+        .code = code,
+        .msgType = 0
+    };
+    rpcSendResponse(&rpcRsp);
+  } else {
+    pQInfo = pMsg->pCont;
+  }
 
   // do execute query
   qTableQuery(pQInfo);
 }
 
+static int32_t c = 0;
 static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) {
   SRetrieveTableMsg *pRetrieve = pMsg->pCont;
   void *pQInfo = (void*) htobe64(pRetrieve->qhandle);
 
   dTrace("QInfo:%p vgId:%d, retrieve msg is received", pQInfo, pRetrieve->header.vgId);
 
-  int32_t rowSize = 0;
-  int32_t numOfRows = 0;
+  if ((++c)%2 == 0) {
+    int32_t k = 1;
+  }
+
   int32_t contLen = 0;
 
   SRetrieveTableRsp *pRsp = NULL;
-  int32_t code = qRetrieveQueryResultInfo(pQInfo, &numOfRows, &rowSize);
+  int32_t code = qRetrieveQueryResultInfo(pQInfo);
   if (code != TSDB_CODE_SUCCESS) {
     contLen = sizeof(SRetrieveTableRsp);
@@ -275,6 +287,10 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) {
   } else {
     // todo check code and handle error in build result set
     code = qDumpRetrieveResult(pQInfo, &pRsp, &contLen);
+
+    if (qNeedFurtherExec(pQInfo)) {
+      dnodeContinueExecuteQuery(pQInfo, pMsg);
+    }
   }
 
   SRpcMsg rpcRsp = (SRpcMsg) {
...
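Note: the hunk above turns a projection query into a multi-round exchange. The first QUERY message creates the SQInfo; each RETRIEVE that still finds pending data re-queues the same handle with an empty message body (contLen == 0), so the worker resumes it instead of creating a new one. Below is a minimal, self-contained sketch of that round-trip using stand-in types; ToyQuery, run_round and retrieve are illustrative names, not TDengine code.

#include <stdbool.h>
#include <stdio.h>

/* Simplified stand-ins for the real SQInfo/result buffer; illustrative only. */
typedef struct {
  int produced;   /* rows generated so far                       */
  int total;      /* rows the query will produce in total        */
  int buffered;   /* rows currently waiting in the result buffer */
  int capacity;   /* buffer capacity (rec.capacity in the diff)  */
} ToyQuery;

/* One execution round: fill the buffer until it is full or the query is done. */
static void run_round(ToyQuery *q) {
  while (q->buffered < q->capacity && q->produced < q->total) {
    q->buffered++;
    q->produced++;
  }
}

/* One retrieve: drain the buffer; return true if another round must be queued,
 * mirroring the qNeedFurtherExec()/dnodeContinueExecuteQuery() pairing above. */
static bool retrieve(ToyQuery *q) {
  printf("returned %d rows to client\n", q->buffered);
  q->buffered = 0;
  return q->produced < q->total;   /* more data -> re-queue the handle */
}

int main(void) {
  ToyQuery q = {.total = 10, .capacity = 4};
  do {
    run_round(&q);          /* worker thread executes one round        */
  } while (retrieve(&q));   /* client retrieve drives the next round   */
  return 0;
}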
@@ -68,8 +68,10 @@ typedef struct SWindowResult {
 } SWindowResult;
 
 typedef struct SResultRec {
-  int64_t pointsTotal;
-  int64_t pointsRead;
+  int64_t total;
+  int64_t size;
+  int64_t capacity;
+  int32_t threshold;  // the threshold size, when the number of rows in result buffer, return to client
 } SResultRec;
 
 typedef struct SWindowResInfo {
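A hedged reading of the renamed SResultRec fields, inferred from how this commit uses them: size counts the rows currently sitting in the output buffer, capacity is the buffer limit, threshold is the row count at which a scan round pauses and answers the client, and total accumulates the rows already shipped across rounds. The sketch below models that bookkeeping with stand-in helpers; appendRows and flushToClient are illustrative names, not part of the code base.

#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Mirror of the fields added above; the helpers are illustrative, not TDengine APIs. */
typedef struct SResultRecSketch {
  int64_t total;      /* rows already returned to the client, summed over rounds       */
  int64_t size;       /* rows currently held in the output buffer                      */
  int64_t capacity;   /* maximum rows the output buffer can hold                       */
  int32_t threshold;  /* pause the scan and answer the client once size reaches this   */
} SResultRecSketch;

/* Append newly produced rows and report whether this round should pause
 * (the QUERY_RESBUF_FULL condition in the real code). */
static bool appendRows(SResultRecSketch *rec, int64_t rows) {
  rec->size += rows;
  assert(rec->size <= rec->capacity);
  return rec->size >= rec->threshold;
}

/* Hand the buffered rows to the client and reset for the next round. */
static void flushToClient(SResultRecSketch *rec) {
  rec->total += rec->size;
  rec->size = 0;
}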
@@ -112,7 +114,7 @@ typedef struct STableQueryInfo {
 typedef struct STableDataInfo {
   int32_t numOfBlocks;
   int32_t start;       // start block index
   int32_t tableIndex;
   void*   pMeterObj;
   int32_t groupIdx;    // group id in table list
@@ -143,7 +145,6 @@ typedef struct SQuery {
   int32_t pos;
   int64_t pointsOffset;  // the number of points offset to save read data
   SData** sdata;
-  int32_t capacity;
   SSingleColumnFilterInfo* pFilterInfo;
 } SQuery;
@@ -171,15 +172,13 @@ typedef struct SQueryRuntimeEnv {
 typedef struct SQInfo {
   void* signature;
-  void* pVnode;
+  void* param;  // pointer to the RpcReadMsg
   TSKEY startTime;
   TSKEY elapsedTime;
-  SResultRec rec;
   int32_t pointsInterpo;
   int32_t code;  // error code to returned to client
-  //  int32_t killed;  // denotes if current query is killed
   sem_t dataReady;
-  SArray* pTableIdList;  // table list
+  SArray* pTableIdList;  // table id list
   SQueryRuntimeEnv runtimeEnv;
   int32_t subgroupIdx;
   int32_t offset; /* offset in group result set of subgroup */
@@ -204,7 +203,7 @@ typedef struct SQInfo {
  * @param pQInfo
  * @return
  */
-int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, SQInfo** pQInfo);
+int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, void* param, SQInfo** pQInfo);
 
 /**
  * query on single table
@@ -222,7 +221,7 @@ void qSuperTableQuery(void* pReadMsg);
  * wait for the query completed, and retrieve final results to client
  * @param pQInfo
  */
-int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo, int32_t *numOfRows, int32_t* rowsize);
+int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo);
 
 /**
  *
@@ -232,4 +231,11 @@ int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo, int32_t *numOfRows, int32_t* ro
  */
 int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* contLen);
 
+/**
+ *
+ * @param pQInfo
+ * @return
+ */
+bool qNeedFurtherExec(SQInfo* pQInfo);
+
 #endif  // TDENGINE_QUERYEXECUTOR_H
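With the signature changes above, one retrieve round against a table query is driven by three calls: wait for the round's results, dump them into a response, then ask whether another round is needed. The sketch below strings them together in the order the dnode code in this commit uses them; retrieveOneRound is a made-up wrapper and the snippet leans on the repo's own types, so it is an illustration rather than standalone compilable code.

/* Illustrative call order only; retrieveOneRound() is a hypothetical wrapper. */
static int32_t retrieveOneRound(SQInfo *pQInfo, SRetrieveTableRsp **pRsp, int32_t *contLen, bool *continueExec) {
  int32_t code = qRetrieveQueryResultInfo(pQInfo);    /* blocks until one round of results is ready      */
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  code = qDumpRetrieveResult(pQInfo, pRsp, contLen);  /* copy the buffered rows into an RPC response     */
  *continueExec = qNeedFurtherExec(pQInfo);           /* true -> re-queue the handle for another round   */
  return code;
}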
@@ -364,8 +364,8 @@ bool isTSCompQuery(SQuery *pQuery) { return pQuery->pSelectExpr[0].pBase.functio
 bool doRevisedResultsByLimit(SQInfo *pQInfo) {
   SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
 
-  if ((pQuery->limit.limit > 0) && (pQuery->rec.pointsRead + pQInfo->rec.pointsRead > pQuery->limit.limit)) {
-    pQuery->rec.pointsRead = pQuery->limit.limit - pQInfo->rec.pointsRead;
+  if ((pQuery->limit.limit > 0) && (pQuery->rec.size + pQuery->rec.size > pQuery->limit.limit)) {
+    pQuery->rec.size = pQuery->limit.limit - pQuery->rec.size;
 
     // query completed
     setQueryStatus(pQuery, QUERY_COMPLETED);
@@ -1344,17 +1344,16 @@ static int32_t reviseForwardSteps(SQueryRuntimeEnv *pRuntimeEnv, int32_t forward
 static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pDataBlockInfo,
                                           SDataStatis *pStatis, __block_search_fn_t searchFn, int32_t *numOfRes,
                                           SWindowResInfo *pWindowResInfo, SArray *pDataBlock) {
-  SQuery *pQuery = pRuntimeEnv->pQuery;
-  int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
+  SQuery *pQuery = pRuntimeEnv->pQuery;
 
   if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
     *numOfRes = rowwiseApplyAllFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock);
   } else {
     *numOfRes = blockwiseApplyAllFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock);
   }
 
   TSKEY lastKey = QUERY_IS_ASC_QUERY(pQuery) ? pDataBlockInfo->window.ekey : pDataBlockInfo->window.skey;
-  pQuery->lastKey = lastKey + step;
+  pQuery->lastKey = lastKey + GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
 
   doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo);
@@ -1368,12 +1367,8 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
   assert(*numOfRes >= 0);
 
   // check if buffer is large enough for accommodating all qualified points
-  if (*numOfRes > 0 && pQuery->checkBufferInLoop == 1) {
-    pQuery->pointsOffset -= *numOfRes;
-    if (pQuery->pointsOffset <= 0) {
-      pQuery->pointsOffset = 0;
-      setQueryStatus(pQuery, QUERY_RESBUF_FULL);
-    }
+  if (*numOfRes > 0 && pQuery->checkBufferInLoop == 1 && ((*numOfRes) >= pQuery->rec.threshold)) {
+    setQueryStatus(pQuery, QUERY_RESBUF_FULL);
+    // todo return correct numOfRes for ts_comp function
   }
 
   return 0;
@@ -2302,7 +2297,7 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
   pQuery->status = 0;
 
-  pQInfo->rec = (SResultRec){0};
+  pQuery->rec = (SResultRec){0};
   pQuery->rec = (SResultRec){0};
 
   changeExecuteScanOrder(pQuery, true);
@@ -2668,9 +2663,9 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
     }
 
     int32_t numOfRes = 0;
     SDataStatis *pStatis = NULL;
-    SArray * pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, &blockInfo, &pStatis);
+    SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, &blockInfo, &pStatis);
 
     int32_t forwardStep = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, &numOfRes,
                                                      &pRuntimeEnv->windowResInfo, pDataBlock);
@@ -3035,9 +3030,9 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
     offset += pData->numOfElems;
   }
 
-  assert(pQuery->rec.pointsRead == 0);
-  pQuery->rec.pointsRead += rows;
+  assert(pQuery->rec.size == 0);
+  pQuery->rec.size += rows;
 
   pQInfo->offset += 1;
 }
@@ -3367,7 +3362,7 @@ void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
       pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf;
     }
 
-    memset(pQuery->sdata[i]->data, 0, (size_t)pQuery->pSelectExpr[i].resBytes * pQuery->capacity);
+    memset(pQuery->sdata[i]->data, 0, (size_t)pQuery->pSelectExpr[i].resBytes * pQuery->rec.capacity);
   }
 
   initCtxOutputBuf(pRuntimeEnv);
@@ -3414,14 +3409,14 @@ void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
 void doSkipResults(SQueryRuntimeEnv *pRuntimeEnv) {
   SQuery *pQuery = pRuntimeEnv->pQuery;
-  if (pQuery->rec.pointsRead == 0 || pQuery->limit.offset == 0) {
+  if (pQuery->rec.size == 0 || pQuery->limit.offset == 0) {
     return;
   }
 
-  if (pQuery->rec.pointsRead <= pQuery->limit.offset) {
-    pQuery->limit.offset -= pQuery->rec.pointsRead;
-    pQuery->rec.pointsRead = 0;
+  if (pQuery->rec.size <= pQuery->limit.offset) {
+    pQuery->limit.offset -= pQuery->rec.size;
+    pQuery->rec.size = 0;
 
     //    pQuery->pointsOffset = pQuery->rec.pointsToRead;  // clear all data in result buffer
     resetCtxOutputBuf(pRuntimeEnv);
@@ -3430,13 +3425,13 @@ void doSkipResults(SQueryRuntimeEnv *pRuntimeEnv) {
     pQuery->status &= (~QUERY_RESBUF_FULL);
   } else {
     int32_t numOfSkip = (int32_t)pQuery->limit.offset;
-    pQuery->rec.pointsRead -= numOfSkip;
+    pQuery->rec.size -= numOfSkip;
 
     for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
       int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
       int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes;
 
       assert(0);
-      //      memmove(pQuery->sdata[i]->data, pQuery->sdata[i]->data + bytes * numOfSkip, pQuery->pointsRead * bytes);
+      //      memmove(pQuery->sdata[i]->data, pQuery->sdata[i]->data + bytes * numOfSkip, pQuery->size * bytes);
       pRuntimeEnv->pCtx[i].aOutputBuf += bytes * numOfSkip;
 
       if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
@@ -3999,8 +3994,9 @@ void copyFromWindowResToSData(SQInfo *pQInfo, SWindowResult *result) {
   int32_t orderType = (pQuery->pGroupbyExpr != NULL) ? pQuery->pGroupbyExpr->orderType : TSQL_SO_ASC;
   int32_t numOfResult = doCopyToSData(pQInfo, result, orderType);
 
-  pQuery->rec.pointsRead += numOfResult;
-  //  assert(pQuery->rec.pointsRead <= pQuery->pointsToRead);
+  pQuery->rec.size += numOfResult;
+  assert(pQuery->rec.size <= pQuery->rec.capacity);
 }
 
 static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, STableDataInfo *pTableDataInfo) {
@@ -4038,31 +4034,6 @@ void stableApplyFunctionsOnBlock_(SQInfo *pQInfo, STableDataInfo *pTableDataInfo
   updatelastkey(pQuery, pTableQueryInfo);
 }
 
-// we need to split the refstatsult into different packages.
-int32_t vnodeGetResultSize(void *thandle, int32_t *numOfRows) {
-  SQInfo *pQInfo = (SQInfo *)thandle;
-  SQuery *pQuery = &pQInfo->runtimeEnv.pQuery;
-
-  /*
-   * get the file size and set the numOfRows to be the file size, since for tsComp query,
-   * the returned row size is equalled to 1
-   *
-   * TODO handle the case that the file is too large to send back one time
-   */
-  if (isTSCompQuery(pQuery) && (*numOfRows) > 0) {
-    struct stat fstat;
-    if (stat(pQuery->sdata[0]->data, &fstat) == 0) {
-      *numOfRows = fstat.st_size;
-      return fstat.st_size;
-    } else {
-      dError("QInfo:%p failed to get file info, path:%s, reason:%s", pQInfo, pQuery->sdata[0]->data, strerror(errno));
-      return 0;
-    }
-  } else {
-    return pQuery->rowSize * (*numOfRows);
-  }
-}
-
 bool vnodeHasRemainResults(void *handle) {
   SQInfo *pQInfo = (SQInfo *)handle;
@@ -4074,7 +4045,7 @@ bool vnodeHasRemainResults(void *handle) {
   SQuery *            pQuery = pRuntimeEnv->pQuery;
   SInterpolationInfo *pInterpoInfo = &pRuntimeEnv->interpoInfo;
 
-  if (pQuery->limit.limit > 0 && pQInfo->rec.pointsRead >= pQuery->limit.limit) {
+  if (pQuery->limit.limit > 0 && pQuery->rec.size >= pQuery->limit.limit) {
     return false;
   }
@@ -4147,6 +4118,11 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
     memmove(data, pQuery->sdata[col]->data, bytes * numOfRows);
     data += bytes * numOfRows;
   }
+
+  // all data returned, set query over
+  if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
+    setQueryStatus(pQuery, QUERY_OVER);
+  }
 }
 
 int32_t vnodeQueryResultInterpolate(SQInfo *pQInfo, tFilePage **pDst, tFilePage **pDataSrc, int32_t numOfRows,
@@ -4255,8 +4231,6 @@ int32_t initQInfo(SQInfo *pQInfo, void *param, void* tsdb) {
   setScanLimitationByResultBuffer(pQuery);
   changeExecuteScanOrder(pQuery, false);
 
-  pQInfo->rec = (SResultRec){0};
-
   // dataInCache requires lastKey value
   pQuery->lastKey = pQuery->window.skey;
@@ -4535,7 +4509,7 @@ static int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, int32_t start
   // accumulate the point interpolation result
   if (numOfRes > 0) {
-    pQuery->rec.pointsRead += numOfRes;
+    pQuery->rec.size += numOfRes;
     forwardCtxOutputBuf(pRuntimeEnv, numOfRes);
   }
@@ -4623,7 +4597,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
       pSupporter->subgroupIdx++;
 
       // output buffer is full, return to client
-      if (pQuery->pointsRead >= pQuery->pointsToRead) {
+      if (pQuery->size >= pQuery->pointsToRead) {
         break;
       }
     }
@@ -4639,9 +4613,9 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
     */
     if (pSupporter->subgroupIdx > 0) {
       copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
-      pQInfo->pointsRead += pQuery->pointsRead;
+      pQInfo->size += pQuery->size;
 
-      if (pQuery->pointsRead > 0) {
+      if (pQuery->size > 0) {
        return;
      }
    }
@@ -4707,7 +4681,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
      vnodeScanAllData(pRuntimeEnv);
 
-      pQuery->pointsRead = getNumOfResult(pRuntimeEnv);
+      pQuery->size = getNumOfResult(pRuntimeEnv);
      doSkipResults(pRuntimeEnv);
 
      // the limitation of output result is reached, set the query completed
@@ -4742,7 +4716,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
        pQuery->skey = pQuery->lastKey;
 
        // all data in the result buffer are skipped due to the offset, continue to retrieve data from current meter
-        if (pQuery->pointsRead == 0) {
+        if (pQuery->size == 0) {
          assert(!Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL));
          continue;
        } else {
@@ -4789,17 +4763,17 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
      }
 
      pQInfo->pTableQuerySupporter->subgroupIdx = 0;
-      pQuery->pointsRead = 0;
+      pQuery->size = 0;
      copyFromWindowResToSData(pQInfo, pWindowResInfo->pResult);
    }
 
-    pQInfo->pointsRead += pQuery->pointsRead;
+    pQInfo->size += pQuery->size;
    pQuery->pointsOffset = pQuery->pointsToRead;
 
    dTrace(
        "QInfo %p vid:%d, numOfMeters:%d, index:%d, numOfGroups:%d, %d points returned, totalRead:%d totalReturn:%d,"
        "next skey:%" PRId64 ", offset:%" PRId64,
-        pQInfo, vid, pTableIdList->numOfTables, pSupporter->meterIdx, pTableIdList->numOfSubSet, pQuery->pointsRead, pQInfo->pointsRead,
+        pQInfo, vid, pTableIdList->numOfTables, pSupporter->meterIdx, pTableIdList->numOfSubSet, pQuery->size, pQInfo->size,
        pQInfo->pointsReturned, pQuery->skey, pQuery->limit.offset);
 #endif
 }
@@ -4911,13 +4885,13 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) {
      copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
    }
 
-    pQInfo->rec.pointsRead += pQuery->rec.pointsRead;
+    pQuery->rec.size += pQuery->rec.size;
 
-    if (pQuery->rec.pointsRead == 0) {
+    if (pQuery->rec.size == 0) {
      //      vnodePrintQueryStatistics(pSupporter);
    }
 
-    dTrace("QInfo:%p current:%lldd, total:%lldd", pQInfo, pQuery->rec.pointsRead, pQInfo->rec.pointsTotal);
+    dTrace("QInfo:%p current:%lldd, total:%lldd", pQInfo, pQuery->rec.size, pQuery->rec.total);
    return;
  }
 #if 0
@@ -4970,8 +4944,8 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) {
  }
 
  // handle the limitation of output buffer
-  pQInfo->pointsRead += pQuery->pointsRead;
-  dTrace("QInfo:%p points returned:%d, totalRead:%d totalReturn:%d", pQInfo, pQuery->pointsRead, pQInfo->pointsRead,
+  pQInfo->size += pQuery->size;
+  dTrace("QInfo:%p points returned:%d, totalRead:%d totalReturn:%d", pQInfo, pQuery->size, pQInfo->size,
         pQInfo->pointsReturned);
 #endif
 }
@@ -4994,8 +4968,8 @@ static void tableFixedOutputProcessor(SQInfo *pQInfo) {
  }
 
  // since the numOfOutputElems must be identical for all sql functions that are allowed to be executed simutanelously.
-  pQuery->rec.pointsRead = getNumOfResult(pRuntimeEnv);
-  //  assert(pQuery->pointsRead <= pQuery->pointsToRead &&
+  pQuery->rec.size = getNumOfResult(pRuntimeEnv);
+  //  assert(pQuery->size <= pQuery->pointsToRead &&
  //         Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED));
 
  // must be top/bottom query if offset > 0
@@ -5006,7 +4980,7 @@ static void tableFixedOutputProcessor(SQInfo *pQInfo) {
  doSkipResults(pRuntimeEnv);
  doRevisedResultsByLimit(pQInfo);
 
-  pQInfo->rec.pointsRead = pQuery->rec.pointsRead;
+  pQuery->rec.size = pQuery->rec.size;
 }
 
 static void tableMultiOutputProcessor(SQInfo *pQInfo) {
@@ -5026,16 +5000,16 @@ static void tableMultiOutputProcessor(SQInfo *pQInfo) {
      return;
    }
 
-    pQuery->rec.pointsRead = getNumOfResult(pRuntimeEnv);
-    if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols > 0 && pQuery->rec.pointsRead > 0) {
+    pQuery->rec.size = getNumOfResult(pRuntimeEnv);
+    if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols > 0 && pQuery->rec.size > 0) {
      doSkipResults(pRuntimeEnv);
    }
 
    /*
-     * 1. if pQuery->pointsRead == 0, pQuery->limit.offset >= 0, still need to check data
-     * 2. if pQuery->pointsRead > 0, pQuery->limit.offset must be 0
+     * 1. if pQuery->size == 0, pQuery->limit.offset >= 0, still need to check data
+     * 2. if pQuery->size > 0, pQuery->limit.offset must be 0
     */
-    if (pQuery->rec.pointsRead > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
+    if (pQuery->rec.size > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
      break;
    }
@@ -5046,23 +5020,21 @@ static void tableMultiOutputProcessor(SQInfo *pQInfo) {
  }
 
  doRevisedResultsByLimit(pQInfo);
-  pQInfo->rec.pointsRead += pQuery->rec.pointsRead;
-
  if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
-    //    dTrace("QInfo:%p vid:%d sid:%d id:%s, query abort due to buffer limitation, next qrange:%" PRId64 "-%" PRId64,
-    //        pQInfo, pQuery->lastKey, pQuery->ekey);
+    dTrace("QInfo:%p query paused due to buffer limitation, next qrange:%" PRId64 "-%" PRId64,
+           pQInfo, pQuery->lastKey, pQuery->window.ekey);
  }
 
-  //  dTrace("QInfo:%p vid:%d sid:%d id:%s, %d points returned, totalRead:%d totalReturn:%d", pQInfo, pMeterObj->vnode,
-  //         pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, pQInfo->pointsRead, pQInfo->pointsReturned);
+  //  dTrace("QInfo:%p vid:%d sid:%d id:%s, %d points returned, totalRead:%d totalReturn:%d", pQInfo, pMeterObj->vnode,
+  //         pMeterObj->sid, pMeterObj->meterId, pQuery->size, pQInfo->size, pQInfo->pointsReturned);
 
  //  pQuery->pointsOffset = pQuery->pointsToRead;  //restore the available buffer
  //  if (!isTSCompQuery(pQuery)) {
-  //    assert(pQuery->pointsRead <= pQuery->pointsToRead);
+  //    assert(pQuery->size <= pQuery->pointsToRead);
  //  }
 }
 
-static void vnodeSingleMeterIntervalMainLooper(SQueryRuntimeEnv *pRuntimeEnv) {
+static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv) {
  SQuery *pQuery = pRuntimeEnv->pQuery;
 
  while (1) {
@@ -5088,13 +5060,7 @@ static void vnodeSingleMeterIntervalMainLooper(SQueryRuntimeEnv *pRuntimeEnv) {
      pQuery->limit.offset -= c;
    }
 
-    if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
-      break;
-    }
-
-    // load the data block for the next retrieve
-    // loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos);
-    if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
+    if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED|QUERY_RESBUF_FULL)) {
      break;
    }
  }
@@ -5108,12 +5074,11 @@ static void tableIntervalProcessor(SQInfo *pQInfo) {
  int32_t numOfInterpo = 0;
 
  while (1) {
-    resetCtxOutputBuf(pRuntimeEnv);
-    vnodeSingleMeterIntervalMainLooper(pRuntimeEnv);
+    tableIntervalProcessImpl(pRuntimeEnv);
 
    if (pQuery->intervalTime > 0) {
      pQInfo->subgroupIdx = 0;  // always start from 0
-      pQuery->rec.pointsRead = 0;
+      pQuery->rec.size = 0;
      copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
 
      clearFirstNTimeWindow(pRuntimeEnv, pQInfo->subgroupIdx);
@@ -5124,43 +5089,43 @@ static void tableIntervalProcessor(SQInfo *pQInfo) {
      doRevisedResultsByLimit(pQInfo);
      break;
    } else {
-      taosInterpoSetStartInfo(&pRuntimeEnv->interpoInfo, pQuery->rec.pointsRead, pQuery->interpoType);
+      taosInterpoSetStartInfo(&pRuntimeEnv->interpoInfo, pQuery->rec.size, pQuery->interpoType);
 
      SData **pInterpoBuf = pRuntimeEnv->pInterpoBuf;
 
      for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
-        memcpy(pInterpoBuf[i]->data, pQuery->sdata[i]->data, pQuery->rec.pointsRead * pQuery->pSelectExpr[i].resBytes);
+        memcpy(pInterpoBuf[i]->data, pQuery->sdata[i]->data, pQuery->rec.size * pQuery->pSelectExpr[i].resBytes);
      }
 
      numOfInterpo = 0;
-      pQuery->rec.pointsRead = vnodeQueryResultInterpolate(
-          pQInfo, (tFilePage **)pQuery->sdata, (tFilePage **)pInterpoBuf, pQuery->rec.pointsRead, &numOfInterpo);
+      pQuery->rec.size = vnodeQueryResultInterpolate(
+          pQInfo, (tFilePage **)pQuery->sdata, (tFilePage **)pInterpoBuf, pQuery->rec.size, &numOfInterpo);
 
-      dTrace("QInfo: %p interpo completed, final:%d", pQInfo, pQuery->rec.pointsRead);
-      if (pQuery->rec.pointsRead > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
+      dTrace("QInfo: %p interpo completed, final:%d", pQInfo, pQuery->rec.size);
+      if (pQuery->rec.size > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
        doRevisedResultsByLimit(pQInfo);
        break;
      }
 
      // no result generated yet, continue retrieve data
-      pQuery->rec.pointsRead = 0;
+      pQuery->rec.size = 0;
    }
  }
 
  // all data scanned, the group by normal column can return
  if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {  // todo refactor with merge interval time result
    pQInfo->subgroupIdx = 0;
-    pQuery->rec.pointsRead = 0;
+    pQuery->rec.size = 0;
    copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
    clearFirstNTimeWindow(pRuntimeEnv, pQInfo->subgroupIdx);
  }
 
-  pQInfo->rec.pointsRead += pQuery->rec.pointsRead;
+  pQuery->rec.size += pQuery->rec.size;
  pQInfo->pointsInterpo += numOfInterpo;
 
  //  dTrace("%p vid:%d sid:%d id:%s, %d points returned %d points interpo, totalRead:%d totalInterpo:%d
  //  totalReturn:%d",
-  //         pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, numOfInterpo,
-  //         pQInfo->pointsRead - pQInfo->pointsInterpo, pQInfo->pointsInterpo, pQInfo->pointsReturned);
+  //         pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->size, numOfInterpo,
+  //         pQInfo->size - pQInfo->pointsInterpo, pQInfo->pointsInterpo, pQInfo->pointsReturned);
 }
 void qTableQuery(SQInfo *pQInfo) {
@@ -5187,16 +5152,16 @@ void qTableQuery(SQInfo *pQInfo) {
    int32_t numOfInterpo = 0;
    int32_t remain = taosNumOfRemainPoints(&pRuntimeEnv->interpoInfo);
-    pQuery->rec.pointsRead = vnodeQueryResultInterpolate(pQInfo, (tFilePage **)pQuery->sdata,
+    pQuery->rec.size = vnodeQueryResultInterpolate(pQInfo, (tFilePage **)pQuery->sdata,
                                                         (tFilePage **)pRuntimeEnv->pInterpoBuf, remain, &numOfInterpo);
 
    doRevisedResultsByLimit(pQInfo);
    pQInfo->pointsInterpo += numOfInterpo;
-    pQInfo->rec.pointsRead += pQuery->rec.pointsRead;
+    pQuery->rec.size += pQuery->rec.size;
 
    //    dTrace("QInfo:%p %d points returned %d points interpo, totalRead:%d totalInterpo:%d totalReturn:%d",
-    //           pQInfo, pQuery->pointsRead, numOfInterpo, pQInfo->pointsRead, pQInfo->pointsInterpo,
+    //           pQInfo, pQuery->size, numOfInterpo, pQInfo->size, pQInfo->pointsInterpo,
    //           pQInfo->pointsReturned);
 
    sem_post(&pQInfo->dataReady);
    return;
@@ -5206,22 +5171,22 @@ void qTableQuery(SQInfo *pQInfo) {
  if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
    // continue to get push data from the group result
    if (isGroupbyNormalCol(pQuery->pGroupbyExpr) ||
-        (pQuery->intervalTime > 0 && pQInfo->rec.pointsTotal < pQuery->limit.limit)) {
+        (pQuery->intervalTime > 0 && pQuery->rec.total < pQuery->limit.limit)) {
      // todo limit the output for interval query?
-      pQuery->rec.pointsRead = 0;
+      pQuery->rec.size = 0;
      pQInfo->subgroupIdx = 0;  // always start from 0
 
      if (pRuntimeEnv->windowResInfo.size > 0) {
        copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
-        pQInfo->rec.pointsRead += pQuery->rec.pointsRead;
+        pQuery->rec.size += pQuery->rec.size;
 
        clearFirstNTimeWindow(pRuntimeEnv, pQInfo->subgroupIdx);
 
-        if (pQuery->rec.pointsRead > 0) {
+        if (pQuery->rec.size > 0) {
          //          dTrace("QInfo:%p vid:%d sid:%d id:%s, %d points returned %d from group results, totalRead:%d
          //          totalReturn:%d",
-          //                 pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead,
-          //                 pQInfo->pointsRead, pQInfo->pointsInterpo, pQInfo->pointsReturned);
+          //                 pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->size,
+          //                 pQInfo->size, pQInfo->pointsInterpo, pQInfo->pointsReturned);
          sem_post(&pQInfo->dataReady);
          return;
@@ -5231,7 +5196,7 @@ void qTableQuery(SQInfo *pQInfo) {
    //    dTrace("QInfo:%p vid:%d sid:%d id:%s, query over, %d points are returned", pQInfo, pMeterObj->vnode,
    //    pMeterObj->sid,
-    //           pMeterObj->meterId, pQInfo->pointsRead);
+    //           pMeterObj->meterId, pQInfo->size);
    //    vnodePrintQueryStatistics(pSupporter);
 
    sem_post(&pQInfo->dataReady);
@@ -5239,7 +5204,7 @@ void qTableQuery(SQInfo *pQInfo) {
  }
 
  // number of points returned during this query
-  pQuery->rec.pointsRead = 0;
+  pQuery->rec.size = 0;
 
  int64_t st = taosGetTimestampUs();
@@ -5265,7 +5230,7 @@ void qTableQuery(SQInfo *pQInfo) {
  if (isQueryKilled(pQInfo)) {
    dTrace("QInfo:%p query is killed", pQInfo);
  } else {
-    dTrace("QInfo:%p query task completed, %d points are returned", pQInfo, pQuery->rec.pointsRead);
+    dTrace("QInfo:%p query task completed, %d points are returned", pQInfo, pQuery->rec.size);
  }
 
  sem_post(&pQInfo->dataReady);
@@ -5288,7 +5253,7 @@ void qSuperTableQuery(void *pReadMsg) {
  //  assert(pQInfo->refCount >= 1);
 #if 0
  SQuery *pQuery = &pQInfo->runtimeEnv.pQuery;
-  pQuery->rec.pointsRead = 0;
+  pQuery->rec.size = 0;
 
  int64_t st = taosGetTimestampUs();
  if (pQuery->intervalTime > 0 ||
@@ -5306,13 +5271,13 @@ void qSuperTableQuery(void *pReadMsg) {
  pQInfo->elapsedTime += (taosGetTimestampUs() - st);
  pQuery->status = isQueryKilled(pQInfo) ? 1 : 0;
 
-  //  taosInterpoSetStartInfo(&pQInfo->runtimeEnv.interpoInfo, pQuery->pointsRead,
+  //  taosInterpoSetStartInfo(&pQInfo->runtimeEnv.interpoInfo, pQuery->size,
  //  pQInfo->query.interpoType);
 
-  if (pQuery->rec.pointsRead == 0) {
+  if (pQuery->rec.size == 0) {
    //    pQInfo->over = 1;
    //    dTrace("QInfo:%p over, %d meters queried, %d points are returned", pQInfo, pSupporter->numOfMeters,
-    //           pQInfo->pointsRead);
+    //           pQInfo->size);
    //    vnodePrintQueryStatistics(pSupporter);
  }
@@ -5916,12 +5881,14 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
  }
 
  // set the output buffer capacity
-  pQuery->capacity = 4096;
+  pQuery->rec.capacity = 4096;
+  pQuery->rec.threshold = 2;
 
  for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) {
    assert(pExprs[col].interResBytes >= pExprs[col].resBytes);
 
    // allocate additional memory for interResults that are usually larger then final results
-    size_t size = (pQuery->capacity + 1) * pExprs[col].resBytes + pExprs[col].interResBytes + sizeof(SData);
+    size_t size = (pQuery->rec.capacity + 1) * pExprs[col].resBytes + pExprs[col].interResBytes + sizeof(SData);
    pQuery->sdata[col] = (SData *)calloc(1, size);
    if (pQuery->sdata[col] == NULL) {
      goto _clean_memory;
@@ -5943,9 +5910,8 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
  pQInfo->pTableIdList = pTableIdList;
  pQuery->pos = -1;
 
-  //  dTrace("vid:%d sid:%d meterId:%s, QInfo is allocated:%p", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId,
-  //         pQInfo);
+  dTrace("QInfo %p is allocated", pQInfo);
 
  return pQInfo;
 
 _clean_memory:
@@ -6098,7 +6064,7 @@ _error:
  return code;
 }
 
-int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryTableMsg, SQInfo **pQInfo) {
+int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryTableMsg, void* param, SQInfo **pQInfo) {
  assert(pQueryTableMsg != NULL);
 
  int32_t code = TSDB_CODE_SUCCESS;
@@ -6136,6 +6102,7 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryTableMsg, SQInfo **pQ
    //    pObj->qhandle = vnodeQueryOnMultiMeters(pMeterObjList, pGroupbyExpr, pExprs, pQueryTableMsg, &code);
  } else {
    code = createQInfo(pQueryTableMsg, pGroupbyExpr, pExprs, pTableIdList, tsdb, pQInfo);
+    (*pQInfo)->param = param;
  }
 
 _query_over:
@@ -6161,7 +6128,7 @@ _query_over:
  return TSDB_CODE_SUCCESS;
 }
 
-int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo, int32_t *numOfRows, int32_t *rowsize) {
+int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo) {
  if (pQInfo == NULL || !isQInfoValid(pQInfo)) {
    return TSDB_CODE_INVALID_QHANDLE;
  }
@@ -6177,11 +6144,8 @@ int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo, int32_t *numOfRows, int32_t *ro
  }
 
  sem_wait(&pQInfo->dataReady);
-  *numOfRows = pQInfo->rec.pointsRead;
-  *rowsize = pQuery->rowSize;
-
-  dTrace("QInfo:%p retrieve result info, rowsize:%d, rows:%d, code:%d", pQInfo, *rowsize, *numOfRows, pQInfo->code);
+  dTrace("QInfo:%p retrieve result info, rowsize:%d, rows:%d, code:%d", pQInfo, pQuery->rowSize, pQuery->rec.size,
+         pQInfo->code);
 
  return (pQInfo->code >= 0)? pQInfo->code:(-pQInfo->code);
 }
@@ -6208,7 +6172,7 @@ static size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows) {
  }
 }
 
-static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int32_t *size) {
+static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
  // the remained number of retrieved rows, not the interpolated result
  SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
@@ -6231,28 +6195,31 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int32_t *size) {
             pQuery->sdata[0]->data, strerror(errno));
    }
  } else {
-    doCopyQueryResultToMsg(pQInfo, pQInfo->rec.pointsRead, data);
+    doCopyQueryResultToMsg(pQInfo, pQuery->rec.size, data);
  }
 
-  pQInfo->rec.pointsTotal += pQInfo->rec.pointsRead;
-  dTrace("QInfo:%p current:%d, total:%d", pQInfo, pQInfo->rec.pointsRead, pQInfo->rec.pointsTotal);
-
-  setQueryStatus(pQuery, QUERY_COMPLETED);
+  pQuery->rec.total += pQuery->rec.size;
+  dTrace("QInfo:%p current:%d, total:%d", pQInfo, pQuery->rec.size, pQuery->rec.total);
 
  return TSDB_CODE_SUCCESS;
 
  // todo if interpolation exists, the result may be dump to client by several rounds
 }
 
-static void addToTaskQueue(SQInfo* pQInfo) {
-  // no error occurred, continue retrieving data
-  if (pQInfo->code == TSDB_CODE_SUCCESS) {
-#ifdef _TD_ARM_
-    dTrace("QInfo:%p set query flag, sig:%" PRIu64 ", func:doDumpQueryResult", pQInfo, pQInfo->signature);
-#else
-    dTrace("QInfo:%p set query flag, sig:%" PRIu64 ", func:%s", pQInfo, pQInfo->signature, __FUNCTION__);
-#endif
-    // todo add to task queue
+bool qNeedFurtherExec(SQInfo* pQInfo) {
+  if (pQInfo == NULL || pQInfo->signature != pQInfo || pQInfo->code != TSDB_CODE_SUCCESS) {
+    return false;
+  }
+
+  SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
+  if (Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) {
+    return false;
+  } else if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
+    return true;
+  } else if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
+    return true;
+  } else {
+    assert(0);
  }
 }
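The branches above give a paused query three observable states. A hedged restatement with a self-contained enum follows; the names mirror the flags used in this diff, but the real code stores them as bit flags on pQuery->status, so this is a simplified model rather than the actual implementation.

#include <stdbool.h>

/* Simplified model of the status values referenced above. */
typedef enum {
  STATUS_RESBUF_FULL,  /* output buffer filled up: pause, ship rows, then resume      */
  STATUS_COMPLETED,    /* current scan round finished, remaining results still pending */
  STATUS_OVER          /* every buffered row has been handed to the client: all done  */
} QueryStatusSketch;

/* Same decision qNeedFurtherExec() makes: only STATUS_OVER stops re-queuing. */
static bool needsAnotherRound(QueryStatusSketch status) {
  return status != STATUS_OVER;
}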
@@ -6262,13 +6229,12 @@ int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* c
  }
 
  SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
-  size_t size = getResultSize(pQInfo, &pQInfo->rec.pointsRead);
+  size_t size = getResultSize(pQInfo, &pQuery->rec.size);
  *contLen = size + sizeof(SRetrieveTableRsp);
 
  // todo handle failed to allocate memory
  *pRsp = (SRetrieveTableRsp *)rpcMallocCont(*contLen);
-  (*pRsp)->numOfRows = htonl(pQInfo->rec.pointsRead);
+  (*pRsp)->numOfRows = htonl(pQuery->rec.size);
 
  int32_t code = pQInfo->code;
  if (code == TSDB_CODE_SUCCESS) {
@@ -6279,16 +6245,13 @@ int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* c
    (*pRsp)->useconds = 0;
  }
 
-  if (pQInfo->rec.pointsRead > 0 && code == TSDB_CODE_SUCCESS) {
-    code = doDumpQueryResult(pQInfo, (*pRsp)->data, NULL);
-
-    // has more data to return or need next round to execute
-    addToTaskQueue(pQInfo);
+  if (pQuery->rec.size > 0 && code == TSDB_CODE_SUCCESS) {
+    code = doDumpQueryResult(pQInfo, (*pRsp)->data);
  } else {
    code = pQInfo->code;
  }
 
-  if (isQueryKilled(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
+  if (isQueryKilled(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) {
    (*pRsp)->completed = 1;  // notify no more result to client
    vnodeFreeQInfo(pQInfo);
  }
...
@@ -124,6 +124,7 @@ typedef struct STsdbQueryHandle {
  int32_t tableIndex;
  bool    isFirstSlot;
  void *  qinfo;  // query info handle, for debug purpose
+  SSkipListIterator* memIter;
 } STsdbQueryHandle;
 
 int32_t doAllocateBuf(STsdbQueryHandle *pQueryHandle, int32_t rowsPerFileBlock) {
@@ -367,8 +368,13 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) {
  int32_t rows = 0;
 
  if (pTable->mem != NULL) {
-    SSkipListIterator* iter = tSkipListCreateIter(pTable->mem->pData);
-    rows = tsdbReadRowsFromCache(iter, INT64_MAX, 4000, &skey, &ekey, pHandle);
+    // create mem table iterator if it is not created yet
+    if (pHandle->memIter == NULL) {
+      pHandle->memIter = tSkipListCreateIter(pTable->mem->pData);
+    }
+
+    rows = tsdbReadRowsFromCache(pHandle->memIter, INT64_MAX, 2, &skey, &ekey, pHandle);
  }
 
  SDataBlockInfo blockInfo = {
...
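Caching the skip-list iterator on the query handle is what lets the second and later execution rounds continue from where the previous round stopped, instead of re-reading the in-memory table from its head; the row limit of 2 in the hunk looks like a temporary value for exercising the multi-round path. Below is a self-contained sketch of the same lazily-created, cached-iterator pattern with stand-in types; Iter, Handle and readRows are illustrative names only.

#include <stddef.h>
#include <stdio.h>

/* Stand-ins for the skip-list iterator and the query handle; illustrative only. */
typedef struct Iter { int pos; int end; } Iter;
typedef struct Handle { Iter *memIter; } Handle;

static Iter g_storage = {0, 10};                 /* pretend mem-table with 10 rows */
static Iter *createIter(void) { return &g_storage; }

/* Read at most 'limit' rows, keeping the iterator on the handle so the next
 * call resumes after the rows already consumed (the point of the change above). */
static int readRows(Handle *h, int limit) {
  if (h->memIter == NULL) {          /* create the iterator only once */
    h->memIter = createIter();
  }
  int rows = 0;
  while (rows < limit && h->memIter->pos < h->memIter->end) {
    h->memIter->pos++;
    rows++;
  }
  return rows;
}

int main(void) {
  Handle h = {NULL};
  for (int i = 0; i < 3; ++i) {
    printf("round %d: %d rows\n", i, readRows(&h, 4));   /* prints 4, 4, 2 */
  }
  return 0;
}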