未验证 提交 51760f7a 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #7668 from taosdata/fix/query

Fix/query
...@@ -3011,6 +3011,8 @@ int32_t tscGetTableMetaImpl(SSqlObj* pSql, STableMetaInfo *pTableMetaInfo, bool ...@@ -3011,6 +3011,8 @@ int32_t tscGetTableMetaImpl(SSqlObj* pSql, STableMetaInfo *pTableMetaInfo, bool
return getTableMetaFromMnode(pSql, pTableMetaInfo, autocreate); return getTableMetaFromMnode(pSql, pTableMetaInfo, autocreate);
} }
} }
tscDebug("0x%"PRIx64 " %s retrieve tableMeta from cache, numOfCols:%d, numOfTags:%d", pSql->self, name, pMeta->tableInfo.numOfColumns, pMeta->tableInfo.numOfTags);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -231,6 +231,12 @@ static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput); ...@@ -231,6 +231,12 @@ static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput);
static void destroyAggOperatorInfo(void* param, int32_t numOfOutput); static void destroyAggOperatorInfo(void* param, int32_t numOfOutput);
static void destroyOperatorInfo(SOperatorInfo* pOperator); static void destroyOperatorInfo(SOperatorInfo* pOperator);
static void doSetOperatorCompleted(SOperatorInfo* pOperator) {
pOperator->status = OP_EXEC_DONE;
if (pOperator->pRuntimeEnv != NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
}
}
static int32_t doCopyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock); static int32_t doCopyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock);
...@@ -5493,8 +5499,7 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { ...@@ -5493,8 +5499,7 @@ static SSDataBlock* doSort(void* param, bool* newgroup) {
// start to flush data into disk and try do multiway merge sort // start to flush data into disk and try do multiway merge sort
if (pBlock == NULL) { if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); doSetOperatorCompleted(pOperator);
pOperator->status = OP_EXEC_DONE;
break; break;
} }
...@@ -5605,8 +5610,7 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) { ...@@ -5605,8 +5610,7 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) {
doAggregateImpl(pOperator, pQueryAttr->window.skey, pInfo->pCtx, pBlock); doAggregateImpl(pOperator, pQueryAttr->window.skey, pInfo->pCtx, pBlock);
} }
pOperator->status = OP_EXEC_DONE; doSetOperatorCompleted(pOperator);
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
finalizeQueryResult(pOperator, pInfo->pCtx, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset); finalizeQueryResult(pOperator, pInfo->pCtx, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset);
pInfo->pRes->info.rows = getNumOfResult(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); pInfo->pRes->info.rows = getNumOfResult(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
...@@ -5682,7 +5686,7 @@ static SSDataBlock* doSTableAggregate(void* param, bool* newgroup) { ...@@ -5682,7 +5686,7 @@ static SSDataBlock* doSTableAggregate(void* param, bool* newgroup) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes); toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes);
if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE; doSetOperatorCompleted(pOperator);
} }
return pInfo->pRes; return pInfo->pRes;
...@@ -5800,8 +5804,7 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) { ...@@ -5800,8 +5804,7 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); doSetOperatorCompleted(pOperator);
pOperator->status = OP_EXEC_DONE;
return NULL; return NULL;
} }
...@@ -5829,8 +5832,7 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) { ...@@ -5829,8 +5832,7 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) {
pBlock->info.rows = (int32_t)(pInfo->limit - pInfo->total); pBlock->info.rows = (int32_t)(pInfo->limit - pInfo->total);
pInfo->total = pInfo->limit; pInfo->total = pInfo->limit;
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); doSetOperatorCompleted(pOperator);
pOperator->status = OP_EXEC_DONE;
} else { } else {
pInfo->total += pBlock->info.rows; pInfo->total += pBlock->info.rows;
} }
...@@ -5865,8 +5867,7 @@ static SSDataBlock* doFilter(void* param, bool* newgroup) { ...@@ -5865,8 +5867,7 @@ static SSDataBlock* doFilter(void* param, bool* newgroup) {
} }
} }
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); doSetOperatorCompleted(pOperator);
pOperator->status = OP_EXEC_DONE;
return NULL; return NULL;
} }
...@@ -5881,9 +5882,8 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) { ...@@ -5881,9 +5882,8 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE; doSetOperatorCompleted(pOperator);
} }
return pIntervalInfo->pRes; return pIntervalInfo->pRes;
...@@ -5924,7 +5924,7 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) { ...@@ -5924,7 +5924,7 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE; doSetOperatorCompleted(pOperator);
} }
return pIntervalInfo->pRes->info.rows == 0? NULL:pIntervalInfo->pRes; return pIntervalInfo->pRes->info.rows == 0? NULL:pIntervalInfo->pRes;
...@@ -5943,7 +5943,7 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) { ...@@ -5943,7 +5943,7 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE; doSetOperatorCompleted(pOperator);
} }
return pIntervalInfo->pRes; return pIntervalInfo->pRes;
...@@ -6002,7 +6002,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { ...@@ -6002,7 +6002,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE; doSetOperatorCompleted(pOperator);
} }
return pIntervalInfo->pRes; return pIntervalInfo->pRes;
...@@ -7250,13 +7250,11 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { ...@@ -7250,13 +7250,11 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); doSetOperatorCompleted(pOperator);
pOperator->status = OP_EXEC_DONE;
break; break;
} }
if (!initMultiDistinctInfo(pInfo, pOperator, pBlock)) { if (!initMultiDistinctInfo(pInfo, pOperator, pBlock)) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); doSetOperatorCompleted(pOperator);
pOperator->status = OP_EXEC_DONE;
break; break;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册