提交 b57f7414 编写于 作者: H Haojun Liao

[td-13039] refactor.

上级 ddbe4095
...@@ -56,7 +56,7 @@ typedef struct SColumnDataAgg { ...@@ -56,7 +56,7 @@ typedef struct SColumnDataAgg {
typedef struct SDataBlockInfo { typedef struct SDataBlockInfo {
STimeWindow window; STimeWindow window;
int32_t rows; int32_t rows;
int32_t tupleSize; int32_t rowSize;
int32_t numOfCols; int32_t numOfCols;
union {int64_t uid; int64_t blockId;}; union {int64_t uid; int64_t blockId;};
} SDataBlockInfo; } SDataBlockInfo;
...@@ -70,10 +70,10 @@ typedef struct SConstantItem { ...@@ -70,10 +70,10 @@ typedef struct SConstantItem {
// info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList); // info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList);
typedef struct SSDataBlock { typedef struct SSDataBlock {
SColumnDataAgg* pBlockAgg; SColumnDataAgg *pBlockAgg;
SArray* pDataBlock; // SArray<SColumnInfoData> SArray *pDataBlock; // SArray<SColumnInfoData>
SArray* pConstantList; // SArray<SConstantItem>, it is a constant/tags value of the corresponding result value. SArray *pConstantList; // SArray<SConstantItem>, it is a constant/tags value of the corresponding result value.
SDataBlockInfo info; SDataBlockInfo info;
} SSDataBlock; } SSDataBlock;
typedef struct SVarColAttr { typedef struct SVarColAttr {
...@@ -218,18 +218,17 @@ typedef struct SColumn { ...@@ -218,18 +218,17 @@ typedef struct SColumn {
int64_t dataBlockId; int64_t dataBlockId;
}; };
char name[TSDB_COL_NAME_LEN];
int8_t flag; // column type: normal column, tag, or user-input column (integer/float/string)
union { union {
int16_t colId; int16_t colId;
int16_t slotId; int16_t slotId;
}; };
char name[TSDB_COL_NAME_LEN];
int8_t flag; // column type: normal column, tag, or user-input column (integer/float/string)
int16_t type; int16_t type;
int32_t bytes; int32_t bytes;
uint8_t precision; uint8_t precision;
uint8_t scale; uint8_t scale;
// SColumnInfo info;
} SColumn; } SColumn;
typedef struct SLimit { typedef struct SLimit {
...@@ -254,11 +253,20 @@ typedef struct SFunctParam { ...@@ -254,11 +253,20 @@ typedef struct SFunctParam {
} SFunctParam; } SFunctParam;
// the structure for sql function in select clause // the structure for sql function in select clause
typedef struct SResSchame {
int8_t type;
int32_t colId;
int32_t bytes;
int32_t precision;
int32_t scale;
char name[TSDB_COL_NAME_LEN];
} SResSchema;
// TODO move away to executor.h
typedef struct SExprBasicInfo { typedef struct SExprBasicInfo {
SSchema resSchema; // TODO refactor SResSchema resSchema;
int16_t numOfParams; // argument value of each function int16_t numOfParams; // argument value of each function
SFunctParam *pParam; SFunctParam *pParam;
// SVariant param[3]; // parameters are not more than 3
} SExprBasicInfo; } SExprBasicInfo;
typedef struct SExprInfo { typedef struct SExprInfo {
......
...@@ -632,7 +632,7 @@ typedef struct SOrderOperatorInfo { ...@@ -632,7 +632,7 @@ typedef struct SOrderOperatorInfo {
uint64_t totalElapsed; // total elapsed time uint64_t totalElapsed; // total elapsed time
} SOrderOperatorInfo; } SOrderOperatorInfo;
SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pExprInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, const SArray* pExprInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput,
int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo); int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
......
...@@ -347,6 +347,7 @@ SSDataBlock* createOutputBuf_rv1(SDataBlockDescNode* pNode) { ...@@ -347,6 +347,7 @@ SSDataBlock* createOutputBuf_rv1(SDataBlockDescNode* pNode) {
pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
pBlock->info.blockId = pNode->dataBlockId; pBlock->info.blockId = pNode->dataBlockId;
pBlock->info.rowSize = pNode->resultRowSize;
for(int32_t i = 0; i < numOfCols; ++i) { for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData idata = {{0}}; SColumnInfoData idata = {{0}};
...@@ -683,12 +684,6 @@ static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t ...@@ -683,12 +684,6 @@ static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t
w.ekey = w.skey + pInterval->interval - 1; w.ekey = w.skey + pInterval->interval - 1;
} }
} }
/*
* query border check, skey should not be bounded by the query time range, since the value skey will
* be used as the time window index value. So we only change ekey of time window accordingly.
*/
// ASSERT(win->skey <= win->ekey); // todo no need this
return w; return w;
} }
...@@ -3348,14 +3343,13 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t ...@@ -3348,14 +3343,13 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t
cleanupResultRowEntry(pEntry); cleanupResultRowEntry(pEntry);
pCtx[i].resultInfo = pEntry; pCtx[i].resultInfo = pEntry;
pCtx[i].pOutput = pData->pData; // todo remove it
pCtx[i].currentStage = stage; pCtx[i].currentStage = stage;
// set the timestamp output buffer for top/bottom/diff query // set the timestamp output buffer for top/bottom/diff query
int32_t fid = pCtx[i].functionId; // int32_t fid = pCtx[i].functionId;
if (fid == FUNCTION_TOP || fid == FUNCTION_BOTTOM || fid == FUNCTION_DIFF || fid == FUNCTION_DERIVATIVE) { // if (fid == FUNCTION_TOP || fid == FUNCTION_BOTTOM || fid == FUNCTION_DIFF || fid == FUNCTION_DERIVATIVE) {
if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i-1].pOutput; // if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i-1].pOutput;
} // }
} }
initCtxOutputBuffer(pCtx, pDataBlock->info.numOfCols); initCtxOutputBuffer(pCtx, pDataBlock->info.numOfCols);
...@@ -3663,7 +3657,7 @@ void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, i ...@@ -3663,7 +3657,7 @@ void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, i
* all group belong to one result set, and each group result has different group id so set the id to be one * all group belong to one result set, and each group result has different group id so set the id to be one
*/ */
if (pResultRow->pageId == -1) { if (pResultRow->pageId == -1) {
int32_t ret = addNewWindowResultBuf(pResultRow, pAggInfo->pResultBuf, tableGroupId, pAggInfo->binfo.pRes->info.tupleSize); int32_t ret = addNewWindowResultBuf(pResultRow, pAggInfo->pResultBuf, tableGroupId, pAggInfo->binfo.pRes->info.rowSize);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
return; return;
} }
...@@ -5187,9 +5181,10 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { ...@@ -5187,9 +5181,10 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
#endif #endif
} }
// TODO remove it
static SSDataBlock* createResultDataBlock(const SArray* pExprInfo); static SSDataBlock* createResultDataBlock(const SArray* pExprInfo);
SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pExprInfo, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, const SArray* pExprInfo, SExecTaskInfo* pTaskInfo) {
SExchangeInfo* pInfo = calloc(1, sizeof(SExchangeInfo)); SExchangeInfo* pInfo = calloc(1, sizeof(SExchangeInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
...@@ -5200,9 +5195,9 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* ...@@ -5200,9 +5195,9 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray*
return NULL; return NULL;
} }
size_t numOfSources = taosArrayGetSize(pSources); size_t numOfSources = LIST_LENGTH(pSources);
pInfo->pSources = taosArrayDup(pSources); // pInfo->pSources = taosArrayDup(pSources);
pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo)); pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
if (pInfo->pSourceDataInfo == NULL || pInfo->pSources == NULL) { if (pInfo->pSourceDataInfo == NULL || pInfo->pSources == NULL) {
tfree(pInfo); tfree(pInfo);
...@@ -5222,8 +5217,8 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* ...@@ -5222,8 +5217,8 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray*
taosArrayPush(pInfo->pSourceDataInfo, &dataInfo); taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
} }
size_t size = taosArrayGetSize(pExprInfo); size_t size = taosArrayGetSize(pExprInfo);
pInfo->pResult = createResultDataBlock(pExprInfo); pInfo->pResult = createResultDataBlock(pExprInfo);
pInfo->seqLoadData = true; pInfo->seqLoadData = true;
tsem_init(&pInfo->ready, 0, 0); tsem_init(&pInfo->ready, 0, 0);
...@@ -5277,11 +5272,12 @@ SSDataBlock* createResultDataBlock(const SArray* pExprInfo) { ...@@ -5277,11 +5272,12 @@ SSDataBlock* createResultDataBlock(const SArray* pExprInfo) {
SColumnInfoData colInfoData = {0}; SColumnInfoData colInfoData = {0};
SExprInfo* p = taosArrayGetP(pExprInfo, i); SExprInfo* p = taosArrayGetP(pExprInfo, i);
SSchema* pSchema = &p->base.resSchema; SResSchema* pSchema = &p->base.resSchema;
colInfoData.info.type = pSchema->type; colInfoData.info.type = pSchema->type;
colInfoData.info.colId = pSchema->colId; colInfoData.info.colId = pSchema->colId;
colInfoData.info.bytes = pSchema->bytes; colInfoData.info.bytes = pSchema->bytes;
colInfoData.info.scale = pSchema->scale;
colInfoData.info.precision = pSchema->precision;
taosArrayPush(pResult, &colInfoData); taosArrayPush(pResult, &colInfoData);
} }
...@@ -8016,9 +8012,19 @@ static int32_t deserializeColFilterInfo(SColumnFilterInfo* pColFilters, int16_t ...@@ -8016,9 +8012,19 @@ static int32_t deserializeColFilterInfo(SColumnFilterInfo* pColFilters, int16_t
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SArray* createExprInfo(SAggPhysiNode* pPhyNode, int32_t* resultRowSize) { static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, int32_t scale, int32_t precision, const char* name) {
*resultRowSize = pPhyNode->node.pOutputDataBlockDesc->resultRowSize; SResSchema s = {0};
s.scale = scale;
s.precision = precision;
s.type = type;
s.bytes = bytes;
s.colId = slotId;
strncpy(s.name, name, tListLen(s.name));
return s;
}
SArray* createExprInfo(SAggPhysiNode* pPhyNode) {
int32_t numOfAggFuncs = LIST_LENGTH(pPhyNode->pAggFuncs); int32_t numOfAggFuncs = LIST_LENGTH(pPhyNode->pAggFuncs);
SArray* pArray = taosArrayInit(numOfAggFuncs, POINTER_BYTES); SArray* pArray = taosArrayInit(numOfAggFuncs, POINTER_BYTES);
...@@ -8038,9 +8044,12 @@ SArray* createExprInfo(SAggPhysiNode* pPhyNode, int32_t* resultRowSize) { ...@@ -8038,9 +8044,12 @@ SArray* createExprInfo(SAggPhysiNode* pPhyNode, int32_t* resultRowSize) {
ASSERT(pTargetNode->slotId == i); ASSERT(pTargetNode->slotId == i);
SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr; SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr;
pExp->base.resSchema = createSchema(pFuncNode->node.resType.type, pFuncNode->node.resType.bytes, pTargetNode->slotId, pFuncNode->node.aliasName);
pExp->pExpr->_function.pFunctNode = pFuncNode;
SDataType *pType = &pFuncNode->node.resType;
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId,
pType->scale, pType->precision, pFuncNode->node.aliasName);
pExp->pExpr->_function.pFunctNode = pFuncNode;
strncpy(pExp->pExpr->_function.functionName, pFuncNode->functionName, tListLen(pExp->pExpr->_function.functionName)); strncpy(pExp->pExpr->_function.functionName, pFuncNode->functionName, tListLen(pExp->pExpr->_function.functionName));
// TODO: value parameter needs to be handled // TODO: value parameter needs to be handled
...@@ -8089,16 +8098,17 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa ...@@ -8089,16 +8098,17 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pPhyNode)) { if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pPhyNode)) {
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode;
size_t numOfCols = LIST_LENGTH(pScanPhyNode->pScanCols); size_t numOfCols = LIST_LENGTH(pScanPhyNode->pScanCols);
tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*) pPhyNode, pHandle, (uint64_t) queryId, taskId); tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, (uint64_t)queryId, taskId);
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId); int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pTaskInfo); return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count,
pScanPhyNode->reverse, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) { } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) {
// SExchangePhysiNode* pEx = (SExchangePhysiNode*) pPhyNode; SExchangePhysiNode* pEx = (SExchangePhysiNode*)pPhyNode;
// return createExchangeOperatorInfo(pEx->pSrcEndPoints, pEx->node.pTargets, pTaskInfo); return createExchangeOperatorInfo(pEx->pSrcEndPoints, NULL, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == nodeType(pPhyNode)) { } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == nodeType(pPhyNode)) {
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
STableGroupInfo groupInfo = {0}; STableGroupInfo groupInfo = {0};
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, &groupInfo, queryId, taskId); int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, &groupInfo, queryId, taskId);
...@@ -8136,8 +8146,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa ...@@ -8136,8 +8146,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
int32_t resultRowSize = 0; SArray* pExprInfo = createExprInfo((SAggPhysiNode*)pPhyNode);
SArray* pExprInfo = createExprInfo((SAggPhysiNode*)pPhyNode, &resultRowSize);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
return createAggregateOperatorInfo(op, pExprInfo, pResBlock, pTaskInfo, pTableGroupInfo); return createAggregateOperatorInfo(op, pExprInfo, pResBlock, pTaskInfo, pTableGroupInfo);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册