提交 407a3453 编写于 作者: H Haojun Liao

fix(query): set fetch block info.

上级 2c896012
......@@ -95,17 +95,20 @@ typedef struct SColMatchInfo {
int32_t matchType; // determinate the source according to col id or slot id
} SColMatchInfo;
// If the numOfOutputGroups is 1, the data blocks that belongs to different groups will be provided randomly
// The numOfOutputGroups is specified by physical plan. and will not be affect by numOfGroups
typedef struct STableListInfo {
bool oneTableForEachGroup;
int32_t numOfGroups;
int32_t* groupOffset; // keep the offset value for each group in the tableList
int32_t numOfOuputGroups; // the data block will be generated one by one
int32_t* groupOffset; // keep the offset value for each group in the tableList
SArray* pTableList;
SHashObj* map; // speedup acquire the tableQueryInfo by table uid
SHashObj* map; // speedup acquire the tableQueryInfo by table uid
uint64_t suid;
} STableListInfo;
void destroyTableList(STableListInfo* pTableList);
int32_t getNumOfGroups(const STableListInfo* pTableList);
int32_t getNumOfOutputGroups(const STableListInfo* pTableList);
bool oneTableForEachGroup(const STableListInfo* pTableList);
uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid);
int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t gid);
int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo, int32_t* num);
......
......@@ -62,7 +62,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
pInfo->pUidList = taosArrayInit(4, sizeof(int64_t));
// partition by tbname, todo opt perf
if (getNumOfGroups(pTableList) == getTotalTables(pTableList)) {
if (oneTableForEachGroup(pTableList)) {
pInfo->retrieveType =
CACHESCAN_RETRIEVE_TYPE_ALL | (pScanNode->ignoreNull ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW);
......@@ -189,7 +189,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
return NULL;
}
} else {
size_t totalGroups = getNumOfGroups(pTableList);
size_t totalGroups = getNumOfOutputGroups(pTableList);
while (pInfo->currentGroupIndex < totalGroups) {
......
......@@ -1665,6 +1665,11 @@ uint64_t getTotalTables(const STableListInfo* pTableList) {
}
uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) {
if (pTableList->oneTableForEachGroup) {
ASSERT(pTableList->map == NULL);
return tableUid;
}
uint64_t* groupId = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid));
if (groupId != NULL) {
return *groupId;
......@@ -1677,25 +1682,25 @@ int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t
STableKeyInfo keyInfo = {.uid = uid, .groupId = gid};
taosArrayPush(pTableList->pTableList, &keyInfo);
if (pTableList->oneTableForEachGroup || pTableList->numOfGroups > 1) {
if (pTableList->oneTableForEachGroup || pTableList->numOfOuputGroups > 1) {
taosHashPut(pTableList->map, &uid, sizeof(uid), &keyInfo.groupId, sizeof(keyInfo.groupId));
}
return TSDB_CODE_SUCCESS;
}
int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo, int32_t* size) {
int32_t total = getNumOfGroups(pTableList);
int32_t total = getNumOfOutputGroups(pTableList);
if (ordinalGroupIndex < 0 || ordinalGroupIndex >= total) {
return TSDB_CODE_INVALID_PARA;
}
// here handle two special cases:
// 1. only one group exists, and 2. one table exists for each group.
if (pTableList->numOfGroups == 1) {
if (total == 1) {
*size = getTotalTables(pTableList);
*pKeyInfo = taosArrayGet(pTableList->pTableList, 0);
return TSDB_CODE_SUCCESS;
} else if (pTableList->numOfGroups == getTotalTables(pTableList)) {
} else if (total == getTotalTables(pTableList)) {
*size = 1;
*pKeyInfo = taosArrayGet(pTableList->pTableList, ordinalGroupIndex);
return TSDB_CODE_SUCCESS;
......@@ -1712,8 +1717,12 @@ int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalGroupI
return TSDB_CODE_SUCCESS;
}
int32_t getNumOfGroups(const STableListInfo* pTableList) {
return pTableList->numOfGroups;
int32_t getNumOfOutputGroups(const STableListInfo* pTableList) {
return pTableList->numOfOuputGroups;
}
bool oneTableForEachGroup(const STableListInfo* pTableList) {
return pTableList->oneTableForEachGroup;
}
void destroyTableList(STableListInfo* pTableqinfoList) {
......
......@@ -3397,9 +3397,9 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
}
}
pTableListInfo->numOfGroups = taosArrayGetSize(pList);
pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfGroups);
memcpy(pTableListInfo->groupOffset, taosArrayGet(pList, 0), sizeof(int32_t) * pTableListInfo->numOfGroups);
pTableListInfo->numOfOuputGroups = taosArrayGetSize(pList);
pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
memcpy(pTableListInfo->groupOffset, taosArrayGet(pList, 0), sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
taosArrayDestroy(pList);
# if 0
......@@ -3490,13 +3490,15 @@ bool groupbyTbname(SNodeList* pGroupList) {
}
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group, bool groupSort) {
int32_t code = TSDB_CODE_SUCCESS;
if (group == NULL) {
return TDB_CODE_SUCCESS;
return code;
}
pTableListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (pTableListInfo->map == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
bool assignUid = groupbyTbname(group);
......@@ -3508,21 +3510,23 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
info->groupId = info->uid;
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t));
}
pTableListInfo->oneTableForEachGroup = true;
pTableListInfo->numOfGroups = numOfTables;
if (groupSort) {
pTableListInfo->numOfOuputGroups = numOfTables;
}
} else {
int32_t code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo);
code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (groupSort) {
return sortTableGroup(pTableListInfo);
code = sortTableGroup(pTableListInfo);
}
}
qDebug("-------------------, %d", (int) taosHashGetSize(pTableListInfo->map));
return TDB_CODE_SUCCESS;
return code;
}
static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) {
......
......@@ -746,7 +746,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
}
} else { // scan table group by group sequentially
if (pInfo->currentGroupId == -1) {
if ((++pInfo->currentGroupId) >= getNumOfGroups(&pTaskInfo->tableqinfoList)) {
if ((++pInfo->currentGroupId) >= getNumOfOutputGroups(&pTaskInfo->tableqinfoList)) {
doSetOperatorCompleted(pOperator);
return NULL;
}
......@@ -768,7 +768,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
return result;
}
if ((++pInfo->currentGroupId) >= getNumOfGroups(&pTaskInfo->tableqinfoList)) {
if ((++pInfo->currentGroupId) >= getNumOfOutputGroups(&pTaskInfo->tableqinfoList)) {
doSetOperatorCompleted(pOperator);
return NULL;
}
......@@ -1094,7 +1094,9 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
cond.twindows = (STimeWindow){.skey = startTs, .ekey = endTs};
SExecTaskInfo* pTaskInfo = pTableScanOp->pTaskInfo;
blockDataCleanup(pTableScanInfo->pResBlock);
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
blockDataCleanup(pBlock);
STsdbReader* pReader = NULL;
int32_t code = tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &cond, &tblInfo, 1, (STsdbReader**)&pReader,
......@@ -1106,14 +1108,25 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
bool hasBlock = tsdbNextDataBlock(pReader);
if (hasBlock) {
SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
relocateColumnData(pTableScanInfo->pResBlock, pTableScanInfo->matchInfo.pList, pCols, true);
doSetTagColumnData(pTableScanInfo, pTableScanInfo->pResBlock, pTaskInfo);
SDataBlockInfo binfo = {0};
tsdbRetrieveDataBlockInfo(pReader, &binfo);
SArray* pCols = tsdbRetrieveDataBlock(pReader, NULL);
blockDataEnsureCapacity(pBlock, binfo.rows);
pBlock->info.window = binfo.window;
pBlock->info.uid = binfo.uid;
pBlock->info.rows = binfo.rows;
relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true);
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo);
}
tsdbReaderClose(pReader);
qDebug("retrieve prev rows:%d, skey:%" PRId64 ", ekey:%" PRId64 " uid:%" PRIu64 ", max ver:%" PRId64
", suid:%" PRIu64, pBlock->info.rows, startTs, endTs, tbUid, maxVersion, cond.suid);
return pTableScanInfo->pResBlock->info.rows > 0? pTableScanInfo->pResBlock:NULL;
return pBlock->info.rows > 0 ? pBlock : NULL;
}
static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
......@@ -2371,7 +2384,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
// set the extract column id to streamHandle
tqReaderSetColIdList(pInfo->tqReader, pColIds);
SArray* tableIdList = extractTableIdList(&pTaskInfo->tableqinfoList);
int32_t code = tqReaderSetTbUidList(pInfo->tqReader, tableIdList);
code = tqReaderSetTbUidList(pInfo->tqReader, tableIdList);
if (code != 0) {
taosArrayDestroy(tableIdList);
goto _error;
......@@ -4152,7 +4165,7 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
return TSDB_CODE_SUCCESS;
}
pTableListInfo->numOfGroups = 1;
pTableListInfo->numOfOuputGroups = 1;
code = generateGroupIdMap(pTableListInfo, pHandle, pGroupTags, groupSort);
if (code != TSDB_CODE_SUCCESS) {
return code;
......
......@@ -654,6 +654,7 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info);
}
pInfo->limitInfo.numOfOutputRows += p->info.rows;
pDataBlock->info.rows = p->info.rows;
pDataBlock->info.groupId = pInfo->groupId;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册