提交 41c99c02 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 52c37778
...@@ -62,7 +62,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe ...@@ -62,7 +62,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
pInfo->pUidList = taosArrayInit(4, sizeof(int64_t)); pInfo->pUidList = taosArrayInit(4, sizeof(int64_t));
// partition by tbname // partition by tbname
if (taosArrayGetSize(pTableList->pGroupList) == taosArrayGetSize(pTableList->pTableList)) { if (oneTableForEachGroup(pTableList) || (getTotalTables(pTableList) == 1)) {
pInfo->retrieveType = pInfo->retrieveType =
CACHESCAN_RETRIEVE_TYPE_ALL | (pScanNode->ignoreNull ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW); CACHESCAN_RETRIEVE_TYPE_ALL | (pScanNode->ignoreNull ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW);
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pTableList->pTableList, code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pTableList->pTableList,
...@@ -175,12 +175,18 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { ...@@ -175,12 +175,18 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
} else { } else {
size_t totalGroups = taosArrayGetSize(pTableList->pGroupList); size_t num = getNumOfOutputGroups(pTableList);
while (pInfo->currentGroupIndex < num) {
while (pInfo->currentGroupIndex < totalGroups) {
SArray* pGroupTableList = taosArrayGetP(pTableList->pGroupList, pInfo->currentGroupIndex); STableKeyInfo* p = NULL;
int32_t s = 0;
getTablesOfGroup(pTableList, pInfo->currentGroupIndex, &p, &s);
SArray* x = taosArrayInit(4, sizeof(STableKeyInfo));
for(int32_t i = 0; i < s; ++i) {
taosArrayPush(x, &p[i]);
}
tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pGroupTableList, tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, x,
taosArrayGetSize(pInfo->matchInfo.pList), pTableList->suid, &pInfo->pLastrowReader); taosArrayGetSize(pInfo->matchInfo.pList), pTableList->suid, &pInfo->pLastrowReader);
taosArrayClear(pInfo->pUidList); taosArrayClear(pInfo->pUidList);
...@@ -195,9 +201,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { ...@@ -195,9 +201,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
if (pInfo->pRes->info.rows > 0) { if (pInfo->pRes->info.rows > 0) {
if (pInfo->pseudoExprSup.numOfExprs > 0) { if (pInfo->pseudoExprSup.numOfExprs > 0) {
SExprSupp* pSup = &pInfo->pseudoExprSup; SExprSupp* pSup = &pInfo->pseudoExprSup;
pInfo->pRes->info.groupId = p->groupId;
STableKeyInfo* pKeyInfo = taosArrayGet(pGroupTableList, 0);
pInfo->pRes->info.groupId = pKeyInfo->groupId;
if (taosArrayGetSize(pInfo->pUidList) > 0) { if (taosArrayGetSize(pInfo->pUidList) > 0) {
ASSERT((pInfo->retrieveType & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW); ASSERT((pInfo->retrieveType & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW);
......
...@@ -997,14 +997,6 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, ...@@ -997,14 +997,6 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
} }
taosArrayDestroy(res); taosArrayDestroy(res);
pListInfo->pGroupList = taosArrayInit(4, POINTER_BYTES);
if (pListInfo->pGroupList == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
// put into list as default group, remove it if grouping sorting is required later
taosArrayPush(pListInfo->pGroupList, &pListInfo->pTableList);
return code; return code;
} }
...@@ -1698,7 +1690,7 @@ int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalGroupI ...@@ -1698,7 +1690,7 @@ int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalGroupI
// 1. only one group exists, and 2. one table exists for each group. // 1. only one group exists, and 2. one table exists for each group.
if (total == 1) { if (total == 1) {
*size = getTotalTables(pTableList); *size = getTotalTables(pTableList);
*pKeyInfo = taosArrayGet(pTableList->pTableList, 0); *pKeyInfo = (*size == 0)? NULL:taosArrayGet(pTableList->pTableList, 0);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (total == getTotalTables(pTableList)) { } else if (total == getTotalTables(pTableList)) {
*size = 1; *size = 1;
......
...@@ -4026,15 +4026,15 @@ void doDestroyTableList(STableListInfo* pTableqinfoList) { ...@@ -4026,15 +4026,15 @@ void doDestroyTableList(STableListInfo* pTableqinfoList) {
taosArrayDestroy(pTableqinfoList->pTableList); taosArrayDestroy(pTableqinfoList->pTableList);
taosHashCleanup(pTableqinfoList->map); taosHashCleanup(pTableqinfoList->map);
if (pTableqinfoList->needSortTableByGroupId) { if (pTableqinfoList->needSortTableByGroupId) {
for (int32_t i = 0; i < taosArrayGetSize(pTableqinfoList->pGroupList); i++) { // for (int32_t i = 0; i < taosArrayGetSize(pTableqinfoList->pGroupList); i++) {
SArray* tmp = taosArrayGetP(pTableqinfoList->pGroupList, i); // SArray* tmp = taosArrayGetP(pTableqinfoList->pGroupList, i);
if (tmp == pTableqinfoList->pTableList) { // if (tmp == pTableqinfoList->pTableList) {
continue; // continue;
} // }
taosArrayDestroy(tmp); // taosArrayDestroy(tmp);
} // }
} }
taosArrayDestroy(pTableqinfoList->pGroupList); // taosArrayDestroy(pTableqinfoList->pGroupList);
pTableqinfoList->pTableList = NULL; pTableqinfoList->pTableList = NULL;
pTableqinfoList->map = NULL; pTableqinfoList->map = NULL;
......
...@@ -749,20 +749,28 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -749,20 +749,28 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
if (pInfo->currentGroupId == -1) { if (pInfo->currentGroupId == -1) {
pInfo->currentGroupId++; pInfo->currentGroupId++;
qDebug("number:------------------------%d, %d", (int)taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList), // qDebug("number:------------------------%d, %d", (int)taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList),
getNumOfOutputGroups(&pTaskInfo->tableqinfoList)); // getNumOfOutputGroups(&pTaskInfo->tableqinfoList));
if (pInfo->currentGroupId >= getNumOfOutputGroups(&pTaskInfo->tableqinfoList)/*taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)*/) { if (pInfo->currentGroupId >= getNumOfOutputGroups(&pTaskInfo->tableqinfoList)/*taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)*/) {
// if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) { // if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) {
// setTaskStatus(pTaskInfo, TASK_COMPLETED);
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
return NULL; return NULL;
} }
SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId); SArray* p = taosArrayInit(4, sizeof(STableKeyInfo));
tsdbReaderClose(pInfo->dataReader); tsdbReaderClose(pInfo->dataReader);
int32_t code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, tableList, (STsdbReader**)&pInfo->dataReader, STableKeyInfo* x = NULL;
int32_t num = 0;
getTablesOfGroup(&pTaskInfo->tableqinfoList, pInfo->currentGroupId, &x, &num);
for(int32_t i = 0; i < num; ++i) {
taosArrayPush(p, &x[i]);
}
int32_t code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, p, (STsdbReader**)&pInfo->dataReader,
GET_TASKID(pTaskInfo)); GET_TASKID(pTaskInfo));
taosArrayDestroy(p);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
return NULL; return NULL;
...@@ -775,11 +783,16 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -775,11 +783,16 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
} }
pInfo->currentGroupId++; pInfo->currentGroupId++;
if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) { if (pInfo->currentGroupId >= getNumOfOutputGroups(&pTaskInfo->tableqinfoList)) {
setTaskStatus(pTaskInfo, TASK_COMPLETED); doSetOperatorCompleted(pOperator);
return NULL; return NULL;
} }
// reset value for the next group data output
pOperator->status = OP_OPENED;
pInfo->limitInfo.numOfOutputRows = 0;
pInfo->limitInfo.remainOffset = pInfo->limitInfo.limit.offset;
tsdbReaderReset(pInfo->dataReader, &pInfo->cond); tsdbReaderReset(pInfo->dataReader, &pInfo->cond);
pInfo->scanTimes = 0; pInfo->scanTimes = 0;
...@@ -1075,39 +1088,59 @@ void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) { ...@@ -1075,39 +1088,59 @@ void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) {
pTableScanInfo->cond.twindows = *pWin; pTableScanInfo->cond.twindows = *pWin;
pTableScanInfo->scanTimes = 0; pTableScanInfo->scanTimes = 0;
pTableScanInfo->currentGroupId = -1; pTableScanInfo->currentGroupId = -1;
} tsdbReaderClose(pTableScanInfo->dataReader);
pTableScanInfo->dataReader = NULL;
static void freeArray(void* array) { taosArrayDestroy(array); }
static void resetTableScanOperator(SOperatorInfo* pTableScanOp) {
STableScanInfo* pTableScanInfo = pTableScanOp->info;
pTableScanInfo->cond.startVersion = -1;
pTableScanInfo->cond.endVersion = -1;
SArray* gpTbls = pTableScanOp->pTaskInfo->tableqinfoList.pGroupList;
SArray* allTbls = pTableScanOp->pTaskInfo->tableqinfoList.pTableList;
taosArrayClearP(gpTbls, freeArray);
taosArrayPush(gpTbls, &allTbls);
STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX};
resetTableScanInfo(pTableScanOp->info, &win);
} }
static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs, static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs,
int64_t maxVersion) { int64_t maxVersion) {
SArray* gpTbls = pTableScanOp->pTaskInfo->tableqinfoList.pGroupList;
taosArrayClear(gpTbls);
STableKeyInfo tblInfo = {.uid = tbUid, .groupId = 0}; STableKeyInfo tblInfo = {.uid = tbUid, .groupId = 0};
SArray* tbls = taosArrayInit(1, sizeof(STableKeyInfo));
taosArrayPush(tbls, &tblInfo);
taosArrayPush(gpTbls, &tbls);
STimeWindow win = {.skey = startTs, .ekey = endTs}; STableScanInfo* pTableScanInfo = pTableScanOp->info;
STableScanInfo* pTableScanInfo = pTableScanOp->info; SQueryTableDataCond cond = pTableScanInfo->cond;
pTableScanInfo->cond.startVersion = -1;
pTableScanInfo->cond.endVersion = maxVersion; cond.startVersion = -1;
resetTableScanInfo(pTableScanOp->info, &win); cond.endVersion = maxVersion;
SSDataBlock* pRes = doTableScan(pTableScanOp); cond.twindows = (STimeWindow){.skey = startTs, .ekey = endTs};
resetTableScanOperator(pTableScanOp);
return pRes; SExecTaskInfo* pTaskInfo = pTableScanOp->pTaskInfo;
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
blockDataCleanup(pBlock);
SArray* p = taosArrayInit(1, sizeof(STableKeyInfo));
taosArrayPush(p, &tblInfo);
STsdbReader* pReader = NULL;
int32_t code = tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &cond, p, (STsdbReader**)&pReader, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
}
bool hasBlock = tsdbNextDataBlock(pReader);
if (hasBlock) {
SDataBlockInfo binfo = {0};
tsdbRetrieveDataBlockInfo(pReader, &binfo);
SArray* pCols = tsdbRetrieveDataBlock(pReader, NULL);
blockDataEnsureCapacity(pBlock, binfo.rows);
pBlock->info.window = binfo.window;
pBlock->info.uid = binfo.uid;
pBlock->info.rows = binfo.rows;
relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true);
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo);
pBlock->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, binfo.uid);
}
tsdbReaderClose(pReader);
qDebug("retrieve prev rows:%d, skey:%" PRId64 ", ekey:%" PRId64 " uid:%" PRIu64 ", max ver:%" PRId64
", suid:%" PRIu64, pBlock->info.rows, startTs, endTs, tbUid, maxVersion, cond.suid);
return pBlock->info.rows > 0 ? pBlock : NULL;
} }
static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) { static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
...@@ -2329,11 +2362,20 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ...@@ -2329,11 +2362,20 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pTSInfo->cond.endVersion = pHandle->version; pTSInfo->cond.endVersion = pHandle->version;
} }
SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0); // SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0);
STableKeyInfo* pList = NULL;
int32_t num = 0;
getTablesOfGroup(&pTaskInfo->tableqinfoList, 0, &pList, &num);
SArray* p = taosArrayInit(4, sizeof(STableKeyInfo));
for(int32_t i = 0; i < num; ++i) {
taosArrayPush(p, &pList[i]);
}
if (pHandle->initTableReader) { if (pHandle->initTableReader) {
pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER; pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
pTSInfo->dataReader = NULL; pTSInfo->dataReader = NULL;
if (tsdbReaderOpen(pHandle->vnode, &pTSInfo->cond, tableList, &pTSInfo->dataReader, NULL) < 0) { if (tsdbReaderOpen(pHandle->vnode, &pTSInfo->cond, p, &pTSInfo->dataReader, NULL) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _error; goto _error;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册