提交 2f703268 编写于 作者: H Haojun Liao

[td-6563]optimize the query performance.

上级 8d6228db
......@@ -1106,6 +1106,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->tableCols[i].bytes = htons(pCol->bytes);
pQueryMsg->tableCols[i].type = htons(pCol->type);
pQueryMsg->tableCols[i].flist.numOfFilters = htons(pCol->flist.numOfFilters);
pQueryMsg->tableCols[i].flist.filterInfo = 0;
// append the filter information after the basic column information
serializeColFilterInfo(pCol->flist.filterInfo, pCol->flist.numOfFilters, &pMsg);
......@@ -1219,6 +1220,9 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->tsBuf.tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
pQueryMsg->tsBuf.tsLen = htonl(pQueryMsg->tsBuf.tsLen);
pQueryMsg->tsBuf.tsNumOfBlocks = htonl(pQueryMsg->tsBuf.tsNumOfBlocks);
} else {
pQueryMsg->tsBuf.tsLen = 0;
pQueryMsg->tsBuf.tsNumOfBlocks = 0;
}
int32_t numOfOperator = (int32_t) taosArrayGetSize(queryOperator);
......@@ -1256,6 +1260,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg += pUdfInfo->contLen;
}
} else {
pQueryMsg->udfContentOffset = 0;
}
memcpy(pMsg, pSql->sqlstr, sqlLen);
......
......@@ -84,11 +84,18 @@ typedef struct SResultRow {
char *key; // start key of current result row
} SResultRow;
/*
 * Pairs one result row with the id of the table group it was created for.
 * Cells are stored by value in SQueryRuntimeEnv.pResultRowArrayList; the
 * comparators used to sort that list order cells by groupId first and by
 * pRow->win.skey second.
 */
typedef struct SResultRowCell {
uint64_t groupId;  // table group id recorded when the result row was created
SResultRow *pRow;  // pointer to the result row; the cell holds only the pointer
} SResultRowCell;
/*
 * Progress state kept while merged results are produced group by group.
 */
typedef struct SGroupResInfo {
int32_t totalGroup;    // merging loops while currentGroup < totalGroup
int32_t currentGroup;  // group currently being merged; matched against SResultRowCell.groupId
int32_t index;         // NOTE(review): presumably a read cursor into pRows; usage not visible here, confirm
SArray* pRows; // SArray<SResultRow*>; allocated on first merge if still NULL
bool ordered;          // set to true once the runtime env's result-row array list has been sorted
int32_t position;      // index of the next cell to examine in the runtime env's result-row array list
} SGroupResInfo;
/**
......@@ -280,6 +287,7 @@ typedef struct SQueryRuntimeEnv {
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
SHashObj* pResultRowHashTable; // quick locate the window object for each result
SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not
SArray* pResultRowArrayList; // The array list that contains the Result rows
char* keyBuf; // window key buffer
SResultRowPool* pool; // window result object pool
char** prevRow;
......
......@@ -546,6 +546,8 @@ static SResultRow* doSetResultOutBufByKey(SQueryRuntimeEnv* pRuntimeEnv, SResult
// add a new result set for a new group
taosHashPut(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pResult, POINTER_BYTES);
SResultRowCell cell = {.groupId = tableGroupId, .pRow = pResult};
taosArrayPush(pRuntimeEnv->pResultRowArrayList, &cell);
} else {
pResult = *p1;
}
......@@ -2109,7 +2111,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
pRuntimeEnv->pQueryAttr = pQueryAttr;
pRuntimeEnv->pResultRowHashTable = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pRuntimeEnv->pResultRowListSet = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
pRuntimeEnv->pResultRowListSet = taosHashInit(numOfTables * 10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
pRuntimeEnv->pResultRowArrayList = taosArrayInit(numOfTables, sizeof(SResultRowCell));
pRuntimeEnv->keyBuf = malloc(pQueryAttr->maxTableColumnWidth + sizeof(int64_t) + POINTER_BYTES);
pRuntimeEnv->pool = initResultRowPool(getResultRowSize(pRuntimeEnv));
......@@ -2385,6 +2388,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
destroyOperatorInfo(pRuntimeEnv->proot);
pRuntimeEnv->pool = destroyResultRowPool(pRuntimeEnv->pool);
taosArrayDestroy(pRuntimeEnv->pResultRowArrayList);
taosArrayDestroyEx(pRuntimeEnv->prevResult, freeInterResult);
pRuntimeEnv->prevResult = NULL;
}
......
......@@ -416,158 +416,83 @@ static int64_t getNumOfResultWindowRes(SQueryRuntimeEnv* pRuntimeEnv, SResultRow
return 0;
}
static int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param) {
int32_t left = *(int32_t *)pLeft;
int32_t right = *(int32_t *)pRight;
int32_t tsAscOrder(const void* p1, const void* p2) {
SResultRowCell* pc1 = (SResultRowCell*) p1;
SResultRowCell* pc2 = (SResultRowCell*) p2;
SCompSupporter * supporter = (SCompSupporter *)param;
int32_t leftPos = supporter->rowIndex[left];
int32_t rightPos = supporter->rowIndex[right];
/* left source is exhausted */
if (leftPos == -1) {
return 1;
}
/* right source is exhausted*/
if (rightPos == -1) {
return -1;
if (pc1->groupId == pc2->groupId) {
if (pc1->pRow->win.skey == pc2->pRow->win.skey) {
return 0;
} else {
return (pc1->pRow->win.skey < pc2->pRow->win.skey)? -1:1;
}
} else {
return (pc1->groupId < pc2->groupId)? -1:1;
}
}
STableQueryInfo** pList = supporter->pTableQueryInfo;
SResultRowInfo *pWindowResInfo1 = &(pList[left]->resInfo);
SResultRow * pWindowRes1 = getResultRow(pWindowResInfo1, leftPos);
TSKEY leftTimestamp = pWindowRes1->win.skey;
SResultRowInfo *pWindowResInfo2 = &(pList[right]->resInfo);
SResultRow * pWindowRes2 = getResultRow(pWindowResInfo2, rightPos);
TSKEY rightTimestamp = pWindowRes2->win.skey;
if (leftTimestamp == rightTimestamp) {
return 0;
}
int32_t tsDescOrder(const void* p1, const void* p2) {
SResultRowCell* pc1 = (SResultRowCell*) p1;
SResultRowCell* pc2 = (SResultRowCell*) p2;
if (supporter->order == TSDB_ORDER_ASC) {
return (leftTimestamp > rightTimestamp)? 1:-1;
if (pc1->groupId == pc2->groupId) {
if (pc1->pRow->win.skey == pc2->pRow->win.skey) {
return 0;
} else {
return (pc1->pRow->win.skey < pc2->pRow->win.skey)? 1:-1;
}
} else {
return (leftTimestamp < rightTimestamp)? 1:-1;
return (pc1->groupId < pc2->groupId)? -1:1;
}
}
static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, SArray *pTableList,
int32_t* rowCellInfoOffset) {
bool ascQuery = QUERY_IS_ASC_QUERY(pRuntimeEnv->pQueryAttr);
/*
 * Sorts pRuntimeEnv->pResultRowArrayList in place.
 *
 * The comparator is chosen from the query's order attribute: tsAscOrder when
 * order.order is TSDB_ORDER_ASC, tsDescOrder otherwise. Both comparators place
 * cells with a smaller groupId first; they differ only in how cells of the
 * same group are ordered by their window start key.
 */
void orderTheResultRows(SQueryRuntimeEnv* pRuntimeEnv) {
__compar_fn_t fn = NULL;
// select the comparator that matches the query's timestamp order
if (pRuntimeEnv->pQueryAttr->order.order == TSDB_ORDER_ASC) {
fn = tsAscOrder;
} else {
fn = tsDescOrder;
}
// NOTE(review): 'code' is never read in this function; it looks like a leftover
// line from the replaced merge routine in this diff view. Confirm before keeping.
int32_t code = TSDB_CODE_SUCCESS;
taosArraySort(pRuntimeEnv->pResultRowArrayList, fn);
}
int32_t *posList = NULL;
SLoserTreeInfo *pTree = NULL;
STableQueryInfo **pTableQueryInfoList = NULL;
static int32_t mergeIntoGroupResultImplRv(SQueryRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, uint64_t groupId, int32_t* rowCellInfoOffset) {
if (!pGroupResInfo->ordered) {
orderTheResultRows(pRuntimeEnv);
pGroupResInfo->ordered = true;
}
size_t size = taosArrayGetSize(pTableList);
if (pGroupResInfo->pRows == NULL) {
pGroupResInfo->pRows = taosArrayInit(100, POINTER_BYTES);
}
posList = calloc(size, sizeof(int32_t));
pTableQueryInfoList = malloc(POINTER_BYTES * size);
if (pTableQueryInfoList == NULL || posList == NULL || pGroupResInfo->pRows == NULL || pGroupResInfo->pRows == NULL) {
qError("QInfo:%"PRIu64" failed alloc memory", GET_QID(pRuntimeEnv));
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _end;
}
size_t len = taosArrayGetSize(pRuntimeEnv->pResultRowArrayList);
for(; pGroupResInfo->position < len; ++pGroupResInfo->position) {
int32_t numOfTables = 0;
for (int32_t i = 0; i < size; ++i) {
STableQueryInfo *item = taosArrayGetP(pTableList, i);
if (item->resInfo.size > 0) {
pTableQueryInfoList[numOfTables++] = item;
SResultRowCell* pResultRowCell = taosArrayGet(pRuntimeEnv->pResultRowArrayList, pGroupResInfo->position);
if (pResultRowCell->groupId != groupId) {
break;
}
}
// there is no data in current group
// no need to merge results since only one table in each group
if (numOfTables == 0) {
goto _end;
}
SCompSupporter cs = {pTableQueryInfoList, posList, pRuntimeEnv->pQueryAttr->order.order};
int32_t ret = tLoserTreeCreate(&pTree, numOfTables, &cs, tableResultComparFn);
if (ret != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _end;
}
int64_t lastTimestamp = ascQuery? INT64_MIN:INT64_MAX;
int64_t startt = taosGetTimestampMs();
while (1) {
int32_t tableIndex = pTree->pNode[0].index;
SResultRowInfo *pWindowResInfo = &pTableQueryInfoList[tableIndex]->resInfo;
SResultRow *pWindowRes = getResultRow(pWindowResInfo, cs.rowIndex[tableIndex]);
int64_t num = getNumOfResultWindowRes(pRuntimeEnv, pWindowRes, rowCellInfoOffset);
int64_t num = getNumOfResultWindowRes(pRuntimeEnv, pResultRowCell->pRow, rowCellInfoOffset);
if (num <= 0) {
cs.rowIndex[tableIndex] += 1;
if (cs.rowIndex[tableIndex] >= pWindowResInfo->size) {
cs.rowIndex[tableIndex] = -1;
if (--numOfTables == 0) { // all input sources are exhausted
break;
}
}
} else {
assert((pWindowRes->win.skey >= lastTimestamp && ascQuery) || (pWindowRes->win.skey <= lastTimestamp && !ascQuery));
if (pWindowRes->win.skey != lastTimestamp) {
taosArrayPush(pGroupResInfo->pRows, &pWindowRes);
pWindowRes->numOfRows = (uint32_t) num;
}
lastTimestamp = pWindowRes->win.skey;
// move to the next row of current entry
if ((++cs.rowIndex[tableIndex]) >= pWindowResInfo->size) {
cs.rowIndex[tableIndex] = -1;
// all input sources are exhausted
if ((--numOfTables) == 0) {
break;
}
}
continue;
}
tLoserTreeAdjust(pTree, tableIndex + pTree->numOfEntries);
}
int64_t endt = taosGetTimestampMs();
qDebug("QInfo:%"PRIx64" result merge completed for group:%d, elapsed time:%" PRId64 " ms", GET_QID(pRuntimeEnv),
pGroupResInfo->currentGroup, endt - startt);
taosArrayPush(pGroupResInfo->pRows, &pResultRowCell->pRow);
pResultRowCell->pRow->numOfRows = (uint32_t) num;
_end:
tfree(pTableQueryInfoList);
tfree(posList);
tfree(pTree);
}
return code;
return TSDB_CODE_SUCCESS;
}
int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv* pRuntimeEnv, int32_t* offset) {
int64_t st = taosGetTimestampUs();
while (pGroupResInfo->currentGroup < pGroupResInfo->totalGroup) {
SArray *group = GET_TABLEGROUP(pRuntimeEnv, pGroupResInfo->currentGroup);
int32_t ret = mergeIntoGroupResultImpl(pRuntimeEnv, pGroupResInfo, group, offset);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
mergeIntoGroupResultImplRv(pRuntimeEnv, pGroupResInfo, pGroupResInfo->currentGroup, offset);
// this group generates at least one result, return results
if (taosArrayGetSize(pGroupResInfo->pRows) > 0) {
......@@ -583,7 +508,6 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv* pRu
qDebug("QInfo:%"PRIu64" merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "us", GET_QID(pRuntimeEnv),
pGroupResInfo->currentGroup, pGroupResInfo->totalGroup, elapsedTime);
// pQInfo->summary.firstStageMergeTime += elapsedTime;
return TSDB_CODE_SUCCESS;
}
......
......@@ -100,7 +100,7 @@ static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *k
}
static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock, bool copy, int16_t _version) {
STable* pDTable = (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable;
STable* pDTable = (pTable->pSuper != NULL) ? pTable->pSuper : pTable; // for performance purpose
STSchema* pSchema = NULL;
STSchema* pTSchema = NULL;
......
......@@ -288,8 +288,6 @@ static SArray* createCheckInfoFromTableGroup(STsdbQueryHandle* pQueryHandle, STa
STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(group, j);
STableCheckInfo info = { .lastKey = pKeyInfo->lastKey, .pTableObj = pKeyInfo->pTable };
info.tableId = ((STable*)(pKeyInfo->pTable))->tableId;
assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE ||
info.pTableObj->type == TSDB_CHILD_TABLE || info.pTableObj->type == TSDB_STREAM_TABLE));
......@@ -2218,7 +2216,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
SBlock* pBlock = pTableCheck->pCompInfo->blocks;
sup.numOfBlocksPerTable[numOfQualTables] = pTableCheck->numOfBlocks;
char* buf = calloc(1, sizeof(STableBlockInfo) * pTableCheck->numOfBlocks);
char* buf = malloc(sizeof(STableBlockInfo) * pTableCheck->numOfBlocks);
if (buf == NULL) {
cleanBlockOrderSupporter(&sup, numOfQualTables);
return TSDB_CODE_TDB_OUT_OF_MEMORY;
......@@ -3618,8 +3616,6 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
for(int32_t i = 0; i < size; ++i) {
STableKeyInfo *pKeyInfo = taosArrayGet(pTableList, i);
assert(((STable*)pKeyInfo->pTable)->type == TSDB_CHILD_TABLE);
tsdbRefTable(pKeyInfo->pTable);
STableKeyInfo info = {.pTable = pKeyInfo->pTable, .lastKey = skey};
......
......@@ -741,17 +741,19 @@ void taosHashTableResize(SHashObj *pHashObj) {
}
SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, size_t dsize, uint32_t hashVal) {
SHashNode *pNewNode = calloc(1, sizeof(SHashNode) + keyLen + dsize);
SHashNode *pNewNode = malloc(sizeof(SHashNode) + keyLen + dsize);
if (pNewNode == NULL) {
uError("failed to allocate memory, reason:%s", strerror(errno));
return NULL;
}
pNewNode->keyLen = (uint32_t)keyLen;
pNewNode->keyLen = (uint32_t)keyLen;
pNewNode->hashVal = hashVal;
pNewNode->dataLen = (uint32_t) dsize;
pNewNode->count = 1;
pNewNode->count = 1;
pNewNode->removed = 0;
pNewNode->next = NULL;
memcpy(GET_HASH_NODE_DATA(pNewNode), pData, dsize);
memcpy(GET_HASH_NODE_KEY(pNewNode), key, keyLen);
......
......@@ -90,12 +90,13 @@ void tLoserTreeAdjust(SLoserTreeInfo* pTree, int32_t idx) {
SLoserTreeNode kLeaf = pTree->pNode[idx];
while (parentId > 0) {
if (pTree->pNode[parentId].index == -1) {
SLoserTreeNode* pCur = &pTree->pNode[parentId];
if (pCur->index == -1) {
pTree->pNode[parentId] = kLeaf;
return;
}
int32_t ret = pTree->comparFn(&pTree->pNode[parentId], &kLeaf, pTree->param);
int32_t ret = pTree->comparFn(pCur, &kLeaf, pTree->param);
if (ret < 0) {
SLoserTreeNode t = pTree->pNode[parentId];
pTree->pNode[parentId] = kLeaf;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册