提交 9ea9444b 编写于 作者: H Haojun Liao

[td-11818] code refactor.

上级 751512f4
......@@ -84,7 +84,7 @@ typedef struct STscObj {
uint64_t id; // ref ID returned by taosAddRef
void *pTransporter;
pthread_mutex_t mutex; // used to protect the operation on db
int32_t numOfReqs; // number of sqlObj from this tscObj
int32_t numOfReqs; // number of sqlObj bound to this connection
SAppInstInfo *pAppInfo;
} STscObj;
......@@ -109,12 +109,13 @@ typedef struct SShowReqInfo {
} SShowReqInfo;
typedef struct SRequestSendRecvBody {
tsem_t rspSem; // not used now
void* fp;
SShowReqInfo showInfo; // todo this attribute will be removed after the query framework being completed.
struct SSchJob *pQueryJob; // query job, created according to sql query DAG.
SDataBuf requestMsg;
SReqResultInfo resInfo;
tsem_t rspSem; // not used now
void* fp;
SShowReqInfo showInfo; // todo this attribute will be removed after the query framework being completed.
SDataBuf requestMsg;
struct SSchJob *pQueryJob; // query job, created according to sql query DAG.
struct SQueryDag *pDag; // the query dag, generated according to the sql statement.
SReqResultInfo resInfo;
} SRequestSendRecvBody;
#define ERROR_MSG_BUF_DEFAULT_SIZE 512
......
......@@ -199,6 +199,7 @@ static void doDestroyRequest(void* p) {
tfree(pRequest->pInfo);
doFreeReqResultInfo(&pRequest->body.resInfo);
qDestroyQueryDag(pRequest->body.pDag);
deregisterRequest(pRequest);
tfree(pRequest);
......
......@@ -331,7 +331,6 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
SRequestObj *pRequest = NULL;
SQueryNode *pQuery = NULL;
SQueryDag *pDag = NULL;
terrno = TSDB_CODE_SUCCESS;
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
......@@ -340,14 +339,13 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
if (qIsDdlQuery(pQuery)) {
CHECK_CODE_GOTO(execDdlQuery(pRequest, pQuery), _return);
} else {
CHECK_CODE_GOTO(getPlan(pRequest, pQuery, &pDag), _return);
CHECK_CODE_GOTO(scheduleQuery(pRequest, pDag), _return);
CHECK_CODE_GOTO(getPlan(pRequest, pQuery, &pRequest->body.pDag), _return);
CHECK_CODE_GOTO(scheduleQuery(pRequest, pRequest->body.pDag), _return);
pRequest->code = terrno;
}
_return:
qDestroyQuery(pQuery);
qDestroyQueryDag(pDag);
if (NULL != pRequest && TSDB_CODE_SUCCESS != terrno) {
pRequest->code = terrno;
}
......
......@@ -542,34 +542,34 @@ typedef struct SOrderOperatorInfo {
void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream);
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult);
SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createMultiTableAggOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperator(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableSeqScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream);
SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult);
SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createMultiTableAggOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createMultiwaySortOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput,
int32_t numOfRows, void* merger);
SOperatorInfo* createGlobalAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp);
SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger, bool multigroupResult);
SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr,
SOperatorInfo* createGlobalAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp);
SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger, bool multigroupResult);
SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter);
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput);
SOperatorInfo* createOrderOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrder* pOrderVal);
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema, int32_t numOfOutput);
SOperatorInfo* createOrderOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, SOrder* pOrderVal);
//SSDataBlock* doGlobalAggregate(void* param, bool* newgroup);
//SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup);
......
......@@ -294,7 +294,6 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO
SSDataBlock *res = calloc(1, sizeof(SSDataBlock));
res->info.numOfCols = numOfOutput;
res->pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData));
for (int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData idata = {{0}};
......@@ -1815,7 +1814,7 @@ static int32_t setCtxTagColumnInfo(SQLFunctionCtx *pCtx, int32_t numOfOutput) {
return TSDB_CODE_SUCCESS;
}
static SQLFunctionCtx* createSQLFunctionCtx(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput,
static SQLFunctionCtx* createSqlFunctionCtx(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput,
int32_t** rowCellInfoOffset) {
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
......@@ -1919,6 +1918,104 @@ static SQLFunctionCtx* createSQLFunctionCtx(STaskRuntimeEnv* pRuntimeEnv, SExprI
return pFuncCtx;
}
static SQLFunctionCtx* createSqlFunctionCtx_rv(SExprInfo* pExpr, int32_t numOfOutput, int32_t** rowCellInfoOffset) {
SQLFunctionCtx * pFuncCtx = (SQLFunctionCtx *)calloc(numOfOutput, sizeof(SQLFunctionCtx));
if (pFuncCtx == NULL) {
return NULL;
}
*rowCellInfoOffset = calloc(numOfOutput, sizeof(int32_t));
if (*rowCellInfoOffset == 0) {
tfree(pFuncCtx);
return NULL;
}
for (int32_t i = 0; i < numOfOutput; ++i) {
SSqlExpr *pSqlExpr = &pExpr[i].base;
SQLFunctionCtx* pCtx = &pFuncCtx[i];
#if 0
SColIndex *pIndex = &pSqlExpr->colInfo;
if (TSDB_COL_REQ_NULL(pIndex->flag)) {
pCtx->requireNull = true;
pIndex->flag &= ~(TSDB_COL_NULL);
} else {
pCtx->requireNull = false;
}
#endif
// pCtx->inputBytes = pSqlExpr->colBytes;
// pCtx->inputType = pSqlExpr->colType;
pCtx->ptsOutputBuf = NULL;
pCtx->resDataInfo.bytes = pSqlExpr->resSchema.bytes;
pCtx->resDataInfo.type = pSqlExpr->resSchema.type;
// pCtx->order = pQueryAttr->order.order;
// pCtx->functionId = pSqlExpr->functionId;
// pCtx->stableQuery = pQueryAttr->stableQuery;
pCtx->resDataInfo.intermediateBytes = pSqlExpr->interBytes;
pCtx->start.key = INT64_MIN;
pCtx->end.key = INT64_MIN;
pCtx->numOfParams = pSqlExpr->numOfParams;
for (int32_t j = 0; j < pCtx->numOfParams; ++j) {
int16_t type = pSqlExpr->param[j].nType;
int16_t bytes = pSqlExpr->param[j].nLen;
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
taosVariantCreateFromBinary(&pCtx->param[j], pSqlExpr->param[j].pz, bytes, type);
} else {
taosVariantCreateFromBinary(&pCtx->param[j], (char *)&pSqlExpr->param[j].i, bytes, type);
}
}
// set the order information for top/bottom query
int32_t functionId = pCtx->functionId;
if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF) {
int32_t f = getExprFunctionId(&pExpr[0]);
assert(f == FUNCTION_TS || f == FUNCTION_TS_DUMMY);
// pCtx->param[2].i = pQueryAttr->order.order;
pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT;
pCtx->param[3].i = functionId;
pCtx->param[3].nType = TSDB_DATA_TYPE_BIGINT;
// pCtx->param[1].i = pQueryAttr->order.col.info.colId;
} else if (functionId == FUNCTION_INTERP) {
// pCtx->param[2].i = (int8_t)pQueryAttr->fillType;
// if (pQueryAttr->fillVal != NULL) {
// if (isNull((const char *)&pQueryAttr->fillVal[i], pCtx->inputType)) {
// pCtx->param[1].nType = TSDB_DATA_TYPE_NULL;
// } else { // todo refactor, taosVariantCreateFromBinary should handle the NULL value
// if (pCtx->inputType != TSDB_DATA_TYPE_BINARY && pCtx->inputType != TSDB_DATA_TYPE_NCHAR) {
// taosVariantCreateFromBinary(&pCtx->param[1], (char *)&pQueryAttr->fillVal[i], pCtx->inputBytes, pCtx->inputType);
// }
// }
// }
} else if (functionId == FUNCTION_TS_COMP) {
// pCtx->param[0].i = pQueryAttr->vgId; //TODO this should be the parameter from client
pCtx->param[0].nType = TSDB_DATA_TYPE_BIGINT;
} else if (functionId == FUNCTION_TWA) {
// pCtx->param[1].i = pQueryAttr->window.skey;
pCtx->param[1].nType = TSDB_DATA_TYPE_BIGINT;
// pCtx->param[2].i = pQueryAttr->window.ekey;
pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT;
} else if (functionId == FUNCTION_ARITHM) {
// pCtx->param[1].pz = (char*) getScalarFuncSupport(pRuntimeEnv->scalarSup, i);
}
}
// for(int32_t i = 1; i < numOfOutput; ++i) {
// (*rowCellInfoOffset)[i] = (int32_t)((*rowCellInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) + pExpr[i - 1].base.interBytes);
// }
setCtxTagColumnInfo(pFuncCtx, numOfOutput);
return pFuncCtx;
}
static void* destroySQLFunctionCtx(SQLFunctionCtx* pCtx, int32_t numOfOutput) {
if (pCtx == NULL) {
return NULL;
......@@ -4452,7 +4549,7 @@ void queryCostStatis(SQInfo *pQInfo) {
// return true;
//}
void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream) {
void appendDownstream(SOperatorInfo* p, SOperatorInfo* pUpstream) {
if (p->pDownstream == NULL) {
assert(p->numOfDownstream == 0);
}
......@@ -4545,28 +4642,6 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
pRuntimeEnv->cur.vgroupIndex = -1;
setResultBufSize(pQueryAttr, &pRuntimeEnv->resultInfo);
switch(tbScanner) {
// case OP_TableBlockInfoScan: {
// pRuntimeEnv->proot = createTableBlockInfoScanOperator(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv);
// break;
// }
// case OP_TableSeqScan: {
// pRuntimeEnv->proot = createTableSeqScanOperator(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv);
// break;
// }
// case OP_DataBlocksOptScan: {
// pRuntimeEnv->proot = createDataBlocksOptScanInfo(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr), pQueryAttr->needReverseScan? 1:0);
// break;
// }
// case OP_TableScan: {
// pRuntimeEnv->proot = createTableScanOperator(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr));
// break;
// }
default: { // do nothing
break;
}
}
if (sourceOptr != NULL) {
assert(pRuntimeEnv->proot == NULL);
pRuntimeEnv->proot = sourceOptr;
......@@ -4841,7 +4916,7 @@ static SSDataBlock* doBlockInfoScan(void* param, bool* newgroup) {
}
SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createTableScanOperator(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, SExecTaskInfo* pTaskInfo) {
assert(repeatTime > 0 && numOfOutput > 0);
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
......@@ -4854,7 +4929,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, int32_t order, in
return NULL;
}
pInfo->pTsdbReadHandle = pTsdbQueryHandle;
pInfo->pTsdbReadHandle = pTsdbReadHandle;
pInfo->times = repeatTime;
pInfo->reverseTimes = 0;
pInfo->order = order;
......@@ -4874,7 +4949,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, int32_t order, in
return pOperator;
}
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo) {
assert(repeatTime > 0);
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
......@@ -4887,7 +4962,7 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, int32_t order
return NULL;
}
pInfo->pTsdbReadHandle = pTsdbQueryHandle;
pInfo->pTsdbReadHandle = pTsdbReadHandle;
pInfo->times = repeatTime;
pInfo->reverseTimes = reverseTime;
pInfo->order = order;
......@@ -4907,10 +4982,10 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, int32_t order
return pOperator;
}
SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv) {
SOperatorInfo* createTableSeqScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv) {
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
pInfo->pTsdbReadHandle = pTsdbQueryHandle;
pInfo->pTsdbReadHandle = pTsdbReadHandle;
pInfo->times = 1;
pInfo->reverseTimes = 0;
pInfo->order = pRuntimeEnv->pQueryAttr->order.order;
......@@ -4931,10 +5006,10 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, STaskRuntimeEn
return pOperator;
}
SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv) {
SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv) {
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
pInfo->pTsdbReadHandle = pTsdbQueryHandle;
pInfo->pTsdbReadHandle = pTsdbReadHandle;
pInfo->block.pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
SColumnInfoData infoData = {{0}};
......@@ -5118,7 +5193,7 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S
pInfo->bufCapacity = 4096;
pInfo->udfInfo = pUdfInfo;
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity * pInfo->resultRowFactor);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr);
pInfo->groupColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr);
......@@ -5167,7 +5242,7 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S
// pOperator->exec = doGlobalAggregate;
pOperator->cleanup = destroyGlobalAggOperatorInfo;
appendUpstream(pOperator, downstream);
appendDownstream(pOperator, downstream);
return pOperator;
}
......@@ -5316,7 +5391,7 @@ SOperatorInfo *createOrderOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorIn
pOperator->cleanup = destroyOrderOperatorInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
appendUpstream(pOperator, downstream);
appendDownstream(pOperator, downstream);
return pOperator;
}
......@@ -6248,33 +6323,33 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
tfree(pOperator);
}
SOperatorInfo* createAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) {
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t numOfRows = (int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery));
// STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t numOfRows = 1;//(int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery));
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
pInfo->seed = rand();
setDefaultOutputBuf(pRuntimeEnv, &pInfo->binfo, pInfo->seed, MAIN_SCAN);
// setDefaultOutputBuf(pRuntimeEnv, &pInfo->binfo, pInfo->seed, MAIN_SCAN);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TableAggregate";
// pOperator->operatorType = OP_Aggregate;
pOperator->operatorType = OP_Aggregate;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->pRuntimeEnv = NULL;
pOperator->exec = doAggregate;
pOperator->cleanup = destroyAggOperatorInfo;
appendUpstream(pOperator, downstream);
appendDownstream(pOperator, downstream);
return pOperator;
}
......@@ -6354,7 +6429,7 @@ SOperatorInfo* createMultiTableAggOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOp
size_t tableGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) tableGroup);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)tableGroup, TSDB_DATA_TYPE_INT);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
......@@ -6369,7 +6444,7 @@ SOperatorInfo* createMultiTableAggOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOp
pOperator->exec = doSTableAggregate;
pOperator->cleanup = destroyAggOperatorInfo;
appendUpstream(pOperator, downstream);
appendDownstream(pOperator, downstream);
return pOperator;
}
......@@ -6382,7 +6457,7 @@ SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator
SOptrBasicInfo* pBInfo = &pInfo->binfo;
pBInfo->pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity);
pBInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset);
pBInfo->pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset);
initResultRowInfo(&pBInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
setDefaultOutputBuf(pRuntimeEnv, pBInfo, pInfo->seed, MAIN_SCAN);
......@@ -6399,7 +6474,7 @@ SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator
pOperator->exec = doProjectOperation;
pOperator->cleanup = destroyProjectOperatorInfo;
appendUpstream(pOperator, downstream);
appendDownstream(pOperator, downstream);
return pOperator;
}
......@@ -6457,7 +6532,7 @@ SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->cleanup = destroyConditionOperatorInfo;
appendUpstream(pOperator, downstream);
appendDownstream(pOperator, downstream);
return pOperator;
}
......@@ -6475,7 +6550,7 @@ SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorIn
pOperator->exec = doLimit;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
appendUpstream(pOperator, downstream);
appendDownstream(pOperator, downstream);
return pOperator;
}
......@@ -6483,7 +6558,7 @@ SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorIn
SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) {
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
pInfo->pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
......@@ -6500,7 +6575,7 @@ SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOpe
pOperator->exec = doIntervalAgg;
pOperator->cleanup = destroyBasicOperatorInfo;
appendUpstream(pOperator, downstream);
appendDownstream(pOperator, downstream);
return pOperator;
}
......@@ -6508,7 +6583,7 @@ SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOpe
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) {
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
pInfo->pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
......@@ -6525,7 +6600,7 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S
pOperator->exec = doAllIntervalAgg;
pOperator->cleanup = destroyBasicOperatorInfo;
appendUpstream(pOperator, downstream);
appendDownstream(pOperator, downstream);
return pOperator;
}
......@@ -6533,7 +6608,7 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper
SStateWindowOperatorInfo* pInfo = calloc(1, sizeof(SStateWindowOperatorInfo));
pInfo->colIndex = -1;
pInfo->reptScan = false;
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
......@@ -6549,13 +6624,13 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper
pOperator->exec = doStateWindowAgg;
pOperator->cleanup = destroyStateWindowOperatorInfo;
appendUpstream(pOperator, downstream);
appendDownstream(pOperator, downstream);
return pOperator;
}
SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) {
SSWindowOperatorInfo* pInfo = calloc(1, sizeof(SSWindowOperatorInfo));
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
......@@ -6574,14 +6649,14 @@ SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator
pOperator->exec = doSessionWindowAgg;
pOperator->cleanup = destroySWindowOperatorInfo;
appendUpstream(pOperator, downstream);
appendDownstream(pOperator, downstream);
return pOperator;
}
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) {
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
pInfo->pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
......@@ -6598,14 +6673,14 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntim
pOperator->exec = doSTableIntervalAgg;
pOperator->cleanup = destroyBasicOperatorInfo;
appendUpstream(pOperator, downstream);
appendDownstream(pOperator, downstream);
return pOperator;
}
SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) {
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
pInfo->pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
......@@ -6622,7 +6697,7 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun
pOperator->exec = doAllSTableIntervalAgg;
pOperator->cleanup = destroyBasicOperatorInfo;
appendUpstream(pOperator, downstream);
appendDownstream(pOperator, downstream);
return pOperator;
}
......@@ -6633,7 +6708,7 @@ SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator
pInfo->colIndex = -1; // group by column index
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
......@@ -6655,7 +6730,7 @@ SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator
pOperator->exec = hashGroupbyAggregate;
pOperator->cleanup = destroyGroupbyOperatorInfo;
appendUpstream(pOperator, downstream);
appendDownstream(pOperator, downstream);
return pOperator;
}
......@@ -6694,7 +6769,7 @@ SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInf
pOperator->exec = doFill;
pOperator->cleanup = destroySFillOperatorInfo;
appendUpstream(pOperator, downstream);
appendDownstream(pOperator, downstream);
return pOperator;
}
......@@ -6742,7 +6817,7 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->cleanup = destroySlimitOperatorInfo;
appendUpstream(pOperator, downstream);
appendDownstream(pOperator, downstream);
return pOperator;
}
......@@ -7040,7 +7115,7 @@ SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperato
pOperator->pExpr = pExpr;
pOperator->cleanup = destroyDistinctOperatorInfo;
appendUpstream(pOperator, downstream);
appendDownstream(pOperator, downstream);
return pOperator;
}
......
......@@ -2,7 +2,6 @@
#include "tbinoperator.h"
#include "tunaryoperator.h"
static void assignBasicParaInfo(struct SScalarFuncParam* dst, const struct SScalarFuncParam* src) {
dst->type = src->type;
dst->bytes = src->bytes;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册