提交 2b4edcf6 编写于 作者: H Haojun Liao

[td-13039] support fill.

上级 311e65da
...@@ -462,7 +462,8 @@ typedef struct { ...@@ -462,7 +462,8 @@ typedef struct {
int32_t tz; // query client timezone int32_t tz; // query client timezone
char intervalUnit; char intervalUnit;
char slidingUnit; char slidingUnit;
char offsetUnit; char offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration.
int8_t precision;
int64_t interval; int64_t interval;
int64_t sliding; int64_t sliding;
int64_t offset; int64_t offset;
......
...@@ -76,11 +76,12 @@ typedef struct SResultRowCell { ...@@ -76,11 +76,12 @@ typedef struct SResultRowCell {
* If the number of generated results is greater than this value, * If the number of generated results is greater than this value,
* query query will be halt and return results to client immediate. * query query will be halt and return results to client immediate.
*/ */
typedef struct SRspResultInfo { typedef struct SResultInfo { // TODO refactor
int64_t total; // total generated result size in rows int64_t totalRows; // total generated result size in rows
int64_t totalBytes; // total results in bytes.
int32_t capacity; // capacity of current result output buffer int32_t capacity; // capacity of current result output buffer
int32_t threshold; // result size threshold in rows. int32_t threshold; // result size threshold in rows.
} SRspResultInfo; } SResultInfo;
typedef struct SColumnFilterElem { typedef struct SColumnFilterElem {
int16_t bytes; // column length int16_t bytes; // column length
...@@ -160,8 +161,8 @@ typedef struct STaskCostInfo { ...@@ -160,8 +161,8 @@ typedef struct STaskCostInfo {
typedef struct SOperatorCostInfo { typedef struct SOperatorCostInfo {
uint64_t openCost; uint64_t openCost;
uint64_t execCost; uint64_t execCost;
uint64_t totalRows; // uint64_t totalRows;
uint64_t totalBytes; // uint64_t totalBytes;
} SOperatorCostInfo; } SOperatorCostInfo;
typedef struct { typedef struct {
...@@ -301,7 +302,7 @@ typedef struct STaskRuntimeEnv { ...@@ -301,7 +302,7 @@ typedef struct STaskRuntimeEnv {
int64_t currentOffset; // dynamic offset value int64_t currentOffset; // dynamic offset value
STableQueryInfo* current; STableQueryInfo* current;
SRspResultInfo resultInfo; SResultInfo resultInfo;
SHashObj* pTableRetrieveTsMap; SHashObj* pTableRetrieveTsMap;
struct SUdfInfo* pUdfInfo; struct SUdfInfo* pUdfInfo;
} STaskRuntimeEnv; } STaskRuntimeEnv;
...@@ -324,7 +325,7 @@ typedef struct SOperatorInfo { ...@@ -324,7 +325,7 @@ typedef struct SOperatorInfo {
STaskRuntimeEnv* pRuntimeEnv; // todo remove it STaskRuntimeEnv* pRuntimeEnv; // todo remove it
SExecTaskInfo* pTaskInfo; SExecTaskInfo* pTaskInfo;
SOperatorCostInfo cost; SOperatorCostInfo cost;
SResultInfo resultInfo;
struct SOperatorInfo** pDownstream; // downstram pointer list struct SOperatorInfo** pDownstream; // downstram pointer list
int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator
__optr_fn_t getNextFn; __optr_fn_t getNextFn;
...@@ -539,6 +540,8 @@ typedef struct SFillOperatorInfo { ...@@ -539,6 +540,8 @@ typedef struct SFillOperatorInfo {
void** p; void** p;
SSDataBlock* existNewGroupBlock; SSDataBlock* existNewGroupBlock;
bool multigroupResult; bool multigroupResult;
SInterval intervalInfo;
int32_t capacity;
} SFillOperatorInfo; } SFillOperatorInfo;
typedef struct SGroupKeys { typedef struct SGroupKeys {
...@@ -649,21 +652,19 @@ SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx ...@@ -649,21 +652,19 @@ SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName, SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName,
SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo); SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, int32_t numOfDownstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo); SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval,
const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock, bool multigroupResult, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
SExprInfo* pExpr, int32_t numOfOutput); SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
int32_t numOfOutput, bool multigroupResult);
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
SExprInfo* pExpr, int32_t numOfOutput); SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
......
...@@ -248,7 +248,7 @@ static int32_t setGroupResultOutputBuf_rv(SOptrBasicInfo *binfo, int32_t numOfCo ...@@ -248,7 +248,7 @@ static int32_t setGroupResultOutputBuf_rv(SOptrBasicInfo *binfo, int32_t numOfCo
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size); static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
static void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win); static void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win);
static void setResultBufSize(STaskAttr* pQueryAttr, SRspResultInfo* pResultInfo); static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo);
static void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable); static void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable);
static void setParamForStableStddev(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr); static void setParamForStableStddev(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr);
static void setParamForStableStddevByColData(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes); static void setParamForStableStddevByColData(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes);
...@@ -7113,22 +7113,23 @@ static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, STaskRunti ...@@ -7113,22 +7113,23 @@ static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, STaskRunti
static SSDataBlock* doFill(SOperatorInfo *pOperator, bool* newgroup) { static SSDataBlock* doFill(SOperatorInfo *pOperator, bool* newgroup) {
SFillOperatorInfo *pInfo = pOperator->info; SFillOperatorInfo *pInfo = pOperator->info;
pInfo->pRes->info.rows = 0;
SResultInfo* pResultInfo = &pOperator->resultInfo;
blockDataCleanup(pInfo->pRes);
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
} }
STaskRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; // doHandleRemainBlockFromNewGroup(pInfo, pRuntimeEnv, newgroup);
doHandleRemainBlockFromNewGroup(pInfo, pRuntimeEnv, newgroup); // if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || (!pInfo->multigroupResult && pInfo->pRes->info.rows > 0)) {
if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || (!pInfo->multigroupResult && pInfo->pRes->info.rows > 0)) { // return pInfo->pRes;
return pInfo->pRes; // }
}
SOperatorInfo* pDownstream = pOperator->pDownstream[0];
while(1) { while(1) {
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(pDownstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup); SSDataBlock* pBlock = pDownstream->getNextFn(pDownstream, newgroup);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(pDownstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (*newgroup) { if (*newgroup) {
assert(pBlock != NULL); assert(pBlock != NULL);
...@@ -7140,7 +7141,7 @@ static SSDataBlock* doFill(SOperatorInfo *pOperator, bool* newgroup) { ...@@ -7140,7 +7141,7 @@ static SSDataBlock* doFill(SOperatorInfo *pOperator, bool* newgroup) {
// Fill the previous group data block, before handle the data block of new group. // Fill the previous group data block, before handle the data block of new group.
// Close the fill operation for previous group data block // Close the fill operation for previous group data block
taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQueryAttr->window.ekey); // taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQueryAttr->window.ekey);
} else { } else {
if (pBlock == NULL) { if (pBlock == NULL) {
if (pInfo->totalInputRows == 0) { if (pInfo->totalInputRows == 0) {
...@@ -7148,7 +7149,7 @@ static SSDataBlock* doFill(SOperatorInfo *pOperator, bool* newgroup) { ...@@ -7148,7 +7149,7 @@ static SSDataBlock* doFill(SOperatorInfo *pOperator, bool* newgroup) {
return NULL; return NULL;
} }
taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQueryAttr->window.ekey); // taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQueryAttr->window.ekey);
} else { } else {
pInfo->totalInputRows += pBlock->info.rows; pInfo->totalInputRows += pBlock->info.rows;
taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, pBlock->info.window.ekey); taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, pBlock->info.window.ekey);
...@@ -7156,25 +7157,25 @@ static SSDataBlock* doFill(SOperatorInfo *pOperator, bool* newgroup) { ...@@ -7156,25 +7157,25 @@ static SSDataBlock* doFill(SOperatorInfo *pOperator, bool* newgroup) {
} }
} }
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p); doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pInfo->capacity, pInfo->p);
// current group has no more result to return // current group has no more result to return
if (pInfo->pRes->info.rows > 0) { if (pInfo->pRes->info.rows > 0) {
// 1. The result in current group not reach the threshold of output result, continue // 1. The result in current group not reach the threshold of output result, continue
// 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately // 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately
if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || pBlock == NULL || (!pInfo->multigroupResult)) { if (pInfo->pRes->info.rows > pResultInfo->threshold || pBlock == NULL || (!pInfo->multigroupResult)) {
return pInfo->pRes; return pInfo->pRes;
} }
doHandleRemainBlockFromNewGroup(pInfo, pRuntimeEnv, newgroup); // doHandleRemainBlockFromNewGroup(pInfo, pRuntimeEnv, newgroup);
if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || pBlock == NULL) { if (pInfo->pRes->info.rows > pOperator->resultInfo.threshold || pBlock == NULL) {
return pInfo->pRes; return pInfo->pRes;
} }
} else if (pInfo->existNewGroupBlock) { // try next group } else if (pInfo->existNewGroupBlock) { // try next group
assert(pBlock != NULL); assert(pBlock != NULL);
doHandleRemainBlockForNewGroupImpl(pInfo, pRuntimeEnv, newgroup); // doHandleRemainBlockForNewGroupImpl(pInfo, pRuntimeEnv, newgroup);
if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold) { if (pInfo->pRes->info.rows > pResultInfo->threshold) {
return pInfo->pRes; return pInfo->pRes;
} }
} else { } else {
...@@ -7537,8 +7538,7 @@ SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int3 ...@@ -7537,8 +7538,7 @@ SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int3
return 0; return 0;
} }
SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, int32_t numOfDownstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo) {
ASSERT(numOfDownstream == 1);
SLimitOperatorInfo* pInfo = calloc(1, sizeof(SLimitOperatorInfo)); SLimitOperatorInfo* pInfo = calloc(1, sizeof(SLimitOperatorInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
...@@ -7622,7 +7622,7 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S ...@@ -7622,7 +7622,7 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); // pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
...@@ -7647,7 +7647,7 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper ...@@ -7647,7 +7647,7 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper
pInfo->colIndex = -1; pInfo->colIndex = -1;
pInfo->reptScan = false; pInfo->reptScan = false;
pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); // pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
...@@ -7712,7 +7712,7 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntim ...@@ -7712,7 +7712,7 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntim
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); // pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
...@@ -7736,7 +7736,7 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun ...@@ -7736,7 +7736,7 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); // pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
...@@ -7827,39 +7827,44 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx ...@@ -7827,39 +7827,44 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
return NULL; return NULL;
} }
SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult) { SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock, bool multigroupResult, SExecTaskInfo* pTaskInfo) {
SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo)); SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo));
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pInfo->pRes = pResBlock;
pInfo->multigroupResult = multigroupResult; pInfo->multigroupResult = multigroupResult;
pInfo->intervalInfo = *pInterval;
SResultInfo* pResultInfo = &pOperator->resultInfo;
{ {
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; // struct SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pQueryAttr->fillVal);
struct SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfOutput, pQueryAttr->fillVal);
STimeWindow w = TSWINDOW_INITIALIZER; STimeWindow w = TSWINDOW_INITIALIZER;
TSKEY sk = TMIN(pQueryAttr->window.skey, pQueryAttr->window.ekey); TSKEY sk = TMIN(pTaskInfo->window.skey, pTaskInfo->window.ekey);
TSKEY ek = TMAX(pQueryAttr->window.skey, pQueryAttr->window.ekey); TSKEY ek = TMAX(pTaskInfo->window.skey, pTaskInfo->window.ekey);
// getAlignQueryTimeWindow(pQueryAttr, pQueryAttr->window.skey, sk, ek, &w); getAlignQueryTimeWindow(pInterval, pInterval->precision, pTaskInfo->window.skey, sk, ek, &w);
pInfo->pFillInfo = int32_t order = TSDB_ORDER_ASC;
taosCreateFillInfo(pQueryAttr->order.order, w.skey, 0, (int32_t)pRuntimeEnv->resultInfo.capacity, numOfOutput,
pQueryAttr->interval.sliding, pQueryAttr->interval.slidingUnit,
(int8_t)pQueryAttr->precision, pQueryAttr->fillType, pColInfo, pRuntimeEnv->qinfo);
pInfo->p = calloc(numOfOutput, POINTER_BYTES); // pInfo->pFillInfo =
} // taosCreateFillInfo(order, w.skey, 0, (int32_t)pResultInfo->capacity, numOfCols,
// pInterval->sliding, pInterval->slidingUnit,
// (int8_t)pInterval->precision, pQueryAttr->fillType, pColInfo, pTaskInfo->id.str);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pInfo->p = calloc(numOfCols, POINTER_BYTES);
}
pOperator->name = "FillOperator"; pOperator->name = "FillOperator";
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
// pOperator->operatorType = OP_Fill; // pOperator->operatorType = OP_Fill;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfCols;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->_openFn = operatorDummyOpenFn;
pOperator->getNextFn = doFill; pOperator->getNextFn = doFill;
pOperator->pTaskInfo = pTaskInfo;
pOperator->closeFn = destroySFillOperatorInfo; pOperator->closeFn = destroySFillOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
...@@ -7868,11 +7873,12 @@ SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInf ...@@ -7868,11 +7873,12 @@ SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInf
SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, void* pMerger, bool multigroupResult) { SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, void* pMerger, bool multigroupResult) {
SSLimitOperatorInfo* pInfo = calloc(1, sizeof(SSLimitOperatorInfo)); SSLimitOperatorInfo* pInfo = calloc(1, sizeof(SSLimitOperatorInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
// pInfo->orderColumnList = getResultGroupCheckColumns(pQueryAttr); // pInfo->orderColumnList = getResultGroupCheckColumns(pQueryAttr);
// pInfo->slimit = pQueryAttr->slimit; // pInfo->slimit = pQueryAttr->slimit;
// pInfo->limit = pQueryAttr->limit; // pInfo->limit = pQueryAttr->limit;
// pInfo->capacity = pRuntimeEnv->resultInfo.capacity; // pInfo->capacity = pResultInfo->capacity;
// pInfo->threshold = (int64_t)(pInfo->capacity * 0.8); // pInfo->threshold = (int64_t)(pInfo->capacity * 0.8);
// pInfo->currentOffset = pQueryAttr->limit.offset; // pInfo->currentOffset = pQueryAttr->limit.offset;
// pInfo->currentGroupOffset = pQueryAttr->slimit.offset; // pInfo->currentGroupOffset = pQueryAttr->slimit.offset;
...@@ -7895,9 +7901,8 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI ...@@ -7895,9 +7901,8 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
offset += pExpr[index->colIndex].base.resSchema.bytes; offset += pExpr[index->colIndex].base.resSchema.bytes;
} }
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pOperator->resultInfo.capacity);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "SLimitOperator"; pOperator->name = "SLimitOperator";
// pOperator->operatorType = OP_SLimit; // pOperator->operatorType = OP_SLimit;
...@@ -7920,7 +7925,7 @@ static SSDataBlock* doTagScan(SOperatorInfo *pOperator, bool* newgroup) { ...@@ -7920,7 +7925,7 @@ static SSDataBlock* doTagScan(SOperatorInfo *pOperator, bool* newgroup) {
} }
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
int32_t maxNumOfTables = (int32_t)pRuntimeEnv->resultInfo.capacity; int32_t maxNumOfTables = (int32_t)pResultInfo->capacity;
STagScanInfo *pInfo = pOperator->info; STagScanInfo *pInfo = pOperator->info;
SSDataBlock *pRes = pInfo->pRes; SSDataBlock *pRes = pInfo->pRes;
...@@ -8046,7 +8051,7 @@ static SSDataBlock* doTagScan(SOperatorInfo *pOperator, bool* newgroup) { ...@@ -8046,7 +8051,7 @@ static SSDataBlock* doTagScan(SOperatorInfo *pOperator, bool* newgroup) {
SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput) {
STagScanInfo* pInfo = calloc(1, sizeof(STagScanInfo)); STagScanInfo* pInfo = calloc(1, sizeof(STagScanInfo));
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); // pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity);
size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
assert(numOfGroup == 0 || numOfGroup == 1); assert(numOfGroup == 0 || numOfGroup == 1);
...@@ -8390,10 +8395,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa ...@@ -8390,10 +8395,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
size_t numOfCols = LIST_LENGTH(pScanPhyNode->pScanCols); size_t numOfCols = LIST_LENGTH(pScanPhyNode->pScanCols);
tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, (uint64_t)queryId, taskId); tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, (uint64_t)queryId, taskId);
return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pTaskInfo);
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count,
pScanPhyNode->reverse, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) { } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) {
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode; SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createOutputBuf_rv1(pExchange->node.pOutputDataBlockDesc); SSDataBlock* pResBlock = createOutputBuf_rv1(pExchange->node.pOutputDataBlockDesc);
...@@ -8847,7 +8849,7 @@ static void doUpdateExprColumnIndex(STaskAttr *pQueryAttr) { ...@@ -8847,7 +8849,7 @@ static void doUpdateExprColumnIndex(STaskAttr *pQueryAttr) {
} }
} }
void setResultBufSize(STaskAttr* pQueryAttr, SRspResultInfo* pResultInfo) { void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo) {
const int32_t DEFAULT_RESULT_MSG_SIZE = 1024 * (1024 + 512); const int32_t DEFAULT_RESULT_MSG_SIZE = 1024 * (1024 + 512);
// the minimum number of rows for projection query // the minimum number of rows for projection query
...@@ -8868,7 +8870,7 @@ void setResultBufSize(STaskAttr* pQueryAttr, SRspResultInfo* pResultInfo) { ...@@ -8868,7 +8870,7 @@ void setResultBufSize(STaskAttr* pQueryAttr, SRspResultInfo* pResultInfo) {
} }
pResultInfo->threshold = (int32_t)(pResultInfo->capacity * THRESHOLD_RATIO); pResultInfo->threshold = (int32_t)(pResultInfo->capacity * THRESHOLD_RATIO);
pResultInfo->total = 0; pResultInfo->totalRows = 0;
} }
//TODO refactor //TODO refactor
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册