提交 443e8e25 编写于 作者: H Haojun Liao

fix(query): add limit/offset for sys table scan operator.

上级 d69b57fa
...@@ -65,7 +65,7 @@ typedef struct SSysTableScanInfo { ...@@ -65,7 +65,7 @@ typedef struct SSysTableScanInfo {
SSDataBlock* pRes; SSDataBlock* pRes;
int64_t numOfBlocks; // extract basic running information. int64_t numOfBlocks; // extract basic running information.
SLoadRemoteDataInfo loadInfo; SLoadRemoteDataInfo loadInfo;
SLimitInfo limitInfo;
int32_t tbnameSlotId; int32_t tbnameSlotId;
} SSysTableScanInfo; } SSysTableScanInfo;
...@@ -133,7 +133,6 @@ static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capac ...@@ -133,7 +133,6 @@ static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capac
static SSDataBlock* buildInfoSchemaTableMetaBlock(char* tableName); static SSDataBlock* buildInfoSchemaTableMetaBlock(char* tableName);
static void destroySysScanOperator(void* param); static void destroySysScanOperator(void* param);
static int32_t loadSysTableCallback(void* param, SDataBuf* pMsg, int32_t code); static int32_t loadSysTableCallback(void* param, SDataBuf* pMsg, int32_t code);
static SSDataBlock* doFilterResult(SSDataBlock* pDataBlock, SFilterInfo* pFilterInfo);
static __optSysFilter optSysGetFilterFunc(int32_t ctype, bool* reverse); static __optSysFilter optSysGetFilterFunc(int32_t ctype, bool* reverse);
static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo, SMetaReader* smrSuperTable, static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo, SMetaReader* smrSuperTable,
...@@ -351,7 +350,7 @@ static int32_t optSysMergeRslt(SArray* mRslt, SArray* rslt); ...@@ -351,7 +350,7 @@ static int32_t optSysMergeRslt(SArray* mRslt, SArray* rslt);
static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableScanInfo* pInfo, const char* name, static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableScanInfo* pInfo, const char* name,
SExecTaskInfo* pTaskInfo); SExecTaskInfo* pTaskInfo);
void extractTbnameSlotId(SSysTableScanInfo* pInfo, const SScanPhysiNode* pScanNode); void extractTbnameSlotId(SSysTableScanInfo* pInfo, const SScanPhysiNode* pScanNode);
static SSDataBlock* sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo, static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo,
const char* name, SSDataBlock* pBlock); const char* name, SSDataBlock* pBlock);
__optSysFilter optSysGetFilterFunc(int32_t ctype, bool* reverse) { __optSysFilter optSysGetFilterFunc(int32_t ctype, bool* reverse) {
if (ctype == OP_TYPE_LOWER_EQUAL || ctype == OP_TYPE_LOWER_THAN) { if (ctype == OP_TYPE_LOWER_EQUAL || ctype == OP_TYPE_LOWER_THAN) {
...@@ -556,7 +555,7 @@ void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t numOfR ...@@ -556,7 +555,7 @@ void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t numOfR
pInfo->pRes->info.rows = numOfRows; pInfo->pRes->info.rows = numOfRows;
relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, dataBlock->pDataBlock, false); relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, dataBlock->pDataBlock, false);
doFilterResult(pInfo->pRes, pFilterInfo); doFilter(pInfo->pRes, pFilterInfo, NULL);
blockDataCleanup(dataBlock); blockDataCleanup(dataBlock);
} }
...@@ -975,7 +974,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) { ...@@ -975,7 +974,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
pInfo->pRes->info.rows = numOfRows; pInfo->pRes->info.rows = numOfRows;
relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false); relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false);
doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo); doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
blockDataCleanup(p); blockDataCleanup(p);
numOfRows = 0; numOfRows = 0;
...@@ -991,7 +990,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) { ...@@ -991,7 +990,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
pInfo->pRes->info.rows = numOfRows; pInfo->pRes->info.rows = numOfRows;
relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false); relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false);
doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo); doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
blockDataCleanup(p); blockDataCleanup(p);
numOfRows = 0; numOfRows = 0;
...@@ -1152,7 +1151,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { ...@@ -1152,7 +1151,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
pInfo->pRes->info.rows = numOfRows; pInfo->pRes->info.rows = numOfRows;
relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false); relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false);
doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo); doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
blockDataCleanup(p); blockDataCleanup(p);
numOfRows = 0; numOfRows = 0;
...@@ -1168,7 +1167,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { ...@@ -1168,7 +1167,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
pInfo->pRes->info.rows = numOfRows; pInfo->pRes->info.rows = numOfRows;
relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false); relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false);
doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo); doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
blockDataCleanup(p); blockDataCleanup(p);
numOfRows = 0; numOfRows = 0;
...@@ -1199,7 +1198,7 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) { ...@@ -1199,7 +1198,7 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) {
// the retrieve is executed on the mnode, so return tables that belongs to the information schema database. // the retrieve is executed on the mnode, so return tables that belongs to the information schema database.
if (pInfo->readHandle.mnd != NULL) { if (pInfo->readHandle.mnd != NULL) {
buildSysDbTableInfo(pInfo, pOperator->resultInfo.capacity); buildSysDbTableInfo(pInfo, pOperator->resultInfo.capacity);
doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo); doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows; pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
setOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
...@@ -1324,6 +1323,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { ...@@ -1324,6 +1323,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
getDBNameFromCondition(pInfo->pCondition, dbName); getDBNameFromCondition(pInfo->pCondition, dbName);
sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName); sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName);
} }
SSDataBlock* pBlock = NULL; SSDataBlock* pBlock = NULL;
if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0) { if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
pBlock = sysTableScanUserTables(pOperator); pBlock = sysTableScanUserTables(pOperator);
...@@ -1336,30 +1336,37 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { ...@@ -1336,30 +1336,37 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
pBlock = sysTableScanFromMNode(pOperator, pInfo, name, pTaskInfo); pBlock = sysTableScanFromMNode(pOperator, pInfo, name, pTaskInfo);
} }
return sysTableScanFillTbName(pOperator, pInfo, name, pBlock); sysTableScanFillTbName(pOperator, pInfo, name, pBlock);
}
static SSDataBlock* sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo,
const char* name, SSDataBlock* pBlock) {
if (pBlock != NULL) { if (pBlock != NULL) {
if (pInfo->tbnameSlotId != -1) { bool limitReached = applyLimitOffset(&pInfo->limitInfo, pBlock, pTaskInfo);
SColumnInfoData* pColumnInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, pInfo->tbnameSlotId); if (limitReached) {
char varTbName[TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE] = {0}; setOperatorCompleted(pOperator);
memcpy(varDataVal(varTbName), name, strlen(name));
varDataSetLen(varTbName, strlen(name));
for (int i = 0; i < pBlock->info.rows; ++i) {
colDataAppend(pColumnInfoData, i, varTbName, NULL);
}
doFilterResult(pBlock, pOperator->exprSupp.pFilterInfo);
} }
}
if (pBlock && pBlock->info.rows != 0) { return pBlock->info.rows > 0? pBlock:NULL;
return pBlock;
} else { } else {
return NULL; return NULL;
} }
} }
static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo, const char* name,
SSDataBlock* pBlock) {
if (pBlock == NULL) {
return;
}
if (pInfo->tbnameSlotId != -1) {
SColumnInfoData* pColumnInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, pInfo->tbnameSlotId);
char varTbName[TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE] = {0};
memcpy(varDataVal(varTbName), name, strlen(name));
varDataSetLen(varTbName, strlen(name));
colDataAppendNItems(pColumnInfoData, 0, varTbName, pBlock->info.rows);
}
doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
}
static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableScanInfo* pInfo, const char* name, static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableScanInfo* pInfo, const char* name,
SExecTaskInfo* pTaskInfo) { SExecTaskInfo* pTaskInfo) {
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
...@@ -1423,7 +1430,7 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca ...@@ -1423,7 +1430,7 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca
updateLoadRemoteInfo(&pInfo->loadInfo, pRsp->numOfRows, pRsp->compLen, startTs, pOperator); updateLoadRemoteInfo(&pInfo->loadInfo, pRsp->numOfRows, pRsp->compLen, startTs, pOperator);
// todo log the filter info // todo log the filter info
doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo); doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
taosMemoryFree(pRsp); taosMemoryFree(pRsp);
if (pInfo->pRes->info.rows > 0) { if (pInfo->pRes->info.rows > 0) {
return pInfo->pRes; return pInfo->pRes;
...@@ -1457,13 +1464,13 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan ...@@ -1457,13 +1464,13 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
pInfo->sysInfo = pScanPhyNode->sysInfo; pInfo->sysInfo = pScanPhyNode->sysInfo;
pInfo->showRewrite = pScanPhyNode->showRewrite; pInfo->showRewrite = pScanPhyNode->showRewrite;
pInfo->pRes = createDataBlockFromDescNode(pDescNode); pInfo->pRes = createDataBlockFromDescNode(pDescNode);
pInfo->pCondition = pScanNode->node.pConditions; pInfo->pCondition = pScanNode->node.pConditions;
code = filterInitFromNode(pScanNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); code = filterInitFromNode(pScanNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
initLimitInfo(pScanPhyNode->scan.node.pLimit, pScanPhyNode->scan.node.pSlimit, &pInfo->limitInfo);
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
...@@ -1556,15 +1563,6 @@ int32_t loadSysTableCallback(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1556,15 +1563,6 @@ int32_t loadSysTableCallback(void* param, SDataBuf* pMsg, int32_t code) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSDataBlock* doFilterResult(SSDataBlock* pDataBlock, SFilterInfo* pFilterInfo) {
if (pFilterInfo == NULL) {
return pDataBlock->info.rows == 0 ? NULL : pDataBlock;
}
doFilter(pDataBlock, pFilterInfo, NULL);
return pDataBlock->info.rows == 0 ? NULL : pDataBlock;
}
static int32_t sysChkFilter__Comm(SNode* pNode) { static int32_t sysChkFilter__Comm(SNode* pNode) {
// impl // impl
SOperatorNode* pOper = (SOperatorNode*)pNode; SOperatorNode* pOper = (SOperatorNode*)pNode;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册