diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index de920068778610199e52b2dddac119e103c33376..04ff27fa333ab4a892f8aeebf3bb64f570ff99a6 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -260,6 +260,7 @@ typedef struct SQueryRuntimeEnv { int64_t currentOffset; // dynamic offset value SRspResultInfo resultInfo; + SHashObj *pTableRetrieveTsMap; } SQueryRuntimeEnv; enum { @@ -307,17 +308,11 @@ enum { typedef struct SQInfo { void* signature; - int32_t code; // error code to returned to client - int64_t owner; // if it is in execution + int32_t code; // error code to returned to client + int64_t owner; // if it is in execution SQueryRuntimeEnv runtimeEnv; SQuery query; - SHashObj* arrTableIdInfo; - - /* - * the query is executed position on which meter of the whole list. - * when the index reaches the last one of the list, it means the query is completed. - */ void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables; pthread_mutex_t lock; // used to synchronize the rsp/query threads diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index d2298fa6dc8caeb3f68d7fe96c9e145b34123e1c..eef1515fcd982a7aacfedc2bac7fdb20413c26d1 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -157,7 +157,7 @@ static bool hasMainOutput(SQuery *pQuery); static void releaseQueryBuf(size_t numOfTables); static int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order); static STsdbQueryCond createTsdbQueryCond(SQuery* pQuery, STimeWindow* win); -static STableIdInfo createTableIdInfo(SQuery* pQuery); +static STableIdInfo createTableIdInfo(STableQueryInfo* pTableQueryInfo); static void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream); @@ -1619,6 +1619,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf pRuntimeEnv->tagVal = malloc(pQuery->tagLen); pRuntimeEnv->currentOffset = pQuery->limit.offset; + // NOTE: pTableCheckInfo need to update the query time range and the lastKey info + pRuntimeEnv->pTableRetrieveTsMap = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + pRuntimeEnv->sasArray = calloc(pQuery->numOfOutput, sizeof(SArithmeticSupport)); if (pRuntimeEnv->sasArray == NULL || pRuntimeEnv->pResultRowHashTable == NULL || pRuntimeEnv->keyBuf == NULL || @@ -1755,6 +1758,9 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { taosArrayDestroyEx(pRuntimeEnv->prevResult, freeInterResult); pRuntimeEnv->prevResult = NULL; + taosHashCleanup(pRuntimeEnv->pTableRetrieveTsMap); + pRuntimeEnv->pTableRetrieveTsMap = NULL; + destroyOperatorInfo(pRuntimeEnv->proot); } @@ -3476,12 +3482,12 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data } } - int32_t numOfTables = (int32_t) taosHashGetSize(pQInfo->arrTableIdInfo); + int32_t numOfTables = (int32_t) taosHashGetSize(pRuntimeEnv->pTableRetrieveTsMap); *(int32_t*)data = htonl(numOfTables); data += sizeof(int32_t); int32_t total = 0; - STableIdInfo* item = taosHashIterate(pQInfo->arrTableIdInfo, NULL); + STableIdInfo* item = taosHashIterate(pRuntimeEnv->pTableRetrieveTsMap, NULL); while(item) { STableIdInfo* pDst = (STableIdInfo*)data; @@ -3493,7 +3499,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data total++; qDebug("QInfo:%p set subscribe info, tid:%d, uid:%"PRIu64", skey:%"PRId64, pQInfo, item->tid, item->uid, item->key); - item = taosHashIterate(pQInfo->arrTableIdInfo, item); + item = taosHashIterate(pRuntimeEnv->pTableRetrieveTsMap, item); } qDebug("QInfo:%p set %d subscribe info", pQInfo, total); @@ -4006,22 +4012,20 @@ STsdbQueryCond createTsdbQueryCond(SQuery* pQuery, STimeWindow* win) { return cond; } -static STableIdInfo createTableIdInfo(SQuery* pQuery) { - assert(pQuery != NULL && pQuery->current != NULL); - +static STableIdInfo createTableIdInfo(STableQueryInfo* pTableQueryInfo) { STableIdInfo tidInfo; - STableId* id = TSDB_TABLEID(pQuery->current->pTable); + STableId* id = TSDB_TABLEID(pTableQueryInfo->pTable); tidInfo.uid = id->uid; tidInfo.tid = id->tid; - tidInfo.key = pQuery->current->lastKey; + tidInfo.key = pTableQueryInfo->lastKey; return tidInfo; } -static UNUSED_FUNC void updateTableIdInfo(SQuery* pQuery, SHashObj* pTableIdInfo) { - STableIdInfo tidInfo = createTableIdInfo(pQuery); - STableIdInfo* idinfo = taosHashGet(pTableIdInfo, &tidInfo.tid, sizeof(tidInfo.tid)); +static void updateTableIdInfo(STableQueryInfo* pTableQueryInfo, SHashObj* pTableIdInfo) { + STableIdInfo tidInfo = createTableIdInfo(pTableQueryInfo); + STableIdInfo *idinfo = taosHashGet(pTableIdInfo, &tidInfo.tid, sizeof(tidInfo.tid)); if (idinfo != NULL) { assert(idinfo->tid == tidInfo.tid && idinfo->uid == tidInfo.uid); idinfo->key = tidInfo.key; @@ -4465,6 +4469,8 @@ static SSDataBlock* doArithmeticOperation(void* param) { arithmeticApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); pInfo->pRes->info.rows = getNumOfResult(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); + updateTableIdInfo(pRuntimeEnv->pQuery->current, pRuntimeEnv->pTableRetrieveTsMap); + if (pInfo->pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) { break; } @@ -6038,8 +6044,6 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr goto _cleanup; } - // NOTE: pTableCheckInfo need to update the query time range and the lastKey info - pQInfo->arrTableIdInfo = taosHashInit(pTableGroupInfo->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); pQInfo->dataReady = QUERY_RESULT_NOT_READY; pQInfo->rspContext = NULL; pQInfo->sql = sql; @@ -6293,10 +6297,8 @@ void freeQInfo(SQInfo *pQInfo) { tfree(pQInfo->sql); tsdbDestroyTableGroup(&pQuery->tableGroupInfo); - taosHashCleanup(pQInfo->arrTableIdInfo); taosArrayDestroy(pRuntimeEnv->groupResInfo.pRows); - pQInfo->signature = 0; qDebug("QInfo:%p QInfo is freed", pQInfo); diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index 248f33f0d110196b922158eb66ef9d2fceb87457..67fe344284020fdd743ad01b34ffb9628b9962f6 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -308,7 +308,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co int32_t s = GET_NUM_OF_RESULTS(pRuntimeEnv); size_t size = pQuery->resultRowSize * s; size += sizeof(int32_t); - size += sizeof(STableIdInfo) * taosHashGetSize(pQInfo->arrTableIdInfo); + size += sizeof(STableIdInfo) * taosHashGetSize(pRuntimeEnv->pTableRetrieveTsMap); *contLen = (int32_t)(size + sizeof(SRetrieveTableRsp));