提交 7152a197 编写于 作者: H Haojun Liao

[td-6563] refactor the group result merge function.

上级 aa79a68b
......@@ -1097,6 +1097,9 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->tsBuf.tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
pQueryMsg->tsBuf.tsLen = htonl(pQueryMsg->tsBuf.tsLen);
pQueryMsg->tsBuf.tsNumOfBlocks = htonl(pQueryMsg->tsBuf.tsNumOfBlocks);
} else {
pQueryMsg->tsBuf.tsLen = 0;
pQueryMsg->tsBuf.tsNumOfBlocks = 0;
}
int32_t numOfOperator = (int32_t) taosArrayGetSize(queryOperator);
......@@ -1134,6 +1137,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg += pUdfInfo->contLen;
}
} else {
pQueryMsg->udfContentOffset = 0;
}
memcpy(pMsg, pSql->sqlstr, sqlLen);
......
......@@ -86,11 +86,18 @@ typedef struct SResultRow {
char *key; // start key of current result row
} SResultRow;
typedef struct SResultRowCell {
uint64_t groupId;
SResultRow *pRow;
} SResultRowCell;
typedef struct SGroupResInfo {
int32_t totalGroup;
int32_t currentGroup;
int32_t index;
SArray* pRows; // SArray<SResultRow*>
bool ordered;
int32_t position;
} SGroupResInfo;
/**
......@@ -284,8 +291,9 @@ typedef struct SQueryRuntimeEnv {
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
SHashObj* pResultRowHashTable; // quick locate the window object for each result
SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not
SArray* pResultRowArrayList; // The array list that contains the Result rows
char* keyBuf; // window key buffer
SResultRowPool* pool; // window result object pool
SResultRowPool* pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object.
char** prevRow;
SArray* prevResult; // intermediate result, SArray<SInterResult>
......
......@@ -544,6 +544,8 @@ static SResultRow* doSetResultOutBufByKey(SQueryRuntimeEnv* pRuntimeEnv, SResult
// add a new result set for a new group
taosHashPut(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pResult, POINTER_BYTES);
SResultRowCell cell = {.groupId = tableGroupId, .pRow = pResult};
taosArrayPush(pRuntimeEnv->pResultRowArrayList, &cell);
} else {
pResult = *p1;
}
......@@ -2110,6 +2112,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
pRuntimeEnv->pResultRowListSet = taosHashInit(numOfTables * 10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
pRuntimeEnv->keyBuf = malloc(pQueryAttr->maxTableColumnWidth + sizeof(int64_t) + POINTER_BYTES);
pRuntimeEnv->pool = initResultRowPool(getResultRowSize(pRuntimeEnv));
pRuntimeEnv->pResultRowArrayList = taosArrayInit(numOfTables, sizeof(SResultRowCell));
pRuntimeEnv->prevRow = malloc(POINTER_BYTES * pQueryAttr->numOfCols + pQueryAttr->srcRowSize);
pRuntimeEnv->tagVal = malloc(pQueryAttr->tagLen);
......@@ -6379,6 +6382,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) {
if (!pRuntimeEnv->pQueryAttr->stableQuery) {
sortGroupResByOrderList(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes);
}
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes);
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
......@@ -8647,7 +8651,6 @@ int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo*
SArray* prevResult = NULL;
if (prevResultLen > 0) {
prevResult = interResFromBinary(param->prevResult, prevResultLen);
pRuntimeEnv->prevResult = prevResult;
}
......
......@@ -456,7 +456,79 @@ static int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *
}
}
static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, SArray *pTableList,
int32_t tsAscOrder(const void* p1, const void* p2) {
SResultRowCell* pc1 = (SResultRowCell*) p1;
SResultRowCell* pc2 = (SResultRowCell*) p2;
if (pc1->groupId == pc2->groupId) {
if (pc1->pRow->win.skey == pc2->pRow->win.skey) {
return 0;
} else {
return (pc1->pRow->win.skey < pc2->pRow->win.skey)? -1:1;
}
} else {
return (pc1->groupId < pc2->groupId)? -1:1;
}
}
int32_t tsDescOrder(const void* p1, const void* p2) {
SResultRowCell* pc1 = (SResultRowCell*) p1;
SResultRowCell* pc2 = (SResultRowCell*) p2;
if (pc1->groupId == pc2->groupId) {
if (pc1->pRow->win.skey == pc2->pRow->win.skey) {
return 0;
} else {
return (pc1->pRow->win.skey < pc2->pRow->win.skey)? 1:-1;
}
} else {
return (pc1->groupId < pc2->groupId)? -1:1;
}
}
void orderTheResultRows(SQueryRuntimeEnv* pRuntimeEnv) {
__compar_fn_t fn = NULL;
if (pRuntimeEnv->pQueryAttr->order.order == TSDB_ORDER_ASC) {
fn = tsAscOrder;
} else {
fn = tsDescOrder;
}
taosArraySort(pRuntimeEnv->pResultRowArrayList, fn);
}
static int32_t mergeIntoGroupResultImplRv(SQueryRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, uint64_t groupId, int32_t* rowCellInfoOffset) {
if (!pGroupResInfo->ordered) {
orderTheResultRows(pRuntimeEnv);
pGroupResInfo->ordered = true;
}
if (pGroupResInfo->pRows == NULL) {
pGroupResInfo->pRows = taosArrayInit(100, POINTER_BYTES);
}
size_t len = taosArrayGetSize(pRuntimeEnv->pResultRowArrayList);
for(; pGroupResInfo->position < len; ++pGroupResInfo->position) {
SResultRowCell* pResultRowCell = taosArrayGet(pRuntimeEnv->pResultRowArrayList, pGroupResInfo->position);
if (pResultRowCell->groupId != groupId) {
break;
}
int64_t num = getNumOfResultWindowRes(pRuntimeEnv, pResultRowCell->pRow, rowCellInfoOffset);
if (num <= 0) {
continue;
}
taosArrayPush(pGroupResInfo->pRows, &pResultRowCell->pRow);
pResultRowCell->pRow->numOfRows = (uint32_t) num;
}
return TSDB_CODE_SUCCESS;
}
static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, SArray *pTableList,
int32_t* rowCellInfoOffset) {
bool ascQuery = QUERY_IS_ASC_QUERY(pRuntimeEnv->pQueryAttr);
......@@ -562,12 +634,13 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv* pRu
int64_t st = taosGetTimestampUs();
while (pGroupResInfo->currentGroup < pGroupResInfo->totalGroup) {
SArray *group = GET_TABLEGROUP(pRuntimeEnv, pGroupResInfo->currentGroup);
// SArray *group = GET_TABLEGROUP(pRuntimeEnv, pGroupResInfo->currentGroup);
int32_t ret = mergeIntoGroupResultImpl(pRuntimeEnv, pGroupResInfo, group, offset);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
mergeIntoGroupResultImplRv(pRuntimeEnv, pGroupResInfo, pGroupResInfo->currentGroup, offset);
// int32_t ret = mergeIntoGroupResultImpl(pRuntimeEnv, pGroupResInfo, group, offset);
// if (ret != TSDB_CODE_SUCCESS) {
// return ret;
// }
// this group generates at least one result, return results
if (taosArrayGetSize(pGroupResInfo->pRows) > 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册