提交 2ead89e3 编写于 作者: S shenglian zhou

feat: table merge scan save work

上级 6a50e9e2
...@@ -163,6 +163,7 @@ typedef struct tExprNode { ...@@ -163,6 +163,7 @@ typedef struct tExprNode {
int32_t functionId; int32_t functionId;
int32_t num; int32_t num;
struct SFunctionNode *pFunctNode; struct SFunctionNode *pFunctNode;
int32_t functionType;
} _function; } _function;
struct { struct {
......
...@@ -109,6 +109,12 @@ int metaGetTableUidByName(void *meta, char *tbName, int64_t *uid); ...@@ -109,6 +109,12 @@ int metaGetTableUidByName(void *meta, char *tbName, int64_t *uid);
int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType); int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType);
bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid); bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid);
typedef struct {
int64_t uid;
int64_t ctbNum;
} SMetaStbStats;
int32_t metaGetStbStats(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo);
typedef struct SMetaFltParam { typedef struct SMetaFltParam {
tb_uid_t suid; tb_uid_t suid;
int16_t cid; int16_t cid;
......
...@@ -144,12 +144,6 @@ typedef struct SMetaInfo { ...@@ -144,12 +144,6 @@ typedef struct SMetaInfo {
} SMetaInfo; } SMetaInfo;
int32_t metaGetInfo(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo); int32_t metaGetInfo(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo);
typedef struct {
int64_t uid;
int64_t ctbNum;
} SMetaStbStats;
int32_t metaGetStbStats(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo);
// tsdb // tsdb
int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg, int8_t rollback); int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg, int8_t rollback);
int tsdbClose(STsdb** pTsdb); int tsdbClose(STsdb** pTsdb);
......
...@@ -549,6 +549,17 @@ typedef struct SSysTableScanInfo { ...@@ -549,6 +549,17 @@ typedef struct SSysTableScanInfo {
SLoadRemoteDataInfo loadInfo; SLoadRemoteDataInfo loadInfo;
} SSysTableScanInfo; } SSysTableScanInfo;
typedef struct STableCountScanInfo {
SReadHandle readHandle;
SSDataBlock* pRes;
SExprSupp pseudoSup;
SNode* pCondition;
SName name;
uint64_t suid;
uint64_t uid;
int8_t tableType;
} STableCountScanInfo;
typedef struct SBlockDistInfo { typedef struct SBlockDistInfo {
SSDataBlock* pResBlock; SSDataBlock* pResBlock;
STsdbReader* pHandle; STsdbReader* pHandle;
......
...@@ -1283,6 +1283,7 @@ void createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) { ...@@ -1283,6 +1283,7 @@ void createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
pExprNode->_function.functionId = pFuncNode->funcId; pExprNode->_function.functionId = pFuncNode->funcId;
pExprNode->_function.pFunctNode = pFuncNode; pExprNode->_function.pFunctNode = pFuncNode;
pExprNode->_function.functionType = pFuncNode->funcType;
tstrncpy(pExprNode->_function.functionName, pFuncNode->functionName, tListLen(pExprNode->_function.functionName)); tstrncpy(pExprNode->_function.functionName, pFuncNode->functionName, tListLen(pExprNode->_function.functionName));
......
...@@ -2612,6 +2612,8 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT ...@@ -2612,6 +2612,8 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode); SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode);
SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* handle, STableCountScanPhysiNode* pNode,
SExecTaskInfo* pTaskInfo);
int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, SExecTaskInfo* pTaskInfo) { int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, SExecTaskInfo* pTaskInfo) {
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderInit(&mr, pHandle->meta, 0); metaReaderInit(&mr, pHandle->meta, 0);
...@@ -2802,6 +2804,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -2802,6 +2804,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode; SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
pOperator = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo); pOperator = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN == type) {
STableCountScanPhysiNode* pTblCountScanNode = (STableCountScanPhysiNode*)pPhyNode;
pOperator = createTableCountScanOperatorInfo(pHandle, pTblCountScanNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode; STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;
......
...@@ -4839,3 +4839,83 @@ _error: ...@@ -4839,3 +4839,83 @@ _error:
taosMemoryFree(pOperator); taosMemoryFree(pOperator);
return NULL; return NULL;
} }
// ====================================================================================================================
// TableCountScanOperator
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator);
static void destoryTableCountScanOperator(void* param);
SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableCountScanPhysiNode* pScanNode,
SExecTaskInfo* pTaskInfo) {
int32_t code = TSDB_CODE_SUCCESS;
STableCountScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableCountScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (!pInfo || !pOperator) {
goto _error;
}
pInfo->readHandle = *readHandle;
if (pScanNode->pScanPseudoCols != NULL) {
SExprSupp* pSup = &pInfo->pseudoSup;
pSup->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->numOfExprs);
pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
}
SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
initResultSizeInfo(&pOperator->resultInfo, 1);
pInfo->pRes = createResDataBlock(pDescNode);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
tNameAssign(&pInfo->name, &pScanNode->tableName);
pInfo->suid = pScanNode->suid; // 0 for super table, super table uid for child table
pInfo->uid = pScanNode->uid; // super table uid, or child table uid
pInfo->tableType = pScanNode->tableType;
setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator, NULL);
return pOperator;
_error:
if (pInfo != NULL) {
destoryTableCountScanOperator(pInfo);
}
taosMemoryFreeClear(pOperator);
pTaskInfo->code = code;
return NULL;
}
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableCountScanInfo* pInfo = pOperator->info;
SExprSupp* pSup = &pInfo->pseudoSup;
if (pSup->numOfExprs != 1 || pSup->pExprInfo[0].pExpr->nodeType != QUERY_NODE_FUNCTION ||
pSup->pExprInfo[0].pExpr->_function.functionType != FUNCTION_TYPE_TABLE_COUNT ) {
qError("%s table count scan operator invalid pseduo columns", GET_TASKID(pTaskInfo));
}
SColumnInfoData* pColInfoData = taosArrayGet(pInfo->pRes->pDataBlock, 0);
if (pInfo->uid != 0) {
SMetaStbStats stats = {0};
metaGetStbStats(pInfo->readHandle.meta, pInfo->uid, &stats);
int64_t ctbNum = stats.ctbNum;
//TODO: wxy about the return type bigint or int?
colDataAppend(pColInfoData, 0, (char*)&ctbNum, false);
pInfo->pRes->info.rows = 1;
} else {
//TODO: get table count in this vnode?
}
return NULL;
}
static void destoryTableCountScanOperator(void* param) {
STableCountScanInfo* pTableCountScanInfo = param;
blockDataDestroy(pTableCountScanInfo->pRes);
cleanupExprSupp(&pTableCountScanInfo->pseudoSup);
taosMemoryFreeClear(param);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册