提交 9d4b925d 编写于 作者: H Haojun Liao

[td-2895] refactor code.

上级 faea042f
......@@ -951,14 +951,14 @@ static void doFillResult(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool doneOutp
// todo extract function
int64_t actualETime = (pQueryInfo->order.order == TSDB_ORDER_ASC)? pQueryInfo->window.ekey: pQueryInfo->window.skey;
tFilePage **pResPages = malloc(POINTER_BYTES * pQueryInfo->fieldsInfo.numOfOutput);
void** pResPages = malloc(POINTER_BYTES * pQueryInfo->fieldsInfo.numOfOutput);
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
pResPages[i] = calloc(1, sizeof(tFilePage) + pField->bytes * pLocalMerge->resColModel->capacity);
pResPages[i] = calloc(1, pField->bytes * pLocalMerge->resColModel->capacity);
}
while (1) {
int64_t newRows = taosFillResultDataBlock(pFillInfo, (void**)pResPages, pLocalMerge->resColModel->capacity);
int64_t newRows = taosFillResultDataBlock(pFillInfo, pResPages, pLocalMerge->resColModel->capacity);
if (pQueryInfo->limit.offset < newRows) {
newRows -= pQueryInfo->limit.offset;
......@@ -966,7 +966,7 @@ static void doFillResult(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool doneOutp
if (pQueryInfo->limit.offset > 0) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
memmove(pResPages[i]->data, pResPages[i]->data + pField->bytes * pQueryInfo->limit.offset,
memmove(pResPages[i], pResPages[i] + pField->bytes * pQueryInfo->limit.offset,
(size_t)(newRows * pField->bytes));
}
}
......@@ -1010,7 +1010,7 @@ static void doFillResult(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool doneOutp
int32_t offset = 0;
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
memcpy(pRes->data + offset * pRes->numOfRows, pResPages[i]->data, (size_t)(pField->bytes * pRes->numOfRows));
memcpy(pRes->data + offset * pRes->numOfRows, pResPages[i], (size_t)(pField->bytes * pRes->numOfRows));
offset += pField->bytes;
}
......
......@@ -241,27 +241,12 @@ typedef struct SQuery {
typedef SSDataBlock* (*__operator_fn_t)(void* param);
typedef struct SOperatorInfo {
char *name;
bool blockingOptr;
bool completed;
void *optInfo;
SExprInfo *pExpr;
int32_t* rowCellInfoOffset;
int32_t numOfOutput;
__operator_fn_t exec;
__operator_fn_t cleanup;
struct SOperatorInfo *upstream;
} SOperatorInfo;
struct SOperatorInfo;
typedef struct SQueryRuntimeEnv {
jmp_buf env;
SQuery* pQuery;
void* qinfo;
// int32_t numOfRowsPerPage;
// uint16_t* offset;
uint16_t scanFlag; // denotes reversed scan of data or not
SFillInfo* pFillInfo;
void* pQueryHandle;
......@@ -282,16 +267,31 @@ typedef struct SQueryRuntimeEnv {
char* tagVal; // tag value of current data block
SArithmeticSupport *sasArray;
SOperatorInfo* pi;
struct SOperatorInfo* pi;
SSDataBlock *outputBuf;
int32_t groupIndex;
int32_t tableIndex;
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
SOperatorInfo *proot;
struct SOperatorInfo *proot;
SGroupResInfo groupResInfo;
} SQueryRuntimeEnv;
typedef struct SOperatorInfo {
char *name;
bool blockingOptr;
bool completed;
void *info;
SExprInfo *pExpr;
int32_t numOfOutput;
SQueryRuntimeEnv *pRuntimeEnv;
__operator_fn_t exec;
__operator_fn_t cleanup;
struct SOperatorInfo *upstream;
} SOperatorInfo;
enum {
QUERY_RESULT_NOT_READY = 1,
QUERY_RESULT_READY = 2,
......@@ -363,23 +363,18 @@ typedef struct STableScanInfo {
} STableScanInfo;
typedef struct STagScanInfo {
SQueryRuntimeEnv *pRuntimeEnv;
SColumnInfo* pCols;
SSDataBlock* pRes;
} STagScanInfo;
typedef struct SAggOperatorInfo {
SResultRowInfo resultRowInfo;
STableQueryInfo *pTableQueryInfo;
SQueryRuntimeEnv *pRuntimeEnv;
SQLFunctionCtx *pCtx;
int32_t *rowCellInfoOffset;
SSDataBlock *pRes;
} SAggOperatorInfo;
typedef struct SArithOperatorInfo {
STableQueryInfo *pTableQueryInfo;
SQueryRuntimeEnv *pRuntimeEnv;
SQLFunctionCtx *pCtx;
int32_t *rowCellInfoOffset;
SResultRowInfo resultRowInfo;
......@@ -390,18 +385,14 @@ typedef struct SArithOperatorInfo {
typedef struct SLimitOperatorInfo {
int64_t limit;
int64_t total;
SQueryRuntimeEnv* pRuntimeEnv;
} SLimitOperatorInfo;
typedef struct SOffsetOperatorInfo {
int64_t offset;
int64_t currentOffset;
SQueryRuntimeEnv* pRuntimeEnv;
} SOffsetOperatorInfo;
typedef struct SHashIntervalOperatorInfo {
STableQueryInfo *pTableQueryInfo;
SQueryRuntimeEnv *pRuntimeEnv;
SQLFunctionCtx *pCtx;
int32_t *rowCellInfoOffset;
SResultRowInfo resultRowInfo;
......@@ -409,13 +400,10 @@ typedef struct SHashIntervalOperatorInfo {
} SHashIntervalOperatorInfo;
typedef struct SFillOperatorInfo {
SQueryRuntimeEnv *pRuntimeEnv;
SSDataBlock *pRes;
} SFillOperatorInfo;
typedef struct SHashGroupbyOperatorInfo {
STableQueryInfo *pTableQueryInfo;
SQueryRuntimeEnv *pRuntimeEnv;
SQLFunctionCtx *pCtx;
int32_t *rowCellInfoOffset;
SResultRowInfo resultRowInfo;
......
......@@ -110,8 +110,6 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
#define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList)
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0)
//static void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv);
int32_t getMaximumIdleDurationSec() {
return tsShellActivityTimer * 2;
}
......@@ -184,15 +182,17 @@ static void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOper
static int32_t getNumOfScanTimes(SQuery* pQuery);
static bool isFixedOutputQuery(SQuery* pQuery);
static SOperatorInfo* createAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createArithOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createLimitOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createOffsetOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createHashIntervalAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createFillOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createHashGroupbyAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createStableIntervalOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createStableAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createOffsetOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createIntervalAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr,
int32_t numOfOutput);
static SOperatorInfo* createHashGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr,
int32_t numOfOutput);
static SOperatorInfo* createStableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createStableIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createTagScanOperator(SQueryRuntimeEnv* pRuntimeEnv);
static int32_t doCopyToSData_rv(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock);
......@@ -1126,15 +1126,15 @@ static void setInputSDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, S
}
}
static void aggApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock) {
SQuery *pQuery = pRuntimeEnv->pQuery;
static void aggApplyFunctions(SOperatorInfo* pOperator, TSKEY startTs, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
for (int32_t k = 0; k < pOperator->numOfOutput; ++k) {
setBlockStatisInfo(&pCtx[k], pSDataBlock, &pOperator->pExpr[k].base.colInfo);
int32_t functionId = pCtx[k].functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
pCtx[k].startTs = pQuery->window.skey;
pCtx[k].startTs = startTs;// this can be set during create the struct
aAggs[functionId].xFunction(&pCtx[k]);
}
}
......@@ -1168,9 +1168,11 @@ static void arithmeticApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionC
}
}
static void hashIntervalAgg(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResultRowInfo, SHashIntervalOperatorInfo* pInfo,
int32_t numOfOutput, SSDataBlock* pSDataBlock) {
SQuery *pQuery = pRuntimeEnv->pQuery;
static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo,
SHashIntervalOperatorInfo* pInfo, SSDataBlock* pSDataBlock, int32_t groupId) {
SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
int32_t numOfOutput = pOperatorInfo->numOfOutput;
SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
int32_t prevIndex = curTimeWindowIndex(pResultRowInfo);
......@@ -1179,63 +1181,68 @@ static void hashIntervalAgg(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResu
if (pSDataBlock->pDataBlock != NULL) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, 0);
tsCols = pColDataInfo->pData;
assert(tsCols[0] == pSDataBlock->info.window.skey && tsCols[pSDataBlock->info.rows-1] == pSDataBlock->info.window.ekey);
assert(tsCols[0] == pSDataBlock->info.window.skey &&
tsCols[pSDataBlock->info.rows - 1] == pSDataBlock->info.window.ekey);
}
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0:(pSDataBlock->info.rows-1);
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : (pSDataBlock->info.rows - 1);
int32_t startPos = pQuery->pos;
TSKEY ts = getStartTsKey(pQuery, &pSDataBlock->info, tsCols, step);
STimeWindow win = getActiveTimeWindow(pResultRowInfo, ts, pQuery);
bool masterScan = (pRuntimeEnv->scanFlag == MASTER_SCAN)? true:false;
bool masterScan = (pRuntimeEnv->scanFlag == MASTER_SCAN) ? true : false;
SResultRow *pResult = NULL;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &win, masterScan, &pResult, 0,
pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset);
SResultRow* pResult = NULL;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &win, masterScan, &pResult, groupId, pInfo->pCtx,
numOfOutput, pInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
// goto _end;
}
int32_t forwardStep = 0;
TSKEY ekey = reviseWindowEkey(pQuery, &win);
forwardStep = getNumOfRowsInTimeWindow(pQuery, &pSDataBlock->info, tsCols, pQuery->pos, ekey, binarySearchForKey, true);
TSKEY ekey = reviseWindowEkey(pQuery, &win);
forwardStep =
getNumOfRowsInTimeWindow(pQuery, &pSDataBlock->info, tsCols, pQuery->pos, ekey, binarySearchForKey, true);
// prev time window not interpolation yet.
int32_t curIndex = curTimeWindowIndex(pResultRowInfo);
if (prevIndex != -1 && prevIndex < curIndex && pQuery->timeWindowInterpo) {
for (int32_t j = prevIndex; j < curIndex; ++j) { // previous time window may be all closed already.
SResultRow *pRes = pResultRowInfo->pResult[j];
SResultRow* pRes = pResultRowInfo->pResult[j];
if (pRes->closed) {
assert(resultRowInterpolated(pRes, RESULT_ROW_START_INTERP) && resultRowInterpolated(pRes, RESULT_ROW_END_INTERP));
assert(resultRowInterpolated(pRes, RESULT_ROW_START_INTERP) &&
resultRowInterpolated(pRes, RESULT_ROW_END_INTERP));
continue;
}
STimeWindow w = pRes->win;
ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &w, masterScan, &pResult, 0, pInfo->pCtx, numOfOutput,
pInfo->rowCellInfoOffset);
ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &w, masterScan, &pResult, groupId, pInfo->pCtx,
numOfOutput, pInfo->rowCellInfoOffset);
assert(ret == TSDB_CODE_SUCCESS && !resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
// int32_t p = QUERY_IS_ASC_QUERY(pQuery) ? 0 : pSDataBlock->info.rows - 1;
// doRowwiseTimeWindowInterpolation(pRuntimeEnv, pSDataBlock->pDataBlock, *(TSKEY *)pRuntimeEnv->prevRow[0], -1, tsCols[0], p,
// w.ekey, RESULT_ROW_END_INTERP);
// int32_t p = QUERY_IS_ASC_QUERY(pQuery) ? 0 : pSDataBlock->info.rows - 1;
// doRowwiseTimeWindowInterpolation(pRuntimeEnv, pSDataBlock->pDataBlock, *(TSKEY *)pRuntimeEnv->prevRow[0],
// -1, tsCols[0], p,
// w.ekey, RESULT_ROW_END_INTERP);
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pInfo->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP);
doBlockwiseApplyFunctions_rv(pRuntimeEnv, pInfo->pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows,
numOfOutput);
numOfOutput);
}
// restore current time window
ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &win, masterScan, &pResult, 0,
pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset);
ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &win, masterScan, &pResult, groupId, pInfo->pCtx,
numOfOutput, pInfo->rowCellInfoOffset);
assert(ret == TSDB_CODE_SUCCESS);
}
// window start key interpolation
//doWindowBorderInterpolation(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, pResult, &win, pQuery->pos, forwardStep);
// doWindowBorderInterpolation(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, pResult, &win, pQuery->pos,
// forwardStep);
doBlockwiseApplyFunctions_rv(pRuntimeEnv, pInfo->pCtx, &win, startPos, forwardStep, tsCols, pSDataBlock->info.rows,
numOfOutput);
numOfOutput);
STimeWindow nextWin = win;
while (1) {
......@@ -1246,19 +1253,21 @@ static void hashIntervalAgg(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResu
}
// null data, failed to allocate more memory buffer
int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &nextWin, masterScan, &pResult, 0,
pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset);
int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &nextWin, masterScan, &pResult, groupId,
pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
break;
}
ekey = reviseWindowEkey(pQuery, &nextWin);
forwardStep = getNumOfRowsInTimeWindow(pQuery, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true);
forwardStep =
getNumOfRowsInTimeWindow(pQuery, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true);
// window start(end) key interpolation
// doWindowBorderInterpolation(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, pResult, &nextWin, startPos, forwardStep);
doBlockwiseApplyFunctions_rv(pRuntimeEnv, pInfo->pCtx, &nextWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows,
numOfOutput);
// doWindowBorderInterpolation(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, pResult, &nextWin,
// startPos, forwardStep);
doBlockwiseApplyFunctions_rv(pRuntimeEnv, pInfo->pCtx, &nextWin, startPos, forwardStep, tsCols,
pSDataBlock->info.rows, numOfOutput);
}
}
......@@ -2166,68 +2175,66 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
qDebug("QInfo:%p init runtime completed", GET_QINFO_ADDR(pRuntimeEnv));
// group by normal column, sliding window query, interval query are handled by interval query processor
// if (!pQuery->stableQuery) { // interval (down sampling operation)
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
if (pQuery->stableQuery) {
pRuntimeEnv->proot = createStableIntervalOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi);
setTableScanFilterOperatorInfo(pRuntimeEnv->pi->optInfo, pRuntimeEnv->proot);
} else {
pRuntimeEnv->proot = createHashIntervalAggOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi);
setTableScanFilterOperatorInfo(pRuntimeEnv->pi->optInfo, pRuntimeEnv->proot);
// interval (down sampling operation)
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
if (pQuery->stableQuery) {
pRuntimeEnv->proot = createStableIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->pi, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->pi->info, pRuntimeEnv->proot);
} else {
pRuntimeEnv->proot = createIntervalAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->pi, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->pi->info, pRuntimeEnv->proot);
if (pQuery->pExpr2 != NULL) {
pRuntimeEnv->proot = createArithOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot);
}
if (pQuery->pExpr2 != NULL) {
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr2, pQuery->numOfExpr2);
}
if (pQuery->fillType != TSDB_FILL_NONE) {
pRuntimeEnv->proot = createFillOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot);
}
if (pQuery->fillType != TSDB_FILL_NONE) {
SOperatorInfo* pInfo = pRuntimeEnv->proot;
pRuntimeEnv->proot = createFillOperatorInfo(pRuntimeEnv, pInfo, pInfo->pExpr, pInfo->numOfOutput);
}
}
} else if (pQuery->groupbyColumn) {
pRuntimeEnv->proot = createHashGroupbyAggOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi);
setTableScanFilterOperatorInfo(pRuntimeEnv->pi->optInfo, pRuntimeEnv->proot);
pRuntimeEnv->proot = createHashGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->pi, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->pi->info, pRuntimeEnv->proot);
if (pQuery->pExpr2 != NULL) {
pRuntimeEnv->proot = createArithOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot);
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr2, pQuery->numOfExpr2);
}
} else if (isFixedOutputQuery(pQuery)) {
if (!pQuery->stableQuery) {
pRuntimeEnv->proot = createAggOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi);
pRuntimeEnv->proot = createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->pi, pQuery->pExpr1, pQuery->numOfOutput);
} else {
pRuntimeEnv->proot = createStableAggOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi);
pRuntimeEnv->proot = createStableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->pi, pQuery->pExpr1, pQuery->numOfOutput);
}
setTableScanFilterOperatorInfo(pRuntimeEnv->pi->optInfo, pRuntimeEnv->proot);
setTableScanFilterOperatorInfo(pRuntimeEnv->pi->info, pRuntimeEnv->proot);
if (pQuery->pExpr2 != NULL) {
pRuntimeEnv->proot = createArithOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot);
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr2, pQuery->numOfExpr2);
}
} else { // diff/add/multiply/subtract/division
assert(pQuery->checkResultBuf == 1);
if (!onlyQueryTags(pQuery)) {
pRuntimeEnv->proot = createArithOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi);
setTableScanFilterOperatorInfo(pRuntimeEnv->pi->optInfo, pRuntimeEnv->proot);
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->pi, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->pi->info, pRuntimeEnv->proot);
}
}
if (!pQuery->stableQuery) { // TODO this problem should be handed at the client side
if (pQuery->limit.offset > 0) {
pRuntimeEnv->proot = createOffsetOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot);
pRuntimeEnv->proot = createOffsetOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot);
}
if (pQuery->limit.limit > 0) {
pRuntimeEnv->proot = createLimitOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot);
pRuntimeEnv->proot = createLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot);
}
}
return TSDB_CODE_SUCCESS;
_clean:
// tfree(pRuntimeEnv->rowCellInfoOffset);
tfree(pRuntimeEnv->sasArray);
tfree(pRuntimeEnv->pResultRowHashTable);
tfree(pRuntimeEnv->keyBuf);
......@@ -3229,15 +3236,17 @@ static SColumnInfo* doGetTagColumnInfoById(SColumnInfo* pTagColList, int32_t num
}
void setTagVal_rv(SQueryRuntimeEnv *pRuntimeEnv, void *pTable, SExprInfo* pExpr, SQLFunctionCtx* pCtx, int32_t numOfOutput) {
SQuery *pQuery = pRuntimeEnv->pQuery;
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
void setTagVal_rv(SOperatorInfo* pOperatorInfo, void *pTable, SQLFunctionCtx* pCtx, int32_t numOfOutput) {
SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
SExprInfo *pExprInfo = &pQuery->pExpr1[0];
SExprInfo *pExpr = pOperatorInfo->pExpr;
SQuery *pQuery = pRuntimeEnv->pQuery;
SExprInfo* pExprInfo = &pExpr[0];
if (pQuery->numOfOutput == 1 && pExprInfo->base.functionId == TSDB_FUNC_TS_COMP && pQuery->stableQuery) {
assert(pExprInfo->base.numOfParams == 1);
int16_t tagColId = (int16_t)pExprInfo->base.arg->argValue.i64;
int16_t tagColId = (int16_t)pExprInfo->base.arg->argValue.i64;
SColumnInfo* pColInfo = doGetTagColumnInfoById(pQuery->tagColList, pQuery->numOfTags, tagColId);
doSetTagValueInParam(pTable, tagColId, &pCtx[0].tag, pColInfo->type, pColInfo->bytes);
......@@ -3254,7 +3263,8 @@ void setTagVal_rv(SQueryRuntimeEnv *pRuntimeEnv, void *pTable, SExprInfo* pExpr,
}
// todo use tag column index to optimize performance
doSetTagValueInParam(pTable, pLocalExprInfo->base.colInfo.colId, &pCtx[idx].tag, pLocalExprInfo->type, pLocalExprInfo->bytes);
doSetTagValueInParam(pTable, pLocalExprInfo->base.colInfo.colId, &pCtx[idx].tag, pLocalExprInfo->type,
pLocalExprInfo->bytes);
if (IS_NUMERIC_TYPE(pLocalExprInfo->type) || pLocalExprInfo->type == TSDB_DATA_TYPE_BOOL) {
memcpy(pRuntimeEnv->tagVal + offset, &pCtx[idx].tag.i64, pLocalExprInfo->bytes);
......@@ -3266,22 +3276,22 @@ void setTagVal_rv(SQueryRuntimeEnv *pRuntimeEnv, void *pTable, SExprInfo* pExpr,
}
// set the join tag for first column
SSqlFuncMsg *pFuncMsg = &pExprInfo->base;
if ((pFuncMsg->functionId == TSDB_FUNC_TS || pFuncMsg->functionId == TSDB_FUNC_PRJ) && pRuntimeEnv->pTsBuf != NULL &&
pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
SSqlFuncMsg* pFuncMsg = &pExprInfo->base;
if ((pFuncMsg->functionId == TSDB_FUNC_TS || pFuncMsg->functionId == TSDB_FUNC_PRJ) &&
pRuntimeEnv->pTsBuf != NULL && pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
assert(pFuncMsg->numOfParams == 1);
int16_t tagColId = (int16_t)pExprInfo->base.arg->argValue.i64;
SColumnInfo *pColInfo = doGetTagColumnInfoById(pQuery->tagColList, pQuery->numOfTags, tagColId);
SColumnInfo* pColInfo = doGetTagColumnInfoById(pQuery->tagColList, pQuery->numOfTags, tagColId);
doSetTagValueInParam(pTable, tagColId, &pCtx[0].tag, pColInfo->type, pColInfo->bytes);
int16_t tagType = pCtx[0].tag.nType;
if (tagType == TSDB_DATA_TYPE_BINARY || tagType == TSDB_DATA_TYPE_NCHAR) {
qDebug("QInfo:%p set tag value for join comparison, colId:%" PRId64 ", val:%s", pQInfo,
qDebug("QInfo:%p set tag value for join comparison, colId:%" PRId64 ", val:%s", pRuntimeEnv->qinfo,
pExprInfo->base.arg->argValue.i64, pCtx[0].tag.pz);
} else {
qDebug("QInfo:%p set tag value for join comparison, colId:%" PRId64 ", val:%" PRId64, pQInfo,
qDebug("QInfo:%p set tag value for join comparison, colId:%" PRId64 ", val:%" PRId64, pRuntimeEnv->qinfo,
pExprInfo->base.arg->argValue.i64, pCtx[0].tag.i64);
}
}
......@@ -4050,9 +4060,11 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
#endif
void finalizeQueryResult_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput,
SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) {
void finalizeQueryResult_rv(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) {
SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv;
SQuery *pQuery = pRuntimeEnv->pQuery;
int32_t numOfOutput = pOperator->numOfOutput;
if (pQuery->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery)) {
// for each group result, call the finalize function for each column
if (pQuery->groupbyColumn) {
......@@ -4500,7 +4512,7 @@ static int32_t doCopyToSData_rv(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pG
SQuery *pQuery = pRuntimeEnv->pQuery;
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
int32_t numOfResult = 0;//pQuery->rec.rows; // there are already exists result rows
int32_t numOfResult = pBlock->info.rows; // there are already exists result rows
int32_t start = 0;
int32_t step = -1;
......@@ -4772,6 +4784,7 @@ int32_t doFillGapsInResults_rv(SQueryRuntimeEnv* pRuntimeEnv, SSDataBlock *pOutp
}
pOutput->info.rows = (int32_t)taosFillResultDataBlock(pFillInfo, p, (int32_t)pQuery->rec.capacity);
tfree(p);
return pOutput->info.rows;
}
......@@ -6107,7 +6120,7 @@ static SSDataBlock* doTableScanImpl(STableScanInfo *pTableScanInfo) {
static SSDataBlock* doTableScan(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
STableScanInfo *pTableScanInfo = pOperator->optInfo;
STableScanInfo *pTableScanInfo = pOperator->info;
SQueryRuntimeEnv *pRuntimeEnv = pTableScanInfo->pRuntimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery;
......@@ -6180,7 +6193,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
pOperator->name = "SeqScanTableOp";
pOperator->blockingOptr = false;
pOperator->completed = false;
pOperator->optInfo = pInfo;
pOperator->info = pInfo;
pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfCols;
pOperator->exec = doTableScan;
......@@ -6192,34 +6205,34 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
char* name = pDownstream->name;
if ((strcasecmp(name, "AggregationOp") == 0) || (strcasecmp(name, "STableAggregate") == 0)) {
SAggOperatorInfo* pAggInfo = pDownstream->optInfo;
SAggOperatorInfo* pAggInfo = pDownstream->info;
pTableScanInfo->pCtx = pAggInfo->pCtx;
pTableScanInfo->pResultRowInfo = &pAggInfo->resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pAggInfo->rowCellInfoOffset;
} else if (strcasecmp(name, "HashIntervalAggOp") == 0) {
SHashIntervalOperatorInfo *pIntervalInfo = pDownstream->optInfo;
SHashIntervalOperatorInfo *pIntervalInfo = pDownstream->info;
pTableScanInfo->pCtx = pIntervalInfo->pCtx;
pTableScanInfo->pResultRowInfo = &pIntervalInfo->resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pIntervalInfo->rowCellInfoOffset;
} else if (strcasecmp(name, "HashGroupbyAggOp") == 0) {
SHashGroupbyOperatorInfo *pGroupbyInfo = pDownstream->optInfo;
SHashGroupbyOperatorInfo *pGroupbyInfo = pDownstream->info;
pTableScanInfo->pCtx = pGroupbyInfo->pCtx;
pTableScanInfo->pResultRowInfo = &pGroupbyInfo->resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pGroupbyInfo->rowCellInfoOffset;
} else if (strcasecmp(name, "STableIntervalAggOp") == 0) {
SHashIntervalOperatorInfo *pInfo = pDownstream->optInfo;
SHashIntervalOperatorInfo *pInfo = pDownstream->info;
pTableScanInfo->pCtx = pInfo->pCtx;
pTableScanInfo->pResultRowInfo = &pInfo->resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pInfo->rowCellInfoOffset;
} else if (strcasecmp(name, "ArithmeticOp") == 0) {
SArithOperatorInfo *pInfo = pDownstream->optInfo;
SArithOperatorInfo *pInfo = pDownstream->info;
pTableScanInfo->pCtx = pInfo->pCtx;
pTableScanInfo->pResultRowInfo = &pInfo->resultRowInfo;
......@@ -6246,7 +6259,7 @@ static SOperatorInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQu
SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo));
pOptr->name = "BidirectionSeqScanTableOp";
pOptr->blockingOptr = false;
pOptr->optInfo = pInfo;
pOptr->info = pInfo;
pOptr->exec = doTableScan;
return pOptr;
......@@ -6261,14 +6274,14 @@ static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) {
}
// this is a blocking operator
static SSDataBlock* doAggregation(void* param) {
static SSDataBlock* doAggregate(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->completed) {
return NULL;
}
SAggOperatorInfo* pAggInfo = pOperator->optInfo;
SQueryRuntimeEnv* pRuntimeEnv = pAggInfo->pRuntimeEnv;
SAggOperatorInfo* pAggInfo = pOperator->info;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t order = pQuery->order.order;
......@@ -6282,25 +6295,23 @@ static SSDataBlock* doAggregation(void* param) {
break;
}
setTagVal_rv(pRuntimeEnv, pQuery->current->pTable, pOperator->pExpr, pAggInfo->pCtx, pOperator->numOfOutput);
setTagVal_rv(pOperator, pQuery->current->pTable, pAggInfo->pCtx, pOperator->numOfOutput);
// TODO opt perf
if (strncasecmp(upstream->name, "BidirectionSeqScanTableOp", strlen("BidirectionSeqScanTableOp")) == 0) {
STableScanInfo* pScanInfo = upstream->optInfo;
STableScanInfo* pScanInfo = upstream->info;
order = getTableScanOrder(pScanInfo);
}
// the pDataBlock are always the same one, no need to call this again
setInputSDataBlock(pOperator, pAggInfo->pCtx, pBlock, order);
aggApplyFunctions(pRuntimeEnv, pOperator, pAggInfo->pCtx, pBlock);
aggApplyFunctions(pOperator, pQuery->window.skey, pAggInfo->pCtx, pBlock);
}
pOperator->completed = true;
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED);
if (!pQuery->stableQuery) {
finalizeQueryResult_rv(pRuntimeEnv, pAggInfo->pCtx, pOperator->numOfOutput, &pAggInfo->resultRowInfo, pAggInfo->rowCellInfoOffset);
}
finalizeQueryResult_rv(pOperator, pAggInfo->pCtx, &pAggInfo->resultRowInfo, pAggInfo->rowCellInfoOffset);
pAggInfo->pRes->info.rows = getNumOfResult_rv(pRuntimeEnv, pAggInfo->pCtx, pOperator->numOfOutput);
destroySQLFunctionCtx(pAggInfo->pCtx, pAggInfo->pRes->info.numOfCols);
......@@ -6308,14 +6319,14 @@ static SSDataBlock* doAggregation(void* param) {
return pAggInfo->pRes;
}
static SSDataBlock* doSTableAggregation(void* param) {
static SSDataBlock* doSTableAggregate(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->completed) {
return NULL;
}
SAggOperatorInfo* pAggInfo = pOperator->optInfo;
SQueryRuntimeEnv* pRuntimeEnv = pAggInfo->pRuntimeEnv;
SAggOperatorInfo* pAggInfo = pOperator->info;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (hasRemainData(&pRuntimeEnv->groupResInfo)) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pAggInfo->pRes);
......@@ -6330,10 +6341,6 @@ static SSDataBlock* doSTableAggregation(void* param) {
SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t order = pQuery->order.order;
if (pAggInfo->pRes == NULL) {
pAggInfo->pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput);
}
SOperatorInfo* upstream = pOperator->upstream;
pQuery->pos = 0;
......@@ -6343,11 +6350,11 @@ static SSDataBlock* doSTableAggregation(void* param) {
break;
}
setTagVal_rv(pRuntimeEnv, pRuntimeEnv->pQuery->current->pTable, pOperator->pExpr, pAggInfo->pCtx, pOperator->numOfOutput);
setTagVal_rv(pOperator, pRuntimeEnv->pQuery->current->pTable, pAggInfo->pCtx, pOperator->numOfOutput);
// TODO opt perf
if (strncasecmp(upstream->name, "BidirectionSeqScanTableOp", strlen("BidirectionSeqScanTableOp")) == 0) {
STableScanInfo* pScanInfo = upstream->optInfo;
STableScanInfo* pScanInfo = upstream->info;
order = getTableScanOrder(pScanInfo);
}
......@@ -6356,7 +6363,7 @@ static SSDataBlock* doSTableAggregation(void* param) {
TSKEY k = (pQuery->order.order == TSDB_ORDER_ASC)? pBlock->info.window.ekey + 1:pBlock->info.window.skey-1;
setExecutionContext_rv(pRuntimeEnv, pAggInfo, pOperator->numOfOutput, pQuery->current->groupIndex, k);
aggApplyFunctions(pRuntimeEnv, pOperator, pAggInfo->pCtx, pBlock);
aggApplyFunctions(pOperator, pQuery->window.skey, pAggInfo->pCtx, pBlock);
}
closeAllResultRows(&pAggInfo->resultRowInfo);
......@@ -6375,8 +6382,8 @@ static SSDataBlock* doSTableAggregation(void* param) {
static SSDataBlock* doArithmeticOperation(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
SArithOperatorInfo* pArithInfo = pOperator->optInfo;
SQueryRuntimeEnv* pRuntimeEnv = pArithInfo->pRuntimeEnv;
SArithOperatorInfo* pArithInfo = pOperator->info;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
setDefaultOutputBuf(pRuntimeEnv, pArithInfo->pCtx, &pArithInfo->resultRowInfo, pArithInfo->pOutput, pArithInfo->rowCellInfoOffset);
......@@ -6390,7 +6397,7 @@ static SSDataBlock* doArithmeticOperation(void* param) {
break;
}
setTagVal_rv(pRuntimeEnv, pRuntimeEnv->pQuery->current->pTable, pOperator->pExpr, pArithInfo->pCtx, pOperator->numOfOutput);
setTagVal_rv(pOperator, pRuntimeEnv->pQuery->current->pTable, pArithInfo->pCtx, pOperator->numOfOutput);
// the pDataBlock are always the same one, no need to call this again
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
......@@ -6431,11 +6438,11 @@ static SSDataBlock* doLimit(void* param) {
return NULL;
}
SLimitOperatorInfo* pInfo = pOperator->optInfo;
SLimitOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pBlock = pOperator->upstream->exec(pOperator->upstream);
if (pBlock == NULL) {
setQueryStatus(pInfo->pRuntimeEnv->pQuery, QUERY_COMPLETED);
setQueryStatus(pOperator->pRuntimeEnv->pQuery, QUERY_COMPLETED);
pOperator->completed = true;
return NULL;
}
......@@ -6443,7 +6450,7 @@ static SSDataBlock* doLimit(void* param) {
if (pInfo->total + pBlock->info.rows >= pInfo->limit) {
pBlock->info.rows = (pInfo->limit - pInfo->total);
setQueryStatus(pInfo->pRuntimeEnv->pQuery, QUERY_COMPLETED);
setQueryStatus(pOperator->pRuntimeEnv->pQuery, QUERY_COMPLETED);
pOperator->completed = true;
}
......@@ -6453,12 +6460,12 @@ static SSDataBlock* doLimit(void* param) {
static SSDataBlock* doOffset(void* param) {
SOperatorInfo *pOperator = (SOperatorInfo *)param;
SOffsetOperatorInfo *pInfo = pOperator->optInfo;
SOffsetOperatorInfo *pInfo = pOperator->info;
while (1) {
SSDataBlock *pBlock = pOperator->upstream->exec(pOperator->upstream);
if (pBlock == NULL) {
setQueryStatus(pInfo->pRuntimeEnv->pQuery, QUERY_COMPLETED);
setQueryStatus(pOperator->pRuntimeEnv->pQuery, QUERY_COMPLETED);
return NULL;
}
......@@ -6489,9 +6496,9 @@ static SSDataBlock* doHashIntervalAgg(void* param) {
return NULL;
}
SHashIntervalOperatorInfo* pIntervalInfo = pOperator->optInfo;
SHashIntervalOperatorInfo* pIntervalInfo = pOperator->info;
SQueryRuntimeEnv* pRuntimeEnv = pIntervalInfo->pRuntimeEnv;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (hasRemainData(&pRuntimeEnv->groupResInfo)) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
......@@ -6517,7 +6524,7 @@ static SSDataBlock* doHashIntervalAgg(void* param) {
// the pDataBlock are always the same one, no need to call this again
setInputSDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQuery->order.order);
hashIntervalAgg(pRuntimeEnv, &pIntervalInfo->resultRowInfo, pIntervalInfo, pOperator->numOfOutput, pBlock);
hashIntervalAgg(pOperator, &pIntervalInfo->resultRowInfo, pIntervalInfo, pBlock, 0);
}
// restore the value
......@@ -6526,7 +6533,7 @@ static SSDataBlock* doHashIntervalAgg(void* param) {
closeAllResultRows(&pIntervalInfo->resultRowInfo);
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED);
finalizeQueryResult_rv(pRuntimeEnv, pIntervalInfo->pCtx, pOperator->numOfOutput, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset);
finalizeQueryResult_rv(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->resultRowInfo, 0);
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
......@@ -6544,9 +6551,9 @@ static SSDataBlock* doSTableIntervalAgg(void* param) {
return NULL;
}
SHashIntervalOperatorInfo* pIntervalInfo = pOperator->optInfo;
SHashIntervalOperatorInfo* pIntervalInfo = pOperator->info;
SQueryRuntimeEnv* pRuntimeEnv = pIntervalInfo->pRuntimeEnv;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (hasRemainData(&pRuntimeEnv->groupResInfo)) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
......@@ -6571,11 +6578,13 @@ static SSDataBlock* doSTableIntervalAgg(void* param) {
}
// the pDataBlock are always the same one, no need to call this again
STableQueryInfo* pTableQueryInfo = pRuntimeEnv->pQuery->current;
setTagVal_rv(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
setInputSDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQuery->order.order);
setIntervalQueryRange(pRuntimeEnv, pBlock->info.window.skey);
hashIntervalAgg(pRuntimeEnv, &pRuntimeEnv->pQuery->current->resInfo, pIntervalInfo, pOperator->numOfOutput,
pBlock);
hashIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pIntervalInfo, pBlock, pTableQueryInfo->groupIndex);
}
pQuery->order.order = order; // TODO : restore the order
......@@ -6596,9 +6605,9 @@ static SSDataBlock* doHashGroupbyAgg(void* param) {
return NULL;
}
SHashGroupbyOperatorInfo *pInfo = pOperator->optInfo;
SHashGroupbyOperatorInfo *pInfo = pOperator->info;
SQueryRuntimeEnv* pRuntimeEnv = pInfo->pRuntimeEnv;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (hasRemainData(&pRuntimeEnv->groupResInfo)) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes);
if (pInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
......@@ -6629,7 +6638,7 @@ static SSDataBlock* doHashGroupbyAgg(void* param) {
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED);
if (!pRuntimeEnv->pQuery->stableQuery) {
finalizeQueryResult_rv(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset);
finalizeQueryResult_rv(pOperator, pInfo->pCtx, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset);
}
updateWindowResNumOfRes_rv(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset);
......@@ -6650,8 +6659,8 @@ static SSDataBlock* doFill(void* param) {
return NULL;
}
SFillOperatorInfo *pInfo = pOperator->optInfo;
SQueryRuntimeEnv* pRuntimeEnv = pInfo->pRuntimeEnv;
SFillOperatorInfo *pInfo = pOperator->info;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (taosFillHasMoreResults(pRuntimeEnv->pFillInfo)) {
doFillGapsInResults_rv(pRuntimeEnv, pInfo->pRes);
......@@ -6693,91 +6702,83 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
}
if (pOperator->cleanup != NULL) {
pOperator->cleanup(pOperator->optInfo);
pOperator->cleanup(pOperator->info);
}
destroyOperatorInfo(pOperator->upstream);
tfree(pOperator->optInfo);
tfree(pOperator->info);
tfree(pOperator);
}
static SOperatorInfo* createAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) {
static SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
pInfo->pTableQueryInfo = pTableQueryInfo;
pInfo->pRuntimeEnv = pRuntimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery;
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput);
setDefaultOutputBuf(pRuntimeEnv, pInfo->pCtx, &pInfo->resultRowInfo, pInfo->pRes, pInfo->rowCellInfoOffset);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "AggregationOp";
pOperator->name = "Aggregate";
pOperator->blockingOptr = true;
pOperator->completed = false;
pOperator->optInfo = pInfo;
pOperator->info = pInfo;
pOperator->upstream = upstream;
pOperator->exec = doAggregation;
pOperator->pExpr = pQuery->pExpr1;
pOperator->numOfOutput = pQuery->numOfOutput;
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->pExpr, pOperator->numOfOutput, &pInfo->rowCellInfoOffset);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
pInfo->pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput);
setDefaultOutputBuf(pRuntimeEnv, pInfo->pCtx, &pInfo->resultRowInfo, pInfo->pRes, pInfo->rowCellInfoOffset);
pOperator->exec = doAggregate;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->pRuntimeEnv = pRuntimeEnv;
return pOperator;
}
static SOperatorInfo* createStableAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) {
static SOperatorInfo* createStableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
pInfo->pTableQueryInfo = pTableQueryInfo;
pInfo->pRuntimeEnv = pRuntimeEnv;
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "STableAggregate";
pOperator->name = "STableAggregate";
pOperator->blockingOptr = true;
pOperator->completed = false;
pOperator->optInfo = pInfo;
pOperator->info = pInfo;
pOperator->upstream = upstream;
pOperator->exec = doSTableAggregation;
pOperator->pExpr = pRuntimeEnv->pQuery->pExpr1;
pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfOutput;
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->pExpr, pOperator->numOfOutput, &pInfo->rowCellInfoOffset);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
pOperator->exec = doSTableAggregate;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->pRuntimeEnv = pRuntimeEnv;
return pOperator;
}
static SOperatorInfo* createArithOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) {
static SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SArithOperatorInfo* pInfo = calloc(1, sizeof(SArithOperatorInfo));
pInfo->pTableQueryInfo = pTableQueryInfo;
pInfo->pRuntimeEnv = pRuntimeEnv;
pInfo->pOutput = createOutputBuf(pExpr, numOfOutput);
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "ArithmeticOp";
pOperator->blockingOptr = false;
pOperator->completed = false;
pOperator->optInfo = pInfo;
pOperator->info = pInfo;
pOperator->upstream = upstream;
pOperator->exec = doArithmeticOperation;
pOperator->pExpr = (pRuntimeEnv->pQuery->pExpr2 == NULL)? pRuntimeEnv->pQuery->pExpr1:pRuntimeEnv->pQuery->pExpr2;
pOperator->numOfOutput = (pRuntimeEnv->pQuery->pExpr2 == NULL)? pRuntimeEnv->pQuery->numOfOutput:pRuntimeEnv->pQuery->numOfExpr2;
pInfo->pOutput = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput);
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->pExpr, pOperator->numOfOutput, &pInfo->rowCellInfoOffset);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->pRuntimeEnv = pRuntimeEnv;
return pOperator;
}
static SOperatorInfo* createLimitOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) {
static SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) {
SLimitOperatorInfo* pInfo = calloc(1, sizeof(SLimitOperatorInfo));
pInfo->limit = pRuntimeEnv->pQuery->limit.limit;
pInfo->pRuntimeEnv = pRuntimeEnv;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
......@@ -6786,19 +6787,17 @@ static SOperatorInfo* createLimitOperatorInfo(STableQueryInfo* pTableQueryInfo,
pOperator->completed = false;
pOperator->upstream = upstream;
pOperator->exec = doLimit;
pOperator->pExpr = NULL;
pOperator->numOfOutput = 0;
pOperator->optInfo = pInfo;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
return pOperator;
}
static SOperatorInfo* createOffsetOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) {
static SOperatorInfo* createOffsetOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) {
SOffsetOperatorInfo* pInfo = calloc(1, sizeof(SOffsetOperatorInfo));
pInfo->offset = pRuntimeEnv->pQuery->limit.offset;
pInfo->currentOffset = pInfo->offset;
pInfo->pRuntimeEnv = pRuntimeEnv;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
......@@ -6807,43 +6806,40 @@ static SOperatorInfo* createOffsetOperatorInfo(STableQueryInfo* pTableQueryInfo,
pOperator->completed = false;
pOperator->upstream = upstream;
pOperator->exec = doOffset;
pOperator->pExpr = NULL;
pOperator->numOfOutput = 0;
pOperator->optInfo = pInfo;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
return pOperator;
}
static SOperatorInfo* createHashIntervalAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) {
static SOperatorInfo* createIntervalAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SHashIntervalOperatorInfo* pInfo = calloc(1, sizeof(SHashIntervalOperatorInfo));
pInfo->pRuntimeEnv = pRuntimeEnv;
pInfo->pTableQueryInfo = pTableQueryInfo;
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
SQuery* pQuery = pRuntimeEnv->pQuery;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "HashIntervalAggOp";
pOperator->name = "HashIntervalAgg";
pOperator->blockingOptr = true;
pOperator->completed = false;
pOperator->upstream = upstream;
pOperator->exec = doHashIntervalAgg;
pOperator->pExpr = pQuery->pExpr1;
pOperator->numOfOutput = pQuery->numOfOutput;
pOperator->optInfo = pInfo;
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->pExpr, pOperator->numOfOutput, &pInfo->rowCellInfoOffset);
pInfo->pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
return pOperator;
}
static SOperatorInfo* createStableIntervalOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) {
static SOperatorInfo* createStableIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SHashIntervalOperatorInfo* pInfo = calloc(1, sizeof(SHashIntervalOperatorInfo));
pInfo->pRuntimeEnv = pRuntimeEnv;
pInfo->pTableQueryInfo = pTableQueryInfo;
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
SQuery* pQuery = pRuntimeEnv->pQuery;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
......@@ -6855,44 +6851,39 @@ static SOperatorInfo* createStableIntervalOperatorInfo(STableQueryInfo* pTableQu
pOperator->exec = doSTableIntervalAgg;
pOperator->pExpr = pQuery->pExpr1;
pOperator->numOfOutput = pQuery->numOfOutput;
pOperator->optInfo = pInfo;
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->pExpr, pOperator->numOfOutput, &pInfo->rowCellInfoOffset);
pInfo->pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
return pOperator;
}
SOperatorInfo* createHashGroupbyAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) {
SOperatorInfo* createHashGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SHashGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SHashGroupbyOperatorInfo));
pInfo->colIndex = -1; // group by column index
pInfo->pRuntimeEnv = pRuntimeEnv;
pInfo->pTableQueryInfo = pTableQueryInfo;
pInfo->colIndex = -1;
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
SQuery* pQuery = pRuntimeEnv->pQuery;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "HashGroupbyAggOp";
pOperator->name = "HashGroupby";
pOperator->blockingOptr = true;
pOperator->completed = false;
pOperator->upstream = upstream;
pOperator->exec = doHashGroupbyAgg;
pOperator->pExpr = pQuery->pExpr1;
pOperator->numOfOutput = pQuery->numOfOutput;
pOperator->optInfo = pInfo;
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->pExpr, pOperator->numOfOutput, &pInfo->rowCellInfoOffset);
pInfo->pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
return pOperator;
}
static SOperatorInfo* createFillOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) {
static SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr,
int32_t numOfOutput) {
SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo));
pInfo->pRuntimeEnv = pRuntimeEnv;
pInfo->pRes = createOutputBuf(pExpr, numOfOutput);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
......@@ -6901,19 +6892,19 @@ static SOperatorInfo* createFillOperatorInfo(STableQueryInfo* pTableQueryInfo, S
pOperator->completed = false;
pOperator->upstream = upstream;
pOperator->exec = doFill;
pOperator->pExpr = pRuntimeEnv->pQuery->pExpr1;
pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfOutput;
pOperator->optInfo = pInfo;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pInfo->pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput);
return pOperator;
}
static SSDataBlock* doTagScan(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
STagScanInfo *pTagScanInfo = pOperator->optInfo;
SQueryRuntimeEnv *pRuntimeEnv = pTagScanInfo->pRuntimeEnv;
STagScanInfo *pTagScanInfo = pOperator->info;
SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery;
size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
......@@ -7046,16 +7037,16 @@ static SSDataBlock* doTagScan(void* param) {
static SOperatorInfo* createTagScanOperator(SQueryRuntimeEnv* pRuntimeEnv) {
STagScanInfo* pInfo = calloc(1, sizeof(STagScanInfo));
pInfo->pRuntimeEnv = pRuntimeEnv;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "SeqTagScanOp";
pOperator->blockingOptr = false;
pOperator->completed = false;
pOperator->optInfo = pInfo;
pOperator->info = pInfo;
pOperator->exec = doTagScan;
pOperator->pExpr = pRuntimeEnv->pQuery->pExpr1;
pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfOutput;
pOperator->exec = doTagScan;
pOperator->pExpr = pRuntimeEnv->pQuery->pExpr1;
pOperator->pRuntimeEnv = pRuntimeEnv;
return pOperator;
}
......
......@@ -425,8 +425,6 @@ void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const SSDataBlock* pInput)
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
SColumnInfoData* pColData = taosArrayGet(pInput->pDataBlock, i);
pFillInfo->pData[i] = pColData->pData;
// memcpy(pFillInfo->pData[i], pInput[i]->data, pFillInfo->numOfRows * pFillInfo->pFillCol[i].col.bytes);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册