提交 3a23a30a 编写于 作者: H Haojun Liao

[td-11818] Refactor.

上级 ec99edc5
...@@ -73,7 +73,7 @@ static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, u ...@@ -73,7 +73,7 @@ static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, u
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull); int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource, uint32_t numOfRow2); int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource, uint32_t numOfRow2);
int32_t colDataUpdateTsWindow(SSDataBlock* pDataBlock); int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock);
int32_t colDataGetSize(const SColumnInfoData* pColumnInfoData, int32_t numOfRows); int32_t colDataGetSize(const SColumnInfoData* pColumnInfoData, int32_t numOfRows);
void colDataTrim(SColumnInfoData* pColumnInfoData); void colDataTrim(SColumnInfoData* pColumnInfoData);
......
...@@ -24,7 +24,7 @@ ...@@ -24,7 +24,7 @@
#endif #endif
OP_ENUM_MACRO(StreamScan) OP_ENUM_MACRO(StreamScan)
OP_ENUM_MACRO(DataBlocksOptScan) OP_ENUM_MACRO(TableScan)
OP_ENUM_MACRO(TableSeqScan) OP_ENUM_MACRO(TableSeqScan)
OP_ENUM_MACRO(TagScan) OP_ENUM_MACRO(TagScan)
OP_ENUM_MACRO(SystemTableScan) OP_ENUM_MACRO(SystemTableScan)
......
...@@ -234,7 +234,7 @@ size_t colDataGetNumOfRows(const SSDataBlock* pBlock) { ...@@ -234,7 +234,7 @@ size_t colDataGetNumOfRows(const SSDataBlock* pBlock) {
return pBlock->info.rows; return pBlock->info.rows;
} }
int32_t colDataUpdateTsWindow(SSDataBlock* pDataBlock) { int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock) {
if (pDataBlock == NULL || pDataBlock->info.rows <= 0) { if (pDataBlock == NULL || pDataBlock->info.rows <= 0) {
return 0; return 0;
} }
......
...@@ -97,10 +97,11 @@ typedef struct SSingleColumnFilterInfo { ...@@ -97,10 +97,11 @@ typedef struct SSingleColumnFilterInfo {
} SSingleColumnFilterInfo; } SSingleColumnFilterInfo;
typedef struct STableQueryInfo { typedef struct STableQueryInfo {
TSKEY lastKey; TSKEY lastKey; // last check ts
uint64_t uid; // table uid
int32_t groupIndex; // group id in table list int32_t groupIndex; // group id in table list
// SVariant tag; // SVariant tag;
SResultRowInfo resInfo; SResultRowInfo resInfo; // result info
} STableQueryInfo; } STableQueryInfo;
typedef enum { typedef enum {
...@@ -596,8 +597,8 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* ...@@ -596,8 +597,8 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray*
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput,
int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo); int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
int32_t numOfOutput); int32_t numOfOutput);
SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream); SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream);
......
...@@ -4015,7 +4015,7 @@ static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResI ...@@ -4015,7 +4015,7 @@ static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResI
return 0; return 0;
} }
static void toSDatablock(SGroupResInfo *pGroupResInfo, STaskRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock) { static void toSDatablock(SGroupResInfo *pGroupResInfo, SDiskbasedBuf* pBuf, SSDataBlock* pBlock, int32_t rowCapacity) {
assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup); assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup);
pBlock->info.rows = 0; pBlock->info.rows = 0;
...@@ -4024,17 +4024,10 @@ static void toSDatablock(SGroupResInfo *pGroupResInfo, STaskRuntimeEnv* pRuntime ...@@ -4024,17 +4024,10 @@ static void toSDatablock(SGroupResInfo *pGroupResInfo, STaskRuntimeEnv* pRuntime
} }
int32_t orderType = TSDB_ORDER_ASC;//(pQueryAttr->pGroupbyExpr != NULL) ? pQueryAttr->pGroupbyExpr->orderType : TSDB_ORDER_ASC; int32_t orderType = TSDB_ORDER_ASC;//(pQueryAttr->pGroupbyExpr != NULL) ? pQueryAttr->pGroupbyExpr->orderType : TSDB_ORDER_ASC;
doCopyToSDataBlock(NULL, pGroupResInfo, orderType, pBlock, 0); doCopyToSDataBlock(pBuf, pGroupResInfo, orderType, pBlock, rowCapacity);
// refactor : extract method // add condition (pBlock->info.rows >= 1) just to runtime happy
SColumnInfoData* pInfoData = taosArrayGet(pBlock->pDataBlock, 0); blockDataUpdateTsWindow(pBlock);
//add condition (pBlock->info.rows >= 1) just to runtime happy
if (pInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP && pBlock->info.rows >= 1) {
STimeWindow* w = &pBlock->info.window;
w->skey = *(int64_t*)pInfoData->pData;
w->ekey = *(int64_t*)(((char*)pInfoData->pData) + TSDB_KEYSIZE * (pBlock->info.rows - 1));
}
} }
static void updateNumOfRowsInResultRows(SqlFunctionCtx* pCtx, int32_t numOfOutput, static void updateNumOfRowsInResultRows(SqlFunctionCtx* pCtx, int32_t numOfOutput,
...@@ -5373,7 +5366,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, ...@@ -5373,7 +5366,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
pInfo->scanFlag = MAIN_SCAN; pInfo->scanFlag = MAIN_SCAN;
pOperator->name = "DataBlocksOptimizedScanOperator"; pOperator->name = "DataBlocksOptimizedScanOperator";
pOperator->operatorType = OP_DataBlocksOptScan; pOperator->operatorType = OP_TableScan;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
...@@ -6188,7 +6181,7 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) { ...@@ -6188,7 +6181,7 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes); toSDatablock(pAggInfo->pGroupResInfo, pAggInfo->pResultBuf, pInfo->pRes, pAggInfo->binfo.capacity);
if (pInfo->pRes->info.rows == 0 /*|| !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)*/) { if (pInfo->pRes->info.rows == 0 /*|| !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)*/) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
...@@ -6212,7 +6205,7 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) { ...@@ -6212,7 +6205,7 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) {
} }
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput); // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
// if (downstream->operatorType == OP_DataBlocksOptScan) { // if (downstream->operatorType == OP_TableScan) {
// STableScanInfo* pScanInfo = downstream->info; // STableScanInfo* pScanInfo = downstream->info;
// order = getTableScanOrder(pScanInfo); // order = getTableScanOrder(pScanInfo);
// } // }
...@@ -6429,7 +6422,7 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) { ...@@ -6429,7 +6422,7 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) {
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); // toSDatablock(pAggInfo->pGroupResInfo, pAggInfo->pResultBuf, pInfo->pRes, pAggInfo->binfo.capacity);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
...@@ -6469,7 +6462,7 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) { ...@@ -6469,7 +6462,7 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) {
finalizeQueryResult(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset); finalizeQueryResult(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->resultRowInfo); initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->resultRowInfo);
toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); // toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
...@@ -6488,7 +6481,7 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) { ...@@ -6488,7 +6481,7 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) {
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); // toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
...@@ -6529,7 +6522,7 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) { ...@@ -6529,7 +6522,7 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) {
finalizeQueryResult(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset); finalizeQueryResult(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->resultRowInfo); initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->resultRowInfo);
toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); // toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
...@@ -6743,7 +6736,7 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) { ...@@ -6743,7 +6736,7 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) {
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes); // toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
...@@ -6781,7 +6774,7 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) { ...@@ -6781,7 +6774,7 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) {
finalizeQueryResult(pOperator, pBInfo->pCtx, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset); finalizeQueryResult(pOperator, pBInfo->pCtx, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pBInfo->resultRowInfo); initGroupResInfo(&pRuntimeEnv->groupResInfo, &pBInfo->resultRowInfo);
toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes); // toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
...@@ -6802,7 +6795,7 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) { ...@@ -6802,7 +6795,7 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) {
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes); // toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
...@@ -6841,7 +6834,7 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) { ...@@ -6841,7 +6834,7 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) {
finalizeQueryResult(pOperator, pBInfo->pCtx, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset); finalizeQueryResult(pOperator, pBInfo->pCtx, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pBInfo->resultRowInfo); initGroupResInfo(&pRuntimeEnv->groupResInfo, &pBInfo->resultRowInfo);
toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes); // toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
...@@ -6860,7 +6853,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) { ...@@ -6860,7 +6853,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) {
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes); // toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes);
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
...@@ -6904,8 +6897,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) { ...@@ -6904,8 +6897,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) {
sortGroupResByOrderList(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes); sortGroupResByOrderList(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes);
} }
toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes); // toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes);
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} }
...@@ -7049,17 +7041,32 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { ...@@ -7049,17 +7041,32 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
tfree(pOperator); tfree(pOperator);
} }
static int32_t initAggInfo(SAggOperatorInfo* pInfo, SArray* pExprInfo, int32_t numOfRows) { static int32_t initAggInfo(SAggOperatorInfo* pInfo, SArray* pExprInfo, int32_t numOfRows, const STableGroupInfo* pTableGroupInfo) {
pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, numOfRows); pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, numOfRows);
pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset, &pInfo->binfo.resRowSize); pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset, &pInfo->binfo.resRowSize);
pInfo->binfo.capacity = 4096;
pInfo->pResultRowHashTable = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pResultRowListSet = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
pInfo->pool = initResultRowPool(getResultRowSize(pExprInfo)); pInfo->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK);
pInfo->pResultRowListSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
pInfo->pool = initResultRowPool(getResultRowSize(pExprInfo));
pInfo->pResultRowArrayList = taosArrayInit(10, sizeof(SResultRowCell)); pInfo->pResultRowArrayList = taosArrayInit(10, sizeof(SResultRowCell));
// TODO: number of tables pInfo->pTableQueryInfo = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo));
pInfo->pTableQueryInfo = calloc(1, sizeof(STableQueryInfo));
int32_t index = 0;
for(int32_t i = 0; i < taosArrayGetSize(pTableGroupInfo->pGroupList); ++i) {
SArray* pa = taosArrayGetP(pTableGroupInfo->pGroupList, i);
for(int32_t j = 0; j < taosArrayGetSize(pa); ++j) {
STableKeyInfo* pk = taosArrayGet(pa, j);
STableQueryInfo* pTQueryInfo = &pInfo->pTableQueryInfo[index];
pTQueryInfo->uid = pk->uid;
pTQueryInfo->lastKey = pk->lastKey;
pTQueryInfo->groupIndex = i;
}
}
STimeWindow win = {0, INT64_MAX}; STimeWindow win = {0, INT64_MAX};
createTableQueryInfo(pInfo->pTableQueryInfo, false, win); createTableQueryInfo(pInfo->pTableQueryInfo, false, win);
...@@ -7078,13 +7085,13 @@ static SExprInfo* exprArrayDup(SArray* pExprInfo) { ...@@ -7078,13 +7085,13 @@ static SExprInfo* exprArrayDup(SArray* pExprInfo) {
return p; return p;
} }
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
int32_t numOfRows = 1; int32_t numOfRows = 1;
//(int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)); //(int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery));
initAggInfo(pInfo, pExprInfo, numOfRows); initAggInfo(pInfo, pExprInfo, numOfRows, pTableGroupInfo);
setDefaultOutputBuf_rv(pInfo, MAIN_SCAN, pTaskInfo); setDefaultOutputBuf_rv(pInfo, MAIN_SCAN, pTaskInfo);
size_t numOfOutput = taosArrayGetSize(pExprInfo); size_t numOfOutput = taosArrayGetSize(pExprInfo);
...@@ -7181,15 +7188,15 @@ static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -7181,15 +7188,15 @@ static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) {
pInfo->pRes = blockDataDestroy(pInfo->pRes); pInfo->pRes = blockDataDestroy(pInfo->pRes);
} }
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
size_t tableGroup = 1; // GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
int32_t numOfRows = 1; int32_t numOfRows = 1;
size_t numOfOutput = taosArrayGetSize(pExprInfo); size_t numOfOutput = taosArrayGetSize(pExprInfo);
initAggInfo(pInfo, pExprInfo, numOfRows); initAggInfo(pInfo, pExprInfo, numOfRows, pTableGroupInfo);
SExprInfo* p = exprArrayDup(pExprInfo);
// initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)tableGroup, TSDB_DATA_TYPE_INT); size_t tableGroup = taosArrayGetSize(pTableGroupInfo->pGroupList);
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)tableGroup, TSDB_DATA_TYPE_INT);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "MultiTableAggregate"; pOperator->name = "MultiTableAggregate";
...@@ -7197,7 +7204,7 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray ...@@ -7197,7 +7204,7 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pExpr = p; pOperator->pExpr = exprArrayDup(pExprInfo);
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->exec = doMultiTableAggregate; pOperator->exec = doMultiTableAggregate;
...@@ -8045,14 +8052,15 @@ static tsdbReaderT doCreateDataReader(STableScanPhyNode* pTableScanNode, SReadHa ...@@ -8045,14 +8052,15 @@ static tsdbReaderT doCreateDataReader(STableScanPhyNode* pTableScanNode, SReadHa
static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId); static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId);
SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId) { SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
if (pPhyNode->pChildren == NULL || taosArrayGetSize(pPhyNode->pChildren) == 0) { if (pPhyNode->pChildren == NULL || taosArrayGetSize(pPhyNode->pChildren) == 0) {
if (pPhyNode->info.type == OP_DataBlocksOptScan) { if (pPhyNode->info.type == OP_TableScan) {
SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode; SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode;
size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets);
size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets);
tsdbReaderT pDataReader = doCreateDataReader((STableScanPhyNode*) pPhyNode, pHandle, (uint64_t) queryId, taskId); tsdbReaderT pDataReader = doCreateDataReader((STableScanPhyNode*) pPhyNode, pHandle, (uint64_t) queryId, taskId);
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pTaskInfo); return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pTaskInfo);
} else if (pPhyNode->info.type == OP_Exchange) { } else if (pPhyNode->info.type == OP_Exchange) {
SExchangePhyNode* pEx = (SExchangePhyNode*) pPhyNode; SExchangePhyNode* pEx = (SExchangePhyNode*) pPhyNode;
...@@ -8086,10 +8094,20 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask ...@@ -8086,10 +8094,20 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask
size_t size = taosArrayGetSize(pPhyNode->pChildren); size_t size = taosArrayGetSize(pPhyNode->pChildren);
assert(size == 1); assert(size == 1);
// TODO single table agg
for (int32_t i = 0; i < size; ++i) {
SPhyNode* pChildNode = taosArrayGetP(pPhyNode->pChildren, i);
SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
return createAggregateOperatorInfo(op, pPhyNode->pTargets, pTaskInfo, pTableGroupInfo);
}
} else if (pPhyNode->info.type == OP_MultiTableAggregate) {
size_t size = taosArrayGetSize(pPhyNode->pChildren);
assert(size == 1);
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
SPhyNode* pChildNode = taosArrayGetP(pPhyNode->pChildren, i); SPhyNode* pChildNode = taosArrayGetP(pPhyNode->pChildren, i);
SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId); SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
return createAggregateOperatorInfo(op, pPhyNode->pTargets, pTaskInfo); return createMultiTableAggOperatorInfo(op, pPhyNode->pTargets, pTaskInfo, pTableGroupInfo);
} }
} }
} }
...@@ -8164,7 +8182,8 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead ...@@ -8164,7 +8182,8 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
goto _complete; goto _complete;
} }
(*pTaskInfo)->pRoot = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId); STableGroupInfo group = {0};
(*pTaskInfo)->pRoot = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &group);
if ((*pTaskInfo)->pRoot == NULL) { if ((*pTaskInfo)->pRoot == NULL) {
code = TSDB_CODE_QRY_OUT_OF_MEMORY; code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _complete; goto _complete;
......
...@@ -216,7 +216,7 @@ static SPhyNode* createMultiTableScanNode(SQueryPlanNode* pPlanNode, SQueryTable ...@@ -216,7 +216,7 @@ static SPhyNode* createMultiTableScanNode(SQueryPlanNode* pPlanNode, SQueryTable
} else if (needSeqScan(pPlanNode)) { } else if (needSeqScan(pPlanNode)) {
return createUserTableScanNode(pPlanNode, pTable, OP_TableSeqScan); return createUserTableScanNode(pPlanNode, pTable, OP_TableSeqScan);
} }
int32_t type = (pPlanNode->info.type == QNODE_TABLESCAN)? OP_DataBlocksOptScan:OP_StreamScan; int32_t type = (pPlanNode->info.type == QNODE_TABLESCAN)? OP_TableScan:OP_StreamScan;
return createUserTableScanNode(pPlanNode, pTable, type); return createUserTableScanNode(pPlanNode, pTable, type);
} }
...@@ -288,7 +288,7 @@ static bool needMultiNodeScan(SQueryTableInfo* pTable) { ...@@ -288,7 +288,7 @@ static bool needMultiNodeScan(SQueryTableInfo* pTable) {
static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTableInfo, SSubplan* subplan) { static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTableInfo, SSubplan* subplan) {
SVgroupsInfo* pVgroupsInfo = pTableInfo->pMeta->vgroupList; SVgroupsInfo* pVgroupsInfo = pTableInfo->pMeta->vgroupList;
vgroupInfoToNodeAddr(&(pVgroupsInfo->vgroups[0]), &subplan->execNode); vgroupInfoToNodeAddr(&(pVgroupsInfo->vgroups[0]), &subplan->execNode);
int32_t type = (pPlanNode->info.type == QNODE_TABLESCAN)? OP_DataBlocksOptScan:OP_StreamScan; int32_t type = (pPlanNode->info.type == QNODE_TABLESCAN)? OP_TableScan:OP_StreamScan;
return createUserTableScanNode(pPlanNode, pTableInfo, type); return createUserTableScanNode(pPlanNode, pTableInfo, type);
} }
......
...@@ -88,7 +88,7 @@ static const char* jkPnodeType = "Type"; ...@@ -88,7 +88,7 @@ static const char* jkPnodeType = "Type";
static int32_t getPnodeTypeSize(cJSON* json) { static int32_t getPnodeTypeSize(cJSON* json) {
switch (getNumber(json, jkPnodeType)) { switch (getNumber(json, jkPnodeType)) {
case OP_StreamScan: case OP_StreamScan:
case OP_DataBlocksOptScan: case OP_TableScan:
case OP_TableSeqScan: case OP_TableSeqScan:
return sizeof(STableScanPhyNode); return sizeof(STableScanPhyNode);
case OP_TagScan: case OP_TagScan:
...@@ -830,7 +830,7 @@ static bool specificPhyNodeToJson(const void* obj, cJSON* json) { ...@@ -830,7 +830,7 @@ static bool specificPhyNodeToJson(const void* obj, cJSON* json) {
const SPhyNode* phyNode = (const SPhyNode*)obj; const SPhyNode* phyNode = (const SPhyNode*)obj;
switch (phyNode->info.type) { switch (phyNode->info.type) {
case OP_StreamScan: case OP_StreamScan:
case OP_DataBlocksOptScan: case OP_TableScan:
case OP_TableSeqScan: case OP_TableSeqScan:
return tableScanNodeToJson(obj, json); return tableScanNodeToJson(obj, json);
case OP_TagScan: case OP_TagScan:
...@@ -868,7 +868,7 @@ static bool specificPhyNodeFromJson(const cJSON* json, void* obj) { ...@@ -868,7 +868,7 @@ static bool specificPhyNodeFromJson(const cJSON* json, void* obj) {
SPhyNode* phyNode = (SPhyNode*)obj; SPhyNode* phyNode = (SPhyNode*)obj;
switch (phyNode->info.type) { switch (phyNode->info.type) {
case OP_StreamScan: case OP_StreamScan:
case OP_DataBlocksOptScan: case OP_TableScan:
case OP_TableSeqScan: case OP_TableSeqScan:
return tableScanNodeFromJson(json, obj); return tableScanNodeFromJson(json, obj);
case OP_TagScan: case OP_TagScan:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册