From 2f7032681411f33d32f8a7552a69f48673a9b566 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 13 Sep 2021 11:33:08 +0800 Subject: [PATCH] [td-6563]optimize the query performance. --- src/client/src/tscServer.c | 6 ++ src/query/inc/qExecutor.h | 8 ++ src/query/src/qExecutor.c | 6 +- src/query/src/qUtil.c | 172 +++++++++++-------------------------- src/tsdb/inc/tsdbMeta.h | 2 +- src/tsdb/src/tsdbRead.c | 6 +- src/util/src/hash.c | 8 +- src/util/src/tlosertree.c | 5 +- 8 files changed, 77 insertions(+), 136 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 747027c217..e781fc0727 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1106,6 +1106,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->tableCols[i].bytes = htons(pCol->bytes); pQueryMsg->tableCols[i].type = htons(pCol->type); pQueryMsg->tableCols[i].flist.numOfFilters = htons(pCol->flist.numOfFilters); + pQueryMsg->tableCols[i].flist.filterInfo = 0; // append the filter information after the basic column information serializeColFilterInfo(pCol->flist.filterInfo, pCol->flist.numOfFilters, &pMsg); @@ -1219,6 +1220,9 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->tsBuf.tsOrder = htonl(pQueryInfo->tsBuf->tsOrder); pQueryMsg->tsBuf.tsLen = htonl(pQueryMsg->tsBuf.tsLen); pQueryMsg->tsBuf.tsNumOfBlocks = htonl(pQueryMsg->tsBuf.tsNumOfBlocks); + } else { + pQueryMsg->tsBuf.tsLen = 0; + pQueryMsg->tsBuf.tsNumOfBlocks = 0; } int32_t numOfOperator = (int32_t) taosArrayGetSize(queryOperator); @@ -1256,6 +1260,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pMsg += pUdfInfo->contLen; } + } else { + pQueryMsg->udfContentOffset = 0; } memcpy(pMsg, pSql->sqlstr, sqlLen); diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 7aab8dbca7..17e027b6f6 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -84,11 +84,18 @@ typedef struct SResultRow { char *key; // start key of current result row } SResultRow; +typedef struct SResultRowCell { + uint64_t groupId; + SResultRow *pRow; +} SResultRowCell; + typedef struct SGroupResInfo { int32_t totalGroup; int32_t currentGroup; int32_t index; SArray* pRows; // SArray + bool ordered; + int32_t position; } SGroupResInfo; /** @@ -280,6 +287,7 @@ typedef struct SQueryRuntimeEnv { SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file SHashObj* pResultRowHashTable; // quick locate the window object for each result SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not + SArray* pResultRowArrayList; // The array list that contains the Result rows char* keyBuf; // window key buffer SResultRowPool* pool; // window result object pool char** prevRow; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index a330e30a74..b76acfd983 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -546,6 +546,8 @@ static SResultRow* doSetResultOutBufByKey(SQueryRuntimeEnv* pRuntimeEnv, SResult // add a new result set for a new group taosHashPut(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pResult, POINTER_BYTES); + SResultRowCell cell = {.groupId = tableGroupId, .pRow = pResult}; + taosArrayPush(pRuntimeEnv->pResultRowArrayList, &cell); } else { pResult = *p1; } @@ -2109,7 +2111,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf pRuntimeEnv->pQueryAttr = pQueryAttr; pRuntimeEnv->pResultRowHashTable = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); - pRuntimeEnv->pResultRowListSet = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + pRuntimeEnv->pResultRowListSet = taosHashInit(numOfTables * 10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + pRuntimeEnv->pResultRowArrayList = taosArrayInit(numOfTables, sizeof(SResultRowCell)); pRuntimeEnv->keyBuf = malloc(pQueryAttr->maxTableColumnWidth + sizeof(int64_t) + POINTER_BYTES); pRuntimeEnv->pool = initResultRowPool(getResultRowSize(pRuntimeEnv)); @@ -2385,6 +2388,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { destroyOperatorInfo(pRuntimeEnv->proot); pRuntimeEnv->pool = destroyResultRowPool(pRuntimeEnv->pool); + taosArrayDestroy(pRuntimeEnv->pResultRowArrayList); taosArrayDestroyEx(pRuntimeEnv->prevResult, freeInterResult); pRuntimeEnv->prevResult = NULL; } diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index 4caf351799..a26697fb31 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -416,158 +416,83 @@ static int64_t getNumOfResultWindowRes(SQueryRuntimeEnv* pRuntimeEnv, SResultRow return 0; } -static int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param) { - int32_t left = *(int32_t *)pLeft; - int32_t right = *(int32_t *)pRight; +int32_t tsAscOrder(const void* p1, const void* p2) { + SResultRowCell* pc1 = (SResultRowCell*) p1; + SResultRowCell* pc2 = (SResultRowCell*) p2; - SCompSupporter * supporter = (SCompSupporter *)param; - - int32_t leftPos = supporter->rowIndex[left]; - int32_t rightPos = supporter->rowIndex[right]; - - /* left source is exhausted */ - if (leftPos == -1) { - return 1; - } - - /* right source is exhausted*/ - if (rightPos == -1) { - return -1; + if (pc1->groupId == pc2->groupId) { + if (pc1->pRow->win.skey == pc2->pRow->win.skey) { + return 0; + } else { + return (pc1->pRow->win.skey < pc2->pRow->win.skey)? -1:1; + } + } else { + return (pc1->groupId < pc2->groupId)? -1:1; } +} - STableQueryInfo** pList = supporter->pTableQueryInfo; - - SResultRowInfo *pWindowResInfo1 = &(pList[left]->resInfo); - SResultRow * pWindowRes1 = getResultRow(pWindowResInfo1, leftPos); - TSKEY leftTimestamp = pWindowRes1->win.skey; - - SResultRowInfo *pWindowResInfo2 = &(pList[right]->resInfo); - SResultRow * pWindowRes2 = getResultRow(pWindowResInfo2, rightPos); - TSKEY rightTimestamp = pWindowRes2->win.skey; - - if (leftTimestamp == rightTimestamp) { - return 0; - } +int32_t tsDescOrder(const void* p1, const void* p2) { + SResultRowCell* pc1 = (SResultRowCell*) p1; + SResultRowCell* pc2 = (SResultRowCell*) p2; - if (supporter->order == TSDB_ORDER_ASC) { - return (leftTimestamp > rightTimestamp)? 1:-1; + if (pc1->groupId == pc2->groupId) { + if (pc1->pRow->win.skey == pc2->pRow->win.skey) { + return 0; + } else { + return (pc1->pRow->win.skey < pc2->pRow->win.skey)? 1:-1; + } } else { - return (leftTimestamp < rightTimestamp)? 1:-1; + return (pc1->groupId < pc2->groupId)? -1:1; } } -static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, SArray *pTableList, - int32_t* rowCellInfoOffset) { - bool ascQuery = QUERY_IS_ASC_QUERY(pRuntimeEnv->pQueryAttr); +void orderTheResultRows(SQueryRuntimeEnv* pRuntimeEnv) { + __compar_fn_t fn = NULL; + if (pRuntimeEnv->pQueryAttr->order.order == TSDB_ORDER_ASC) { + fn = tsAscOrder; + } else { + fn = tsDescOrder; + } - int32_t code = TSDB_CODE_SUCCESS; + taosArraySort(pRuntimeEnv->pResultRowArrayList, fn); +} - int32_t *posList = NULL; - SLoserTreeInfo *pTree = NULL; - STableQueryInfo **pTableQueryInfoList = NULL; +static int32_t mergeIntoGroupResultImplRv(SQueryRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, uint64_t groupId, int32_t* rowCellInfoOffset) { + if (!pGroupResInfo->ordered) { + orderTheResultRows(pRuntimeEnv); + pGroupResInfo->ordered = true; + } - size_t size = taosArrayGetSize(pTableList); if (pGroupResInfo->pRows == NULL) { pGroupResInfo->pRows = taosArrayInit(100, POINTER_BYTES); } - posList = calloc(size, sizeof(int32_t)); - pTableQueryInfoList = malloc(POINTER_BYTES * size); - - if (pTableQueryInfoList == NULL || posList == NULL || pGroupResInfo->pRows == NULL || pGroupResInfo->pRows == NULL) { - qError("QInfo:%"PRIu64" failed alloc memory", GET_QID(pRuntimeEnv)); - code = TSDB_CODE_QRY_OUT_OF_MEMORY; - goto _end; - } + size_t len = taosArrayGetSize(pRuntimeEnv->pResultRowArrayList); + for(; pGroupResInfo->position < len; ++pGroupResInfo->position) { - int32_t numOfTables = 0; - for (int32_t i = 0; i < size; ++i) { - STableQueryInfo *item = taosArrayGetP(pTableList, i); - if (item->resInfo.size > 0) { - pTableQueryInfoList[numOfTables++] = item; + SResultRowCell* pResultRowCell = taosArrayGet(pRuntimeEnv->pResultRowArrayList, pGroupResInfo->position); + if (pResultRowCell->groupId != groupId) { + break; } - } - - // there is no data in current group - // no need to merge results since only one table in each group - if (numOfTables == 0) { - goto _end; - } - - SCompSupporter cs = {pTableQueryInfoList, posList, pRuntimeEnv->pQueryAttr->order.order}; - - int32_t ret = tLoserTreeCreate(&pTree, numOfTables, &cs, tableResultComparFn); - if (ret != TSDB_CODE_SUCCESS) { - code = TSDB_CODE_QRY_OUT_OF_MEMORY; - goto _end; - } - - int64_t lastTimestamp = ascQuery? INT64_MIN:INT64_MAX; - int64_t startt = taosGetTimestampMs(); - - while (1) { - int32_t tableIndex = pTree->pNode[0].index; - SResultRowInfo *pWindowResInfo = &pTableQueryInfoList[tableIndex]->resInfo; - SResultRow *pWindowRes = getResultRow(pWindowResInfo, cs.rowIndex[tableIndex]); - - int64_t num = getNumOfResultWindowRes(pRuntimeEnv, pWindowRes, rowCellInfoOffset); + int64_t num = getNumOfResultWindowRes(pRuntimeEnv, pResultRowCell->pRow, rowCellInfoOffset); if (num <= 0) { - cs.rowIndex[tableIndex] += 1; - - if (cs.rowIndex[tableIndex] >= pWindowResInfo->size) { - cs.rowIndex[tableIndex] = -1; - if (--numOfTables == 0) { // all input sources are exhausted - break; - } - } - } else { - assert((pWindowRes->win.skey >= lastTimestamp && ascQuery) || (pWindowRes->win.skey <= lastTimestamp && !ascQuery)); - - if (pWindowRes->win.skey != lastTimestamp) { - taosArrayPush(pGroupResInfo->pRows, &pWindowRes); - pWindowRes->numOfRows = (uint32_t) num; - } - - lastTimestamp = pWindowRes->win.skey; - - // move to the next row of current entry - if ((++cs.rowIndex[tableIndex]) >= pWindowResInfo->size) { - cs.rowIndex[tableIndex] = -1; - - // all input sources are exhausted - if ((--numOfTables) == 0) { - break; - } - } + continue; } - tLoserTreeAdjust(pTree, tableIndex + pTree->numOfEntries); - } - - int64_t endt = taosGetTimestampMs(); - - qDebug("QInfo:%"PRIx64" result merge completed for group:%d, elapsed time:%" PRId64 " ms", GET_QID(pRuntimeEnv), - pGroupResInfo->currentGroup, endt - startt); + taosArrayPush(pGroupResInfo->pRows, &pResultRowCell->pRow); + pResultRowCell->pRow->numOfRows = (uint32_t) num; - _end: - tfree(pTableQueryInfoList); - tfree(posList); - tfree(pTree); + } - return code; + return TSDB_CODE_SUCCESS; } int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv* pRuntimeEnv, int32_t* offset) { int64_t st = taosGetTimestampUs(); while (pGroupResInfo->currentGroup < pGroupResInfo->totalGroup) { - SArray *group = GET_TABLEGROUP(pRuntimeEnv, pGroupResInfo->currentGroup); - - int32_t ret = mergeIntoGroupResultImpl(pRuntimeEnv, pGroupResInfo, group, offset); - if (ret != TSDB_CODE_SUCCESS) { - return ret; - } + mergeIntoGroupResultImplRv(pRuntimeEnv, pGroupResInfo, pGroupResInfo->currentGroup, offset); // this group generates at least one result, return results if (taosArrayGetSize(pGroupResInfo->pRows) > 0) { @@ -583,7 +508,6 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv* pRu qDebug("QInfo:%"PRIu64" merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "us", GET_QID(pRuntimeEnv), pGroupResInfo->currentGroup, pGroupResInfo->totalGroup, elapsedTime); -// pQInfo->summary.firstStageMergeTime += elapsedTime; return TSDB_CODE_SUCCESS; } diff --git a/src/tsdb/inc/tsdbMeta.h b/src/tsdb/inc/tsdbMeta.h index 51801c843c..8ce5e7ade8 100644 --- a/src/tsdb/inc/tsdbMeta.h +++ b/src/tsdb/inc/tsdbMeta.h @@ -100,7 +100,7 @@ static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *k } static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock, bool copy, int16_t _version) { - STable* pDTable = (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable; + STable* pDTable = (pTable->pSuper != NULL) ? pTable->pSuper : pTable; // for performance purpose STSchema* pSchema = NULL; STSchema* pTSchema = NULL; diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 71a9009c45..45140192b4 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -288,8 +288,6 @@ static SArray* createCheckInfoFromTableGroup(STsdbQueryHandle* pQueryHandle, STa STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(group, j); STableCheckInfo info = { .lastKey = pKeyInfo->lastKey, .pTableObj = pKeyInfo->pTable }; - info.tableId = ((STable*)(pKeyInfo->pTable))->tableId; - assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE || info.pTableObj->type == TSDB_CHILD_TABLE || info.pTableObj->type == TSDB_STREAM_TABLE)); @@ -2218,7 +2216,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO SBlock* pBlock = pTableCheck->pCompInfo->blocks; sup.numOfBlocksPerTable[numOfQualTables] = pTableCheck->numOfBlocks; - char* buf = calloc(1, sizeof(STableBlockInfo) * pTableCheck->numOfBlocks); + char* buf = malloc(sizeof(STableBlockInfo) * pTableCheck->numOfBlocks); if (buf == NULL) { cleanBlockOrderSupporter(&sup, numOfQualTables); return TSDB_CODE_TDB_OUT_OF_MEMORY; @@ -3618,8 +3616,6 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC for(int32_t i = 0; i < size; ++i) { STableKeyInfo *pKeyInfo = taosArrayGet(pTableList, i); - assert(((STable*)pKeyInfo->pTable)->type == TSDB_CHILD_TABLE); - tsdbRefTable(pKeyInfo->pTable); STableKeyInfo info = {.pTable = pKeyInfo->pTable, .lastKey = skey}; diff --git a/src/util/src/hash.c b/src/util/src/hash.c index 6118aa7bef..28ba791c9c 100644 --- a/src/util/src/hash.c +++ b/src/util/src/hash.c @@ -741,17 +741,19 @@ void taosHashTableResize(SHashObj *pHashObj) { } SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, size_t dsize, uint32_t hashVal) { - SHashNode *pNewNode = calloc(1, sizeof(SHashNode) + keyLen + dsize); + SHashNode *pNewNode = malloc(sizeof(SHashNode) + keyLen + dsize); if (pNewNode == NULL) { uError("failed to allocate memory, reason:%s", strerror(errno)); return NULL; } - pNewNode->keyLen = (uint32_t)keyLen; + pNewNode->keyLen = (uint32_t)keyLen; pNewNode->hashVal = hashVal; pNewNode->dataLen = (uint32_t) dsize; - pNewNode->count = 1; + pNewNode->count = 1; + pNewNode->removed = 0; + pNewNode->next = NULL; memcpy(GET_HASH_NODE_DATA(pNewNode), pData, dsize); memcpy(GET_HASH_NODE_KEY(pNewNode), key, keyLen); diff --git a/src/util/src/tlosertree.c b/src/util/src/tlosertree.c index e793548407..0f104c4b63 100644 --- a/src/util/src/tlosertree.c +++ b/src/util/src/tlosertree.c @@ -90,12 +90,13 @@ void tLoserTreeAdjust(SLoserTreeInfo* pTree, int32_t idx) { SLoserTreeNode kLeaf = pTree->pNode[idx]; while (parentId > 0) { - if (pTree->pNode[parentId].index == -1) { + SLoserTreeNode* pCur = &pTree->pNode[parentId]; + if (pCur->index == -1) { pTree->pNode[parentId] = kLeaf; return; } - int32_t ret = pTree->comparFn(&pTree->pNode[parentId], &kLeaf, pTree->param); + int32_t ret = pTree->comparFn(pCur, &kLeaf, pTree->param); if (ret < 0) { SLoserTreeNode t = pTree->pNode[parentId]; pTree->pNode[parentId] = kLeaf; -- GitLab