提交 40cafb49 编写于 作者: H Haojun Liao

[td-2895] refactor.

上级 d57b3f38
...@@ -260,6 +260,7 @@ typedef struct SQueryRuntimeEnv { ...@@ -260,6 +260,7 @@ typedef struct SQueryRuntimeEnv {
int64_t currentOffset; // dynamic offset value int64_t currentOffset; // dynamic offset value
SRspResultInfo resultInfo; SRspResultInfo resultInfo;
SHashObj *pTableRetrieveTsMap;
} SQueryRuntimeEnv; } SQueryRuntimeEnv;
enum { enum {
...@@ -307,17 +308,11 @@ enum { ...@@ -307,17 +308,11 @@ enum {
typedef struct SQInfo { typedef struct SQInfo {
void* signature; void* signature;
int32_t code; // error code to returned to client int32_t code; // error code to returned to client
int64_t owner; // if it is in execution int64_t owner; // if it is in execution
SQueryRuntimeEnv runtimeEnv; SQueryRuntimeEnv runtimeEnv;
SQuery query; SQuery query;
SHashObj* arrTableIdInfo;
/*
* the query is executed position on which meter of the whole list.
* when the index reaches the last one of the list, it means the query is completed.
*/
void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables; void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables;
pthread_mutex_t lock; // used to synchronize the rsp/query threads pthread_mutex_t lock; // used to synchronize the rsp/query threads
......
...@@ -157,7 +157,7 @@ static bool hasMainOutput(SQuery *pQuery); ...@@ -157,7 +157,7 @@ static bool hasMainOutput(SQuery *pQuery);
static void releaseQueryBuf(size_t numOfTables); static void releaseQueryBuf(size_t numOfTables);
static int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order); static int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order);
static STsdbQueryCond createTsdbQueryCond(SQuery* pQuery, STimeWindow* win); static STsdbQueryCond createTsdbQueryCond(SQuery* pQuery, STimeWindow* win);
static STableIdInfo createTableIdInfo(SQuery* pQuery); static STableIdInfo createTableIdInfo(STableQueryInfo* pTableQueryInfo);
static void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream); static void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream);
...@@ -1619,6 +1619,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -1619,6 +1619,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
pRuntimeEnv->tagVal = malloc(pQuery->tagLen); pRuntimeEnv->tagVal = malloc(pQuery->tagLen);
pRuntimeEnv->currentOffset = pQuery->limit.offset; pRuntimeEnv->currentOffset = pQuery->limit.offset;
// NOTE: pTableCheckInfo need to update the query time range and the lastKey info
pRuntimeEnv->pTableRetrieveTsMap = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
pRuntimeEnv->sasArray = calloc(pQuery->numOfOutput, sizeof(SArithmeticSupport)); pRuntimeEnv->sasArray = calloc(pQuery->numOfOutput, sizeof(SArithmeticSupport));
if (pRuntimeEnv->sasArray == NULL || pRuntimeEnv->pResultRowHashTable == NULL || pRuntimeEnv->keyBuf == NULL || if (pRuntimeEnv->sasArray == NULL || pRuntimeEnv->pResultRowHashTable == NULL || pRuntimeEnv->keyBuf == NULL ||
...@@ -1755,6 +1758,9 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -1755,6 +1758,9 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
taosArrayDestroyEx(pRuntimeEnv->prevResult, freeInterResult); taosArrayDestroyEx(pRuntimeEnv->prevResult, freeInterResult);
pRuntimeEnv->prevResult = NULL; pRuntimeEnv->prevResult = NULL;
taosHashCleanup(pRuntimeEnv->pTableRetrieveTsMap);
pRuntimeEnv->pTableRetrieveTsMap = NULL;
destroyOperatorInfo(pRuntimeEnv->proot); destroyOperatorInfo(pRuntimeEnv->proot);
} }
...@@ -3476,12 +3482,12 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data ...@@ -3476,12 +3482,12 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
} }
} }
int32_t numOfTables = (int32_t) taosHashGetSize(pQInfo->arrTableIdInfo); int32_t numOfTables = (int32_t) taosHashGetSize(pRuntimeEnv->pTableRetrieveTsMap);
*(int32_t*)data = htonl(numOfTables); *(int32_t*)data = htonl(numOfTables);
data += sizeof(int32_t); data += sizeof(int32_t);
int32_t total = 0; int32_t total = 0;
STableIdInfo* item = taosHashIterate(pQInfo->arrTableIdInfo, NULL); STableIdInfo* item = taosHashIterate(pRuntimeEnv->pTableRetrieveTsMap, NULL);
while(item) { while(item) {
STableIdInfo* pDst = (STableIdInfo*)data; STableIdInfo* pDst = (STableIdInfo*)data;
...@@ -3493,7 +3499,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data ...@@ -3493,7 +3499,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
total++; total++;
qDebug("QInfo:%p set subscribe info, tid:%d, uid:%"PRIu64", skey:%"PRId64, pQInfo, item->tid, item->uid, item->key); qDebug("QInfo:%p set subscribe info, tid:%d, uid:%"PRIu64", skey:%"PRId64, pQInfo, item->tid, item->uid, item->key);
item = taosHashIterate(pQInfo->arrTableIdInfo, item); item = taosHashIterate(pRuntimeEnv->pTableRetrieveTsMap, item);
} }
qDebug("QInfo:%p set %d subscribe info", pQInfo, total); qDebug("QInfo:%p set %d subscribe info", pQInfo, total);
...@@ -4006,22 +4012,20 @@ STsdbQueryCond createTsdbQueryCond(SQuery* pQuery, STimeWindow* win) { ...@@ -4006,22 +4012,20 @@ STsdbQueryCond createTsdbQueryCond(SQuery* pQuery, STimeWindow* win) {
return cond; return cond;
} }
static STableIdInfo createTableIdInfo(SQuery* pQuery) { static STableIdInfo createTableIdInfo(STableQueryInfo* pTableQueryInfo) {
assert(pQuery != NULL && pQuery->current != NULL);
STableIdInfo tidInfo; STableIdInfo tidInfo;
STableId* id = TSDB_TABLEID(pQuery->current->pTable); STableId* id = TSDB_TABLEID(pTableQueryInfo->pTable);
tidInfo.uid = id->uid; tidInfo.uid = id->uid;
tidInfo.tid = id->tid; tidInfo.tid = id->tid;
tidInfo.key = pQuery->current->lastKey; tidInfo.key = pTableQueryInfo->lastKey;
return tidInfo; return tidInfo;
} }
static UNUSED_FUNC void updateTableIdInfo(SQuery* pQuery, SHashObj* pTableIdInfo) { static void updateTableIdInfo(STableQueryInfo* pTableQueryInfo, SHashObj* pTableIdInfo) {
STableIdInfo tidInfo = createTableIdInfo(pQuery); STableIdInfo tidInfo = createTableIdInfo(pTableQueryInfo);
STableIdInfo* idinfo = taosHashGet(pTableIdInfo, &tidInfo.tid, sizeof(tidInfo.tid)); STableIdInfo *idinfo = taosHashGet(pTableIdInfo, &tidInfo.tid, sizeof(tidInfo.tid));
if (idinfo != NULL) { if (idinfo != NULL) {
assert(idinfo->tid == tidInfo.tid && idinfo->uid == tidInfo.uid); assert(idinfo->tid == tidInfo.tid && idinfo->uid == tidInfo.uid);
idinfo->key = tidInfo.key; idinfo->key = tidInfo.key;
...@@ -4465,6 +4469,8 @@ static SSDataBlock* doArithmeticOperation(void* param) { ...@@ -4465,6 +4469,8 @@ static SSDataBlock* doArithmeticOperation(void* param) {
arithmeticApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); arithmeticApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
pInfo->pRes->info.rows = getNumOfResult(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); pInfo->pRes->info.rows = getNumOfResult(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
updateTableIdInfo(pRuntimeEnv->pQuery->current, pRuntimeEnv->pTableRetrieveTsMap);
if (pInfo->pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) { if (pInfo->pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) {
break; break;
} }
...@@ -6038,8 +6044,6 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr ...@@ -6038,8 +6044,6 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
goto _cleanup; goto _cleanup;
} }
// NOTE: pTableCheckInfo need to update the query time range and the lastKey info
pQInfo->arrTableIdInfo = taosHashInit(pTableGroupInfo->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
pQInfo->dataReady = QUERY_RESULT_NOT_READY; pQInfo->dataReady = QUERY_RESULT_NOT_READY;
pQInfo->rspContext = NULL; pQInfo->rspContext = NULL;
pQInfo->sql = sql; pQInfo->sql = sql;
...@@ -6293,10 +6297,8 @@ void freeQInfo(SQInfo *pQInfo) { ...@@ -6293,10 +6297,8 @@ void freeQInfo(SQInfo *pQInfo) {
tfree(pQInfo->sql); tfree(pQInfo->sql);
tsdbDestroyTableGroup(&pQuery->tableGroupInfo); tsdbDestroyTableGroup(&pQuery->tableGroupInfo);
taosHashCleanup(pQInfo->arrTableIdInfo);
taosArrayDestroy(pRuntimeEnv->groupResInfo.pRows); taosArrayDestroy(pRuntimeEnv->groupResInfo.pRows);
pQInfo->signature = 0; pQInfo->signature = 0;
qDebug("QInfo:%p QInfo is freed", pQInfo); qDebug("QInfo:%p QInfo is freed", pQInfo);
......
...@@ -308,7 +308,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co ...@@ -308,7 +308,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
int32_t s = GET_NUM_OF_RESULTS(pRuntimeEnv); int32_t s = GET_NUM_OF_RESULTS(pRuntimeEnv);
size_t size = pQuery->resultRowSize * s; size_t size = pQuery->resultRowSize * s;
size += sizeof(int32_t); size += sizeof(int32_t);
size += sizeof(STableIdInfo) * taosHashGetSize(pQInfo->arrTableIdInfo); size += sizeof(STableIdInfo) * taosHashGetSize(pRuntimeEnv->pTableRetrieveTsMap);
*contLen = (int32_t)(size + sizeof(SRetrieveTableRsp)); *contLen = (int32_t)(size + sizeof(SRetrieveTableRsp));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册