提交 fa3fce9c 编写于 作者: H Haojun Liao

[td-13039] refactor.

上级 cde3e970
...@@ -46,9 +46,9 @@ typedef void **TAOS_ROW; ...@@ -46,9 +46,9 @@ typedef void **TAOS_ROW;
#define TSDB_DATA_TYPE_USMALLINT 12 // 2 bytes #define TSDB_DATA_TYPE_USMALLINT 12 // 2 bytes
#define TSDB_DATA_TYPE_UINT 13 // 4 bytes #define TSDB_DATA_TYPE_UINT 13 // 4 bytes
#define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes #define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes
#define TSDB_DATA_TYPE_VARCHAR 15 // string #define TSDB_DATA_TYPE_JSON 15 // json string
#define TSDB_DATA_TYPE_VARBINARY 16 // binary #define TSDB_DATA_TYPE_VARCHAR 16 // string
#define TSDB_DATA_TYPE_JSON 17 // json #define TSDB_DATA_TYPE_VARBINARY 17 // binary
#define TSDB_DATA_TYPE_DECIMAL 18 // decimal #define TSDB_DATA_TYPE_DECIMAL 18 // decimal
#define TSDB_DATA_TYPE_BLOB 19 // binary #define TSDB_DATA_TYPE_BLOB 19 // binary
......
...@@ -264,7 +264,8 @@ typedef struct SExecTaskInfo { ...@@ -264,7 +264,8 @@ typedef struct SExecTaskInfo {
uint64_t totalRows; // total number of rows uint64_t totalRows; // total number of rows
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
char* sql; // query sql string char* sql; // query sql string
jmp_buf env; // jmp_buf env; // when error occurs, abort
int32_t bufSize; // available buffer size for all operator
struct SOperatorInfo* pRoot; struct SOperatorInfo* pRoot;
} SExecTaskInfo; } SExecTaskInfo;
...@@ -322,6 +323,7 @@ typedef struct SOperatorInfo { ...@@ -322,6 +323,7 @@ typedef struct SOperatorInfo {
SExprInfo* pExpr; SExprInfo* pExpr;
STaskRuntimeEnv* pRuntimeEnv; // todo remove it STaskRuntimeEnv* pRuntimeEnv; // todo remove it
SExecTaskInfo* pTaskInfo; SExecTaskInfo* pTaskInfo;
SOperatorCostInfo cost;
struct SOperatorInfo** pDownstream; // downstram pointer list struct SOperatorInfo** pDownstream; // downstram pointer list
int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator
...@@ -620,6 +622,8 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim ...@@ -620,6 +622,8 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream); SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SInterval* pInterval, SExecTaskInfo* pTaskInfo); SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SInterval* pInterval, SExecTaskInfo* pTaskInfo);
...@@ -655,10 +659,6 @@ SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI ...@@ -655,10 +659,6 @@ SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema, SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema,
int32_t numOfOutput); int32_t numOfOutput);
SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
// SSDataBlock* doSLimit(void* param, bool* newgroup);
// int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, SSingleColumnFilterInfo** pFilterInfo, uint64_t qId); // int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, SSingleColumnFilterInfo** pFilterInfo, uint64_t qId);
void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock); void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock);
......
...@@ -208,6 +208,9 @@ static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput); ...@@ -208,6 +208,9 @@ static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput); static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput);
static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput); static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput);
static void destroyAggOperatorInfo(void* param, int32_t numOfOutput); static void destroyAggOperatorInfo(void* param, int32_t numOfOutput);
static void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput);
static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput);
static void destroyOperatorInfo(SOperatorInfo* pOperator); static void destroyOperatorInfo(SOperatorInfo* pOperator);
static void doSetOperatorCompleted(SOperatorInfo* pOperator) { static void doSetOperatorCompleted(SOperatorInfo* pOperator) {
...@@ -217,6 +220,10 @@ static void doSetOperatorCompleted(SOperatorInfo* pOperator) { ...@@ -217,6 +220,10 @@ static void doSetOperatorCompleted(SOperatorInfo* pOperator) {
} }
} }
static void dummyOperatorOpenFn() {
return;
}
static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock, int32_t rowCapacity); static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock, int32_t rowCapacity);
static int32_t getGroupbyColumnIndex(SGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock); static int32_t getGroupbyColumnIndex(SGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock);
...@@ -5236,43 +5243,53 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { ...@@ -5236,43 +5243,53 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
static SSDataBlock* createResultDataBlock(const SArray* pExprInfo); static SSDataBlock* createResultDataBlock(const SArray* pExprInfo);
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) {
pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
if (pInfo->pSourceDataInfo == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for(int32_t i = 0; i < numOfSources; ++i) {
SSourceDataInfo dataInfo = {0};
dataInfo.status = DATA_NOT_READY;
dataInfo.pEx = pInfo;
dataInfo.index = i;
void* ret = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
if (ret == NULL) {
taosArrayDestroy(pInfo->pSourceDataInfo);
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return TSDB_CODE_SUCCESS;
}
SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pExprInfo, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pExprInfo, SExecTaskInfo* pTaskInfo) {
SExchangeInfo* pInfo = calloc(1, sizeof(SExchangeInfo)); SExchangeInfo* pInfo = calloc(1, sizeof(SExchangeInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
tfree(pInfo); goto _error;
tfree(pOperator);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
} }
size_t numOfSources = taosArrayGetSize(pSources);
pInfo->pSources = taosArrayDup(pSources); pInfo->pSources = taosArrayDup(pSources);
pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo)); if (pInfo->pSources == NULL) {
if (pInfo->pSourceDataInfo == NULL || pInfo->pSources == NULL) { goto _error;
tfree(pInfo);
tfree(pOperator);
taosArrayDestroy(pInfo->pSources);
taosArrayDestroy(pInfo->pSourceDataInfo);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
} }
for(int32_t i = 0; i < numOfSources; ++i) { size_t numOfSources = taosArrayGetSize(pSources);
SSourceDataInfo dataInfo = {0}; int32_t code = initDataSource(numOfSources, pInfo);
dataInfo.status = DATA_NOT_READY; if (code != TSDB_CODE_SUCCESS) {
dataInfo.pEx = pInfo; goto _error;
dataInfo.index = i;
taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
} }
size_t size = taosArrayGetSize(pExprInfo);
pInfo->pResult = createResultDataBlock(pExprInfo); pInfo->pResult = createResultDataBlock(pExprInfo);
pInfo->seqLoadData = true; if (pInfo->pResult == NULL) {
goto _error;
}
pInfo->seqLoadData = true; // sequentially load data from the source node
tsem_init(&pInfo->ready, 0, 0); tsem_init(&pInfo->ready, 0, 0);
pOperator->name = "ExchangeOperator"; pOperator->name = "ExchangeOperator";
...@@ -5280,9 +5297,11 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* ...@@ -5280,9 +5297,11 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray*
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->numOfOutput = size; pOperator->numOfOutput = taosArrayGetSize(pExprInfo);
pOperator->nextDataFn = doLoadRemoteData;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->openFn = NULL; // assign a dummy function.
pOperator->nextDataFn = doLoadRemoteData;
pOperator->closeFn = destroyExchangeOperatorInfo;
#if 1 #if 1
{ // todo refactor { // todo refactor
...@@ -5308,6 +5327,16 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* ...@@ -5308,6 +5327,16 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray*
#endif #endif
return pOperator; return pOperator;
_error:
if (pInfo != NULL) {
destroyExchangeOperatorInfo(pInfo, 0);
}
tfree(pInfo);
tfree(pOperator);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
} }
SSDataBlock* createResultDataBlock(const SArray* pExprInfo) { SSDataBlock* createResultDataBlock(const SArray* pExprInfo) {
...@@ -7115,17 +7144,17 @@ static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -7115,17 +7144,17 @@ static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) {
tfree(pInfo->prevData); tfree(pInfo->prevData);
} }
static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
SProjectOperatorInfo* pInfo = (SProjectOperatorInfo*) param; SProjectOperatorInfo* pInfo = (SProjectOperatorInfo*) param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput); doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
} }
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
STagScanInfo* pInfo = (STagScanInfo*) param; STagScanInfo* pInfo = (STagScanInfo*) param;
pInfo->pRes = blockDataDestroy(pInfo->pRes); pInfo->pRes = blockDataDestroy(pInfo->pRes);
} }
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param; SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param;
pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock); pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock);
...@@ -7145,6 +7174,17 @@ static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -7145,6 +7174,17 @@ static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) {
pInfo->pRes = blockDataDestroy(pInfo->pRes); pInfo->pRes = blockDataDestroy(pInfo->pRes);
} }
void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
SExchangeInfo* pExInfo = (SExchangeInfo*) param;
taosArrayDestroy(pExInfo->pSources);
taosArrayDestroy(pExInfo->pSourceDataInfo);
if (pExInfo->pResult != NULL) {
blockDataDestroy(pExInfo->pResult);
}
tsem_destroy(&pExInfo->ready);
}
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册