提交 b5554878 编写于 作者: H Haojun Liao

[TD-2293]<enhance>: improve projection query performance for super table.

上级 43a3f513
...@@ -5311,9 +5311,10 @@ int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIn ...@@ -5311,9 +5311,10 @@ int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIn
if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
/* /*
* the limitation/offset value should be removed during retrieve data from virtual node, * the offset value should be removed during retrieve data from virtual node, since the
* since the global order are done in client side, so the limitation should also * global order are done in client side, so the offset is applied at the client side
* be done at the client side. * However, note that the maximum allowed number of result for each table should be less
* than or equal to the value of limit.
*/ */
if (pQueryInfo->limit.limit > 0) { if (pQueryInfo->limit.limit > 0) {
pQueryInfo->limit.limit = -1; pQueryInfo->limit.limit = -1;
......
...@@ -190,6 +190,7 @@ typedef struct SQueryRuntimeEnv { ...@@ -190,6 +190,7 @@ typedef struct SQueryRuntimeEnv {
bool groupbyNormalCol; // denote if this is a groupby normal column query bool groupbyNormalCol; // denote if this is a groupby normal column query
bool hasTagResults; // if there are tag values in final result or not bool hasTagResults; // if there are tag values in final result or not
bool timeWindowInterpo;// if the time window start/end required interpolation bool timeWindowInterpo;// if the time window start/end required interpolation
bool queryWindowIdentical; // all query time windows are identical for all tables in one group
int32_t interBufSize; // intermediate buffer sizse int32_t interBufSize; // intermediate buffer sizse
int32_t prevGroupId; // previous executed group id int32_t prevGroupId; // previous executed group id
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
...@@ -217,7 +218,8 @@ typedef struct SQInfo { ...@@ -217,7 +218,8 @@ typedef struct SQInfo {
STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo> STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo>
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
SQueryRuntimeEnv runtimeEnv; SQueryRuntimeEnv runtimeEnv;
SArray* arrTableIdInfo; // SArray* arrTableIdInfo;
SHashObj* arrTableIdInfo;
int32_t groupIndex; int32_t groupIndex;
/* /*
......
...@@ -197,6 +197,8 @@ static int32_t checkForQueryBuf(size_t numOfTables); ...@@ -197,6 +197,8 @@ static int32_t checkForQueryBuf(size_t numOfTables);
static void releaseQueryBuf(size_t numOfTables); static void releaseQueryBuf(size_t numOfTables);
static int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order); static int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order);
static void doRowwiseTimeWindowInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs, int32_t curRowIndex, TSKEY windowKey, int32_t type); static void doRowwiseTimeWindowInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs, int32_t curRowIndex, TSKEY windowKey, int32_t type);
static STsdbQueryCond createTsdbQueryCond(SQuery* pQuery);
static STableIdInfo createTableIdInfo(SQuery* pQuery);
bool doFilterData(SQuery *pQuery, int32_t elemPos) { bool doFilterData(SQuery *pQuery, int32_t elemPos) {
for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) { for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) {
...@@ -2781,6 +2783,7 @@ static void ensureOutputBufferSimple(SQueryRuntimeEnv* pRuntimeEnv, int32_t capa ...@@ -2781,6 +2783,7 @@ static void ensureOutputBufferSimple(SQueryRuntimeEnv* pRuntimeEnv, int32_t capa
pQuery->rec.capacity = capacity; pQuery->rec.capacity = capacity;
} }
// TODO merge with enuserOutputBufferSimple
static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pBlockInfo) { static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pBlockInfo) {
// in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block // in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block
SQuery* pQuery = pRuntimeEnv->pQuery; SQuery* pQuery = pRuntimeEnv->pQuery;
...@@ -3916,13 +3919,7 @@ static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusI ...@@ -3916,13 +3919,7 @@ static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusI
SET_REVERSE_SCAN_FLAG(pRuntimeEnv); SET_REVERSE_SCAN_FLAG(pRuntimeEnv);
STsdbQueryCond cond = { STsdbQueryCond cond = createTsdbQueryCond(pQuery);
.order = pQuery->order.order,
.colList = pQuery->colList,
.numOfCols = pQuery->numOfCols,
};
TIME_WINDOW_COPY(cond.twindow, pQuery->window);
setQueryStatus(pQuery, QUERY_NOT_COMPLETED); setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
switchCtxOrder(pRuntimeEnv); switchCtxOrder(pRuntimeEnv);
...@@ -4005,18 +4002,11 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) { ...@@ -4005,18 +4002,11 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
break; break;
} }
STsdbQueryCond cond = {
.order = pQuery->order.order,
.colList = pQuery->colList,
.numOfCols = pQuery->numOfCols,
};
TIME_WINDOW_COPY(cond.twindow, qstatus.curWindow);
if (pRuntimeEnv->pSecQueryHandle != NULL) { if (pRuntimeEnv->pSecQueryHandle != NULL) {
tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle); tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle);
} }
STsdbQueryCond cond = createTsdbQueryCond(pQuery);
restoreTimeWindow(&pQInfo->tableGroupInfo, &cond); restoreTimeWindow(&pQInfo->tableGroupInfo, &cond);
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQInfo->memRef); pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQInfo->memRef);
if (pRuntimeEnv->pSecQueryHandle == NULL) { if (pRuntimeEnv->pSecQueryHandle == NULL) {
...@@ -4541,16 +4531,19 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data ...@@ -4541,16 +4531,19 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
} }
} }
int32_t numOfTables = (int32_t)taosArrayGetSize(pQInfo->arrTableIdInfo); int32_t numOfTables = taosHashGetSize(pQInfo->arrTableIdInfo);
*(int32_t*)data = htonl(numOfTables); *(int32_t*)data = htonl(numOfTables);
data += sizeof(int32_t); data += sizeof(int32_t);
for(int32_t i = 0; i < numOfTables; i++) {
STableIdInfo* pSrc = taosArrayGet(pQInfo->arrTableIdInfo, i); STableIdInfo* item = taosHashIterate(pQInfo->arrTableIdInfo, NULL);
while(item) {
STableIdInfo* pDst = (STableIdInfo*)data; STableIdInfo* pDst = (STableIdInfo*)data;
pDst->uid = htobe64(pSrc->uid); pDst->uid = htobe64(item->uid);
pDst->tid = htonl(pSrc->tid); pDst->tid = htonl(item->tid);
pDst->key = htobe64(pSrc->key); pDst->key = htobe64(item->key);
data += sizeof(STableIdInfo); data += sizeof(STableIdInfo);
item = taosHashIterate(pQInfo->arrTableIdInfo, item);
} }
// Check if query is completed or not for stable query or normal table query respectively. // Check if query is completed or not for stable query or normal table query respectively.
...@@ -4877,13 +4870,7 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) ...@@ -4877,13 +4870,7 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery)
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
STsdbQueryCond cond = { STsdbQueryCond cond = createTsdbQueryCond(pQuery);
.order = pQuery->order.order,
.colList = pQuery->colList,
.numOfCols = pQuery->numOfCols,
};
TIME_WINDOW_COPY(cond.twindow, pQuery->window);
if (!isSTableQuery if (!isSTableQuery
&& (pQInfo->tableqinfoGroupInfo.numOfTables == 1) && (pQInfo->tableqinfoGroupInfo.numOfTables == 1)
...@@ -5276,6 +5263,41 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) { ...@@ -5276,6 +5263,41 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
return true; return true;
} }
STsdbQueryCond createTsdbQueryCond(SQuery* pQuery) {
STsdbQueryCond cond = {
.colList = pQuery->colList,
.order = pQuery->order.order,
.numOfCols = pQuery->numOfCols,
};
TIME_WINDOW_COPY(cond.twindow, pQuery->window);
return cond;
}
static STableIdInfo createTableIdInfo(SQuery* pQuery) {
assert(pQuery != NULL && pQuery->current != NULL);
STableIdInfo tidInfo;
STableId* id = TSDB_TABLEID(pQuery->current->pTable);
tidInfo.uid = id->uid;
tidInfo.tid = id->tid;
tidInfo.key = pQuery->current->lastKey;
return tidInfo;
}
static void updateTableIdInfo(SQuery* pQuery, SHashObj* pTableIdInfo) {
STableIdInfo tidInfo = createTableIdInfo(pQuery);
STableIdInfo* idinfo = taosHashGet(pTableIdInfo, &tidInfo.tid, sizeof(tidInfo.tid));
if (idinfo != NULL) {
assert(idinfo->tid == tidInfo.tid && idinfo->uid == tidInfo.uid);
idinfo->key = tidInfo.key;
} else {
taosHashPut(pTableIdInfo, &tidInfo.tid, sizeof(tidInfo.tid), &tidInfo, sizeof(STableIdInfo));
}
}
/** /**
* super table query handler * super table query handler
* 1. super table projection query, group-by on normal columns query, ts-comp query * 1. super table projection query, group-by on normal columns query, ts-comp query
...@@ -5295,18 +5317,11 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -5295,18 +5317,11 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
assert(pQuery->limit.offset == 0 && pQuery->limit.limit != 0); assert(pQuery->limit.offset == 0 && pQuery->limit.limit != 0);
while (pQInfo->groupIndex < numOfGroups) { while (pQInfo->groupIndex < numOfGroups) {
SArray* group = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, pQInfo->groupIndex); SArray *group = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, pQInfo->groupIndex);
qDebug("QInfo:%p last_row query on group:%d, total group:%" PRIzu ", current group:%p", pQInfo, pQInfo->groupIndex,
numOfGroups, group);
STsdbQueryCond cond = {
.colList = pQuery->colList,
.order = pQuery->order.order,
.numOfCols = pQuery->numOfCols,
};
TIME_WINDOW_COPY(cond.twindow, pQuery->window); qDebug("QInfo:%p point interpolation query on group:%d, total group:%" PRIzu ", current group:%p", pQInfo,
pQInfo->groupIndex, numOfGroups, group);
STsdbQueryCond cond = createTsdbQueryCond(pQuery);
SArray *g1 = taosArrayInit(1, POINTER_BYTES); SArray *g1 = taosArrayInit(1, POINTER_BYTES);
SArray *tx = taosArrayClone(group); SArray *tx = taosArrayClone(group);
...@@ -5330,14 +5345,14 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -5330,14 +5345,14 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
initCtxOutputBuf(pRuntimeEnv); initCtxOutputBuf(pRuntimeEnv);
SArray* s = tsdbGetQueriedTableList(pRuntimeEnv->pQueryHandle); SArray *s = tsdbGetQueriedTableList(pRuntimeEnv->pQueryHandle);
assert(taosArrayGetSize(s) >= 1); assert(taosArrayGetSize(s) >= 1);
setTagVal(pRuntimeEnv, taosArrayGetP(s, 0), pQInfo->tsdb); setTagVal(pRuntimeEnv, taosArrayGetP(s, 0), pQInfo->tsdb);
taosArrayDestroy(s); taosArrayDestroy(s);
// here we simply set the first table as current table // here we simply set the first table as current table
SArray* first = GET_TABLEGROUP(pQInfo, pQInfo->groupIndex); SArray *first = GET_TABLEGROUP(pQInfo, pQInfo->groupIndex);
pQuery->current = taosArrayGetP(first, 0); pQuery->current = taosArrayGetP(first, 0);
scanOneTableDataBlocks(pRuntimeEnv, pQuery->current->lastKey); scanOneTableDataBlocks(pRuntimeEnv, pQuery->current->lastKey);
...@@ -5361,17 +5376,12 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -5361,17 +5376,12 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
} }
} else if (pRuntimeEnv->groupbyNormalCol) { // group-by on normal columns query } else if (pRuntimeEnv->groupbyNormalCol) { // group-by on normal columns query
while (pQInfo->groupIndex < numOfGroups) { while (pQInfo->groupIndex < numOfGroups) {
SArray* group = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, pQInfo->groupIndex); SArray *group = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, pQInfo->groupIndex);
qDebug("QInfo:%p group by normal columns group:%d, total group:%" PRIzu "", pQInfo, pQInfo->groupIndex, numOfGroups); qDebug("QInfo:%p group by normal columns group:%d, total group:%" PRIzu "", pQInfo, pQInfo->groupIndex,
numOfGroups);
STsdbQueryCond cond = { STsdbQueryCond cond = createTsdbQueryCond(pQuery);
.colList = pQuery->colList,
.order = pQuery->order.order,
.numOfCols = pQuery->numOfCols,
};
TIME_WINDOW_COPY(cond.twindow, pQuery->window);
SArray *g1 = taosArrayInit(1, POINTER_BYTES); SArray *g1 = taosArrayInit(1, POINTER_BYTES);
SArray *tx = taosArrayClone(group); SArray *tx = taosArrayClone(group);
...@@ -5394,7 +5404,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -5394,7 +5404,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
longjmp(pRuntimeEnv->env, terrno); longjmp(pRuntimeEnv->env, terrno);
} }
SArray* s = tsdbGetQueriedTableList(pRuntimeEnv->pQueryHandle); SArray *s = tsdbGetQueriedTableList(pRuntimeEnv->pQueryHandle);
assert(taosArrayGetSize(s) >= 1); assert(taosArrayGetSize(s) >= 1);
setTagVal(pRuntimeEnv, taosArrayGetP(s, 0), pQInfo->tsdb); setTagVal(pRuntimeEnv, taosArrayGetP(s, 0), pQInfo->tsdb);
...@@ -5416,7 +5426,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -5416,7 +5426,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
SResultRow *pResult = pWindowResInfo->pResult[i]; SResultRow *pResult = pWindowResInfo->pResult[i];
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
SResultRowCellInfo* pCell = getResultCell(pRuntimeEnv, pResult, j); SResultRowCellInfo *pCell = getResultCell(pRuntimeEnv, pResult, j);
pResult->numOfRows = (uint16_t)(MAX(pResult->numOfRows, pCell->numOfRes)); pResult->numOfRows = (uint16_t)(MAX(pResult->numOfRows, pCell->numOfRes));
} }
} }
...@@ -5431,16 +5441,119 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -5431,16 +5441,119 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
ensureOutputBufferSimple(pRuntimeEnv, pWindowResInfo->size); ensureOutputBufferSimple(pRuntimeEnv, pWindowResInfo->size);
copyFromWindowResToSData(pQInfo, pWindowResInfo); copyFromWindowResToSData(pQInfo, pWindowResInfo);
pQInfo->groupIndex = currentGroupIndex; //restore the group index pQInfo->groupIndex = currentGroupIndex; // restore the group index
assert(pQuery->rec.rows == pWindowResInfo->size); assert(pQuery->rec.rows == pWindowResInfo->size);
clearClosedTimeWindow(pRuntimeEnv); clearClosedTimeWindow(pRuntimeEnv);
break; break;
} }
} else if (pRuntimeEnv->queryWindowIdentical && pRuntimeEnv->pTSBuf == NULL) {
//super table projection query with identical query time range for all tables.
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
resetDefaultResInfoOutputBuf(pRuntimeEnv);
SArray *group = GET_TABLEGROUP(pQInfo, 0);
assert(taosArrayGetSize(group) == pQInfo->tableqinfoGroupInfo.numOfTables &&
1 == taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList));
void *pQueryHandle = pRuntimeEnv->pQueryHandle;
if (pQueryHandle == NULL) {
STsdbQueryCond con = createTsdbQueryCond(pQuery);
pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &con, &pQInfo->tableGroupInfo, pQInfo, &pQInfo->memRef);
pQueryHandle = pRuntimeEnv->pQueryHandle;
}
// skip blocks without load the actual data block from file if no filter condition present
// skipBlocks(&pQInfo->runtimeEnv);
// if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols == 0) {
// setQueryStatus(pQuery, QUERY_COMPLETED);
// return;
// }
bool hasMoreBlock = true;
SQueryCostInfo *summary = &pRuntimeEnv->summary;
while ((hasMoreBlock = tsdbNextDataBlock(pQueryHandle)) == true) {
summary->totalBlocks += 1;
if (IS_QUERY_KILLED(pQInfo)) {
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo);
STableQueryInfo **pTableQueryInfo =
(STableQueryInfo **)taosHashGet(pQInfo->tableqinfoGroupInfo.map, &blockInfo.tid, sizeof(blockInfo.tid));
if (pTableQueryInfo == NULL) {
break;
}
pQuery->current = *pTableQueryInfo;
if (QUERY_IS_ASC_QUERY(pQuery)) {
assert(((*pTableQueryInfo)->win.skey <= (*pTableQueryInfo)->win.ekey) &&
((*pTableQueryInfo)->lastKey >= (*pTableQueryInfo)->win.skey) &&
((*pTableQueryInfo)->win.skey >= pQuery->window.skey &&
(*pTableQueryInfo)->win.ekey <= pQuery->window.ekey));
} else {
assert(((*pTableQueryInfo)->win.skey >= (*pTableQueryInfo)->win.ekey) &&
((*pTableQueryInfo)->lastKey <= (*pTableQueryInfo)->win.skey) &&
((*pTableQueryInfo)->win.skey <= pQuery->window.skey &&
(*pTableQueryInfo)->win.ekey >= pQuery->window.ekey));
}
if (pRuntimeEnv->hasTagResults) {
setTagVal(pRuntimeEnv, pQuery->current->pTable, pQInfo->tsdb);
}
uint32_t status = 0;
SDataStatis *pStatis = NULL;
SArray * pDataBlock = NULL;
int32_t ret = loadDataBlockOnDemand(pRuntimeEnv, &pQuery->current->windowResInfo, pQueryHandle, &blockInfo,
&pStatis, &pDataBlock, &status);
if (ret != TSDB_CODE_SUCCESS) {
break;
}
assert(status != BLK_DATA_DISCARD);
ensureOutputBuffer(pRuntimeEnv, &blockInfo);
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : blockInfo.rows - 1;
int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock);
summary->totalRows += blockInfo.rows;
qDebug("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%" PRId64,
GET_QINFO_ADDR(pRuntimeEnv), blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes,
pQuery->current->lastKey);
pQuery->rec.rows = getNumOfResult(pRuntimeEnv);
// the flag may be set by tableApplyFunctionsOnBlock, clear it here
CLEAR_QUERY_STATUS(pQuery, QUERY_COMPLETED);
updateTableIdInfo(pQuery, pQInfo->arrTableIdInfo);
skipResults(pRuntimeEnv);
// the limitation of output result is reached, set the query completed
if (limitResults(pRuntimeEnv)) {
SET_STABLE_QUERY_OVER(pQInfo);
break;
}
// while the output buffer is full or limit/offset is applied, query may be paused here
if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL|QUERY_COMPLETED)) {
break;
}
}
if (!hasMoreBlock) {
setQueryStatus(pQuery, QUERY_COMPLETED);
SET_STABLE_QUERY_OVER(pQInfo);
}
} else { } else {
/* /*
* 1. super table projection query, 2. ts-comp query * the following two cases handled here.
* if the subgroup index is larger than 0, results generated by group by tbname,k is existed. * 1. ts-comp query, and 2. the super table projection query with different query time range for each table.
* If the subgroup index is larger than 0, results generated by group by tbname,k is existed.
* we need to return it to client in the first place. * we need to return it to client in the first place.
*/ */
if (pQInfo->groupIndex > 0) { if (pQInfo->groupIndex > 0) {
...@@ -5503,14 +5616,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -5503,14 +5616,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
* to ensure that, we can reset the query range once query on a meter is completed. * to ensure that, we can reset the query range once query on a meter is completed.
*/ */
pQInfo->tableIndex++; pQInfo->tableIndex++;
updateTableIdInfo(pQuery, pQInfo->arrTableIdInfo);
STableIdInfo tidInfo = {0};
STableId* id = TSDB_TABLEID(pQuery->current->pTable);
tidInfo.uid = id->uid;
tidInfo.tid = id->tid;
tidInfo.key = pQuery->current->lastKey;
taosArrayPush(pQInfo->arrTableIdInfo, &tidInfo);
// if the buffer is full or group by each table, we need to jump out of the loop // if the buffer is full or group by each table, we need to jump out of the loop
if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
...@@ -5537,7 +5643,6 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -5537,7 +5643,6 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
if (pQInfo->tableIndex >= pQInfo->tableqinfoGroupInfo.numOfTables) { if (pQInfo->tableIndex >= pQInfo->tableqinfoGroupInfo.numOfTables) {
setQueryStatus(pQuery, QUERY_COMPLETED); setQueryStatus(pQuery, QUERY_COMPLETED);
} }
}
/* /*
* 1. super table projection query, group-by on normal columns query, ts-comp query * 1. super table projection query, group-by on normal columns query, ts-comp query
...@@ -5558,10 +5663,11 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -5558,10 +5663,11 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
pRuntimeEnv->cur = pRuntimeEnv->pTSBuf->cur; pRuntimeEnv->cur = pRuntimeEnv->pTSBuf->cur;
} }
qDebug( qDebug("QInfo %p numOfTables:%" PRIu64 ", index:%d, numOfGroups:%" PRIzu ", %" PRId64
"QInfo %p numOfTables:%" PRIu64 ", index:%d, numOfGroups:%" PRIzu ", %" PRId64 " points returned, total:%" PRId64 ", offset:%" PRId64, " points returned, total:%" PRId64 ", offset:%" PRId64,
pQInfo, (uint64_t)pQInfo->tableqinfoGroupInfo.numOfTables, pQInfo->tableIndex, numOfGroups, pQuery->rec.rows, pQuery->rec.total, pQInfo, (uint64_t)pQInfo->tableqinfoGroupInfo.numOfTables, pQInfo->tableIndex, numOfGroups, pQuery->rec.rows,
pQuery->limit.offset); pQuery->rec.total, pQuery->limit.offset);
}
} }
static void doSaveContext(SQInfo *pQInfo) { static void doSaveContext(SQInfo *pQInfo) {
...@@ -5576,13 +5682,7 @@ static void doSaveContext(SQInfo *pQInfo) { ...@@ -5576,13 +5682,7 @@ static void doSaveContext(SQInfo *pQInfo) {
SWITCH_ORDER(pRuntimeEnv->pTSBuf->cur.order); SWITCH_ORDER(pRuntimeEnv->pTSBuf->cur.order);
} }
STsdbQueryCond cond = { STsdbQueryCond cond = createTsdbQueryCond(pQuery);
.order = pQuery->order.order,
.colList = pQuery->colList,
.numOfCols = pQuery->numOfCols,
};
TIME_WINDOW_COPY(cond.twindow, pQuery->window);
// clean unused handle // clean unused handle
if (pRuntimeEnv->pSecQueryHandle != NULL) { if (pRuntimeEnv->pSecQueryHandle != NULL) {
...@@ -5855,13 +5955,8 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) ...@@ -5855,13 +5955,8 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
qDebug("QInfo:%p query paused due to output limitation, next qrange:%" PRId64 "-%" PRId64, pQInfo, qDebug("QInfo:%p query paused due to output limitation, next qrange:%" PRId64 "-%" PRId64, pQInfo,
pQuery->current->lastKey, pQuery->window.ekey); pQuery->current->lastKey, pQuery->window.ekey);
} else if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { } else if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
STableIdInfo tidInfo; STableIdInfo tidInfo = createTableIdInfo(pQuery);
STableId* id = TSDB_TABLEID(pQuery->current->pTable); taosHashPut(pQInfo->arrTableIdInfo, &tidInfo.tid, sizeof(tidInfo.tid), &tidInfo, sizeof(STableIdInfo));
tidInfo.uid = id->uid;
tidInfo.tid = id->tid;
tidInfo.key = pQuery->current->lastKey;
taosArrayPush(pQInfo->arrTableIdInfo, &tidInfo);
} }
if (!isTSCompQuery(pQuery)) { if (!isTSCompQuery(pQuery)) {
...@@ -6859,8 +6954,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou ...@@ -6859,8 +6954,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
} }
int tableIndex = 0;
pQInfo->runtimeEnv.interBufSize = getOutputInterResultBufSize(pQuery); pQInfo->runtimeEnv.interBufSize = getOutputInterResultBufSize(pQuery);
pQInfo->runtimeEnv.summary.tableInfoSize += (pTableGroupInfo->numOfTables * sizeof(STableQueryInfo)); pQInfo->runtimeEnv.summary.tableInfoSize += (pTableGroupInfo->numOfTables * sizeof(STableQueryInfo));
...@@ -6882,7 +6975,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou ...@@ -6882,7 +6975,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
} }
// NOTE: pTableCheckInfo need to update the query time range and the lastKey info // NOTE: pTableCheckInfo need to update the query time range and the lastKey info
pQInfo->arrTableIdInfo = taosArrayInit(tableIndex, sizeof(STableIdInfo)); pQInfo->arrTableIdInfo = taosHashInit(pTableGroupInfo->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
pQInfo->dataReady = QUERY_RESULT_NOT_READY; pQInfo->dataReady = QUERY_RESULT_NOT_READY;
pQInfo->rspContext = NULL; pQInfo->rspContext = NULL;
pthread_mutex_init(&pQInfo->lock, NULL); pthread_mutex_init(&pQInfo->lock, NULL);
...@@ -6892,10 +6985,10 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou ...@@ -6892,10 +6985,10 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
pQuery->window = pQueryMsg->window; pQuery->window = pQueryMsg->window;
changeExecuteScanOrder(pQInfo, pQueryMsg, stableQuery); changeExecuteScanOrder(pQInfo, pQueryMsg, stableQuery);
pQInfo->runtimeEnv.queryWindowIdentical = true;
STimeWindow window = pQuery->window; STimeWindow window = pQuery->window;
int32_t index = 0; int32_t index = 0;
for(int32_t i = 0; i < numOfGroups; ++i) { for(int32_t i = 0; i < numOfGroups; ++i) {
SArray* pa = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, i); SArray* pa = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, i);
...@@ -6910,9 +7003,12 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou ...@@ -6910,9 +7003,12 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
for(int32_t j = 0; j < s; ++j) { for(int32_t j = 0; j < s; ++j) {
STableKeyInfo* info = taosArrayGet(pa, j); STableKeyInfo* info = taosArrayGet(pa, j);
void* buf = (char*) pQInfo->pBuf + index * sizeof(STableQueryInfo);
window.skey = info->lastKey; window.skey = info->lastKey;
if (info->lastKey != pQuery->window.skey) {
pQInfo->runtimeEnv.queryWindowIdentical = false;
}
void* buf = (char*) pQInfo->pBuf + index * sizeof(STableQueryInfo);
STableQueryInfo* item = createTableQueryInfo(&pQInfo->runtimeEnv, info->pTable, window, buf); STableQueryInfo* item = createTableQueryInfo(&pQInfo->runtimeEnv, info->pTable, window, buf);
if (item == NULL) { if (item == NULL) {
goto _cleanup; goto _cleanup;
...@@ -7126,7 +7222,7 @@ static void freeQInfo(SQInfo *pQInfo) { ...@@ -7126,7 +7222,7 @@ static void freeQInfo(SQInfo *pQInfo) {
tfree(pQInfo->pBuf); tfree(pQInfo->pBuf);
tsdbDestroyTableGroup(&pQInfo->tableGroupInfo); tsdbDestroyTableGroup(&pQInfo->tableGroupInfo);
taosArrayDestroy(pQInfo->arrTableIdInfo); taosHashCleanup(pQInfo->arrTableIdInfo);
pQInfo->signature = 0; pQInfo->signature = 0;
...@@ -7506,7 +7602,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co ...@@ -7506,7 +7602,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
size_t size = getResultSize(pQInfo, &pQuery->rec.rows); size_t size = getResultSize(pQInfo, &pQuery->rec.rows);
size += sizeof(int32_t); size += sizeof(int32_t);
size += sizeof(STableIdInfo) * taosArrayGetSize(pQInfo->arrTableIdInfo); size += sizeof(STableIdInfo) * taosHashGetSize(pQInfo->arrTableIdInfo);
*contLen = (int32_t)(size + sizeof(SRetrieveTableRsp)); *contLen = (int32_t)(size + sizeof(SRetrieveTableRsp));
......
...@@ -83,8 +83,6 @@ void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowR ...@@ -83,8 +83,6 @@ void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowR
return; return;
} }
// assert(pWindowResInfo->size == 1);
for (int32_t i = 0; i < pWindowResInfo->size; ++i) { for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
SResultRow *pWindowRes = pWindowResInfo->pResult[i]; SResultRow *pWindowRes = pWindowResInfo->pResult[i];
clearResultRow(pRuntimeEnv, pWindowRes, pWindowResInfo->type); clearResultRow(pRuntimeEnv, pWindowRes, pWindowResInfo->type);
......
...@@ -151,8 +151,9 @@ static void pushfrontNodeInEntryList(SHashEntry *pEntry, SHashNode *pNode); ...@@ -151,8 +151,9 @@ static void pushfrontNodeInEntryList(SHashEntry *pEntry, SHashNode *pNode);
*/ */
SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTypeE type) { SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTypeE type) {
if (capacity == 0 || fn == NULL) { assert(fn != NULL);
return NULL; if (capacity == 0) {
capacity = 4;
} }
SHashObj *pHashObj = (SHashObj *)calloc(1, sizeof(SHashObj)); SHashObj *pHashObj = (SHashObj *)calloc(1, sizeof(SHashObj));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册