提交 d8da10f2 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 87d9836a
...@@ -38,16 +38,7 @@ ...@@ -38,16 +38,7 @@
memcpy((_k) + sizeof(uint64_t), (_ori), (_len)); \ memcpy((_k) + sizeof(uint64_t), (_ori), (_len)); \
} while (0) } while (0)
#define SET_RES_EXT_WINDOW_KEY(_k, _ori, _len, _uid, _buf) \
do { \
assert(sizeof(_uid) == sizeof(uint64_t)); \
*(void**)(_k) = (_buf); \
*(uint64_t*)((_k) + POINTER_BYTES) = (_uid); \
memcpy((_k) + POINTER_BYTES + sizeof(uint64_t), (_ori), (_len)); \
} while (0)
#define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t)) #define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t))
#define GET_RES_EXT_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t) + POINTER_BYTES)
#define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.str) #define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.str)
...@@ -104,16 +95,17 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags ...@@ -104,16 +95,17 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, SExecTaskInfo* pTaskInfo); STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, SExecTaskInfo* pTaskInfo);
STableListInfo* tableListCreate(); STableListInfo* tableListCreate();
void* tableListDestroy(STableListInfo* pTableListInfo); void* tableListDestroy(STableListInfo* pTableListInfo);
void tableListClear(STableListInfo* pTableListInfo); void tableListClear(STableListInfo* pTableListInfo);
int32_t tableListGetOutputGroups(const STableListInfo* pTableList); int32_t tableListGetOutputGroups(const STableListInfo* pTableList);
bool oneTableForEachGroup(const STableListInfo* pTableList); bool oneTableForEachGroup(const STableListInfo* pTableList);
uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid); uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid);
int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid); int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid);
int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo, int32_t* num); int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo,
uint64_t tableListGetSize(const STableListInfo* pTableList); int32_t* num);
uint64_t tableListGetSuid(const STableListInfo* pTableList); uint64_t tableListGetSize(const STableListInfo* pTableList);
STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index); uint64_t tableListGetSuid(const STableListInfo* pTableList);
STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index);
size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput); size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput);
void initResultRowInfo(SResultRowInfo* pResultRowInfo); void initResultRowInfo(SResultRowInfo* pResultRowInfo);
......
...@@ -665,18 +665,25 @@ typedef struct SStreamFillOperatorInfo { ...@@ -665,18 +665,25 @@ typedef struct SStreamFillOperatorInfo {
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup, SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,
__optr_close_fn_t closeFn, __optr_explain_fn_t explain); __optr_close_fn_t closeFn, __optr_explain_fn_t explain);
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator);
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator); int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num);
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num); void setOperatorCompleted(SOperatorInfo* pOperator);
void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status,
void* pInfo, SExecTaskInfo* pTaskInfo);
void destroyOperatorInfo(SOperatorInfo* pOperator);
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock); void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock);
void cleanupBasicInfo(SOptrBasicInfo* pInfo); void cleanupBasicInfo(SOptrBasicInfo* pInfo);
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr); int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr);
void cleanupExprSupp(SExprSupp* pSup); void cleanupExprSupp(SExprSupp* pSup);
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs); void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs);
int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
const char* pkey); const char* pkey);
void cleanupAggSup(SAggSupporter* pAggSup);
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows); void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows);
void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
...@@ -702,14 +709,10 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul ...@@ -702,14 +709,10 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul
extern void doDestroyExchangeOperatorInfo(void* param); extern void doDestroyExchangeOperatorInfo(void* param);
void setOperatorCompleted(SOperatorInfo* pOperator);
void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status,
void* pInfo, SExecTaskInfo* pTaskInfo);
void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo); void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo);
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock, int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
int32_t rows, const char* idStr, STableMetaCacheInfo* pCache); int32_t rows, const char* idStr, STableMetaCacheInfo* pCache);
void cleanupAggSup(SAggSupporter* pAggSup);
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle); void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name); void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name);
...@@ -724,6 +727,8 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode ...@@ -724,6 +727,8 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode, const char* pUser, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode, const char* pUser, SExecTaskInfo* pTaskInfo);
...@@ -779,6 +784,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -779,6 +784,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo);
// clang-format on // clang-format on
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
...@@ -786,38 +793,22 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc ...@@ -786,38 +793,22 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
void setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol); void setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol);
bool isTaskKilled(SExecTaskInfo* pTaskInfo);
int32_t checkForQueryBuf(size_t numOfTables); int32_t checkForQueryBuf(size_t numOfTables);
bool isTaskKilled(SExecTaskInfo* pTaskInfo);
void setTaskKilled(SExecTaskInfo* pTaskInfo); void setTaskKilled(SExecTaskInfo* pTaskInfo);
void queryCostStatis(SExecTaskInfo* pTaskInfo);
void doDestroyTask(SExecTaskInfo* pTaskInfo); void doDestroyTask(SExecTaskInfo* pTaskInfo);
void destroyOperatorInfo(SOperatorInfo* pOperator);
int32_t getMaximumIdleDurationSec();
/*
* ops: root operator
* data: *data save the result of encode, need to be freed by caller
* length: *length save the length of *data
* nOptrWithVal: *nOptrWithVal save the number of optr with value
* return: result code, 0 means success
*/
int32_t encodeOperator(SOperatorInfo* ops, char** data, int32_t* length, int32_t* nOptrWithVal);
/*
* ops: root operator, created by caller
* data: save the result of decode
* length: the length of data
* return: result code, 0 means success
*/
int32_t decodeOperator(SOperatorInfo* ops, const char* data, int32_t length);
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
char* sql, EOPTR_EXEC_MODEL model); char* sql, EOPTR_EXEC_MODEL model);
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle); int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle);
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList); int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList);
void printTaskExecCostInLog(SExecTaskInfo* pTaskInfo);
int32_t getMaximumIdleDurationSec();
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval, STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
int32_t order); int32_t order);
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey, int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
...@@ -840,15 +831,7 @@ void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock); ...@@ -840,15 +831,7 @@ void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock);
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup, int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
SExecTaskInfo* pTaskInfo);
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex);
bool groupbyTbname(SNodeList* pGroupList); bool groupbyTbname(SNodeList* pGroupList);
void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput);
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup, int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
SGroupResInfo* pGroupResInfo); SGroupResInfo* pGroupResInfo);
int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size); int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size);
......
...@@ -712,7 +712,7 @@ void qDestroyTask(qTaskInfo_t qTaskHandle) { ...@@ -712,7 +712,7 @@ void qDestroyTask(qTaskInfo_t qTaskHandle) {
qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows); qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows);
queryCostStatis(pTaskInfo); // print the query cost summary printTaskExecCostInLog(pTaskInfo); // print the query cost summary
doDestroyTask(pTaskInfo); doDestroyTask(pTaskInfo);
} }
...@@ -728,12 +728,12 @@ int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) { ...@@ -728,12 +728,12 @@ int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) {
} }
int32_t nOptrWithVal = 0; int32_t nOptrWithVal = 0;
int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal); // int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal);
if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal == 0)) { // if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal == 0)) {
taosMemoryFreeClear(*pOutput); // taosMemoryFreeClear(*pOutput);
*len = 0; // *len = 0;
} // }
return code; return 0;
} }
int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len) { int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len) {
...@@ -743,7 +743,8 @@ int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t le ...@@ -743,7 +743,8 @@ int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t le
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
return decodeOperator(pTaskInfo->pRoot, pInput, len); return 0;
// return decodeOperator(pTaskInfo->pRoot, pInput, len);
} }
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) { int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
......
...@@ -1335,7 +1335,7 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG ...@@ -1335,7 +1335,7 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
} }
} }
void queryCostStatis(SExecTaskInfo* pTaskInfo) { void printTaskExecCostInLog(SExecTaskInfo* pTaskInfo) {
STaskCostInfo* pSummary = &pTaskInfo->cost; STaskCostInfo* pSummary = &pTaskInfo->cost;
SFileBlockLoadRecorder* pRecorder = pSummary->pRecoder; SFileBlockLoadRecorder* pRecorder = pSummary->pRecoder;
...@@ -1958,7 +1958,7 @@ void cleanupAggSup(SAggSupporter* pAggSup) { ...@@ -1958,7 +1958,7 @@ void cleanupAggSup(SAggSupporter* pAggSup) {
destroyDiskbasedBuf(pAggSup->pResultBuf); destroyDiskbasedBuf(pAggSup->pResultBuf);
} }
int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
const char* pkey) { const char* pkey) {
int32_t code = initExprSupp(pSup, pExprInfo, numOfCols); int32_t code = initExprSupp(pSup, pExprInfo, numOfCols);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -2056,7 +2056,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN ...@@ -2056,7 +2056,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN
int32_t num = 0; int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -2734,103 +2734,6 @@ int32_t rebuildReader(SOperatorInfo* pOperator, SSubplan* plan, SReadHandle* pHa ...@@ -2734,103 +2734,6 @@ int32_t rebuildReader(SOperatorInfo* pOperator, SSubplan* plan, SReadHandle* pHa
} }
#endif #endif
int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length, int32_t* nOptrWithVal) {
int32_t code = TDB_CODE_SUCCESS;
char* pCurrent = NULL;
int32_t currLength = 0;
if (ops->fpSet.encodeResultRow) {
if (result == NULL || length == NULL || nOptrWithVal == NULL) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
code = ops->fpSet.encodeResultRow(ops, &pCurrent, &currLength);
if (code != TDB_CODE_SUCCESS) {
if (*result != NULL) {
taosMemoryFree(*result);
*result = NULL;
}
return code;
} else if (currLength == 0) {
ASSERT(!pCurrent);
goto _downstream;
}
++(*nOptrWithVal);
ASSERT(currLength >= 0);
if (*result == NULL) {
*result = (char*)taosMemoryCalloc(1, currLength + sizeof(int32_t));
if (*result == NULL) {
taosMemoryFree(pCurrent);
return TSDB_CODE_OUT_OF_MEMORY;
}
memcpy(*result + sizeof(int32_t), pCurrent, currLength);
*(int32_t*)(*result) = currLength + sizeof(int32_t);
} else {
int32_t sizePre = *(int32_t*)(*result);
char* tmp = (char*)taosMemoryRealloc(*result, sizePre + currLength);
if (tmp == NULL) {
taosMemoryFree(pCurrent);
taosMemoryFree(*result);
*result = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
*result = tmp;
memcpy(*result + sizePre, pCurrent, currLength);
*(int32_t*)(*result) += currLength;
}
taosMemoryFree(pCurrent);
*length = *(int32_t*)(*result);
}
_downstream:
for (int32_t i = 0; i < ops->numOfDownstream; ++i) {
code = encodeOperator(ops->pDownstream[i], result, length, nOptrWithVal);
if (code != TDB_CODE_SUCCESS) {
return code;
}
}
return TDB_CODE_SUCCESS;
}
int32_t decodeOperator(SOperatorInfo* ops, const char* result, int32_t length) {
int32_t code = TDB_CODE_SUCCESS;
if (ops->fpSet.decodeResultRow) {
if (result == NULL) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
ASSERT(length == *(int32_t*)result);
const char* data = result + sizeof(int32_t);
code = ops->fpSet.decodeResultRow(ops, (char*)data);
if (code != TDB_CODE_SUCCESS) {
return code;
}
int32_t totalLength = *(int32_t*)result;
int32_t dataLength = *(int32_t*)data;
if (totalLength == dataLength + sizeof(int32_t)) { // the last data
result = NULL;
length = 0;
} else {
result += dataLength;
*(int32_t*)(result) = totalLength - dataLength;
length = totalLength - dataLength;
}
}
for (int32_t i = 0; i < ops->numOfDownstream; ++i) {
code = decodeOperator(ops->pDownstream[i], result, length);
if (code != TDB_CODE_SUCCESS) {
return code;
}
}
return TDB_CODE_SUCCESS;
}
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle) { int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle) {
SExecTaskInfo* pTask = *(SExecTaskInfo**)pTaskInfo; SExecTaskInfo* pTask = *(SExecTaskInfo**)pTaskInfo;
......
...@@ -456,7 +456,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* ...@@ -456,7 +456,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode*
int32_t num = 0; int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, pInfo->groupKeyLen, pTaskInfo->id.str); code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, pInfo->groupKeyLen, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
......
...@@ -102,7 +102,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys ...@@ -102,7 +102,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
} }
initResultSizeInfo(&pOperator->resultInfo, numOfRows); initResultSizeInfo(&pOperator->resultInfo, numOfRows);
code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -400,7 +400,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy ...@@ -400,7 +400,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
initResultSizeInfo(&pOperator->resultInfo, numOfRows); initResultSizeInfo(&pOperator->resultInfo, numOfRows);
blockDataEnsureCapacity(pResBlock, numOfRows); blockDataEnsureCapacity(pResBlock, numOfRows);
int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str); int32_t code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
......
...@@ -1741,7 +1741,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh ...@@ -1741,7 +1741,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
int32_t num = 0; int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pPhyNode->window.pFuncs, NULL, &num); SExprInfo* pExprInfo = createExprInfo(pPhyNode->window.pFuncs, NULL, &num);
int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); int32_t code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -2001,7 +2001,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi ...@@ -2001,7 +2001,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num); SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num);
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -2069,7 +2069,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW ...@@ -2069,7 +2069,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc); SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc);
initBasicInfo(&pInfo->binfo, pResBlock); initBasicInfo(&pInfo->binfo, pResBlock);
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -2243,26 +2243,6 @@ static void clearSpecialDataBlock(SSDataBlock* pBlock) { ...@@ -2243,26 +2243,6 @@ static void clearSpecialDataBlock(SSDataBlock* pBlock) {
blockDataCleanup(pBlock); blockDataCleanup(pBlock);
} }
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex) {
// ASSERT(pDest->info.capacity >= pSource->info.rows);
blockDataEnsureCapacity(pDest, pSource->info.rows);
clearSpecialDataBlock(pDest);
SColumnInfoData* pDestCol = taosArrayGet(pDest->pDataBlock, 0);
SColumnInfoData* pSourceCol = taosArrayGet(pSource->pDataBlock, tsColIndex);
// copy timestamp column
colDataAssign(pDestCol, pSourceCol, pSource->info.rows, &pDest->info);
for (int32_t i = 1; i < taosArrayGetSize(pDest->pDataBlock); i++) {
SColumnInfoData* pCol = taosArrayGet(pDest->pDataBlock, i);
colDataAppendNNULL(pCol, 0, pSource->info.rows);
}
pDest->info.rows = pSource->info.rows;
pDest->info.groupId = pSource->info.groupId;
pDest->info.type = pSource->info.type;
blockDataUpdateTsWindow(pDest, 0);
}
static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pBlock) { static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pBlock) {
clearSpecialDataBlock(pBlock); clearSpecialDataBlock(pBlock);
int32_t size = taosArrayGetSize(array); int32_t size = taosArrayGetSize(array);
...@@ -2707,7 +2687,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -2707,7 +2687,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
initBasicInfo(&pInfo->binfo, pResBlock); initBasicInfo(&pInfo->binfo, pResBlock);
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -4346,7 +4326,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -4346,7 +4326,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
int32_t num = 0; int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pNode->window.pFuncs, NULL, &num); SExprInfo* pExprInfo = createExprInfo(pNode->window.pFuncs, NULL, &num);
code = initAggInfo(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); code = initAggSup(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -4651,7 +4631,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge ...@@ -4651,7 +4631,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
int32_t code = initAggInfo(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); int32_t code = initAggSup(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -4847,7 +4827,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -4847,7 +4827,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -4901,3 +4881,4 @@ _error: ...@@ -4901,3 +4881,4 @@ _error:
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册