提交 9b76c49d 编写于 作者: H Haojun Liao

[td-225] refactor

上级 9b224d9b
......@@ -235,6 +235,7 @@ typedef struct SQuery {
void* tsdb;
SMemRef memRef;
STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo>
int32_t vgId;
} SQuery;
typedef SSDataBlock* (*__operator_fn_t)(void* param);
......@@ -242,7 +243,10 @@ typedef SSDataBlock* (*__operator_fn_t)(void* param);
typedef struct SOperatorInfo {
char *name;
bool blockingOptr;
bool completed;
void *optInfo;
SExprInfo *pExpr;
int32_t numOfOutput;
__operator_fn_t exec;
struct SOperatorInfo *upstream;
......@@ -284,6 +288,7 @@ typedef struct SQueryRuntimeEnv {
int32_t groupIndex;
int32_t tableIndex;
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
SOperatorInfo* proot;
} SQueryRuntimeEnv;
typedef struct {
......@@ -301,8 +306,6 @@ typedef struct SQInfo {
int32_t code; // error code to returned to client
int64_t owner; // if it is in execution
int32_t vgId;
SQueryRuntimeEnv runtimeEnv;
SQuery query;
......@@ -365,13 +368,27 @@ typedef struct SAggOperatorInfo {
SResultRowInfo *pResultRowInfo;
STableQueryInfo *pTableQueryInfo;
SQueryRuntimeEnv *pRuntimeEnv;
SQLFunctionCtx *pCtx;
} SAggOperatorInfo;
typedef struct SArithOperatorInfo {
STableQueryInfo *pTableQueryInfo;
SQueryRuntimeEnv *pRuntimeEnv;
SQLFunctionCtx* pCtx;
} SArithOperatorInfo;
typedef struct SLimitOperatorInfo {
int64_t limit;
int64_t total;
SQueryRuntimeEnv* pRuntimeEnv;
} SLimitOperatorInfo;
typedef struct SOffsetOperatorInfo {
int64_t offset;
int64_t currentOffset;
SQueryRuntimeEnv* pRuntimeEnv;
} SOffsetOperatorInfo;
void freeParam(SQueryParam *param);
int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param);
int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg,
......
......@@ -3564,7 +3564,7 @@ char *getArithColumnData(void *param, const char* name, int32_t colId) {
}
}
assert(index >= 0 && colId >= 0);
assert(index >= 0 /*&& colId >= 0*/);
return pSupport->data[index] + pSupport->offset * pSupport->colList[index].bytes;
}
......
......@@ -159,14 +159,14 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY *tsCol, SDataBlockInfo* pBlockInfo,
SDataStatis *pStatis, SExprInfo* pExprInfo);
static void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColIndex* pColInfo);
static void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColIndex* pColIndex);
static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pSQLCtx);
static void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo);
static void resetDefaultResInfoOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
static bool hasMainOutput(SQuery *pQuery);
static int32_t setTimestampListJoinInfo(SQInfo *pQInfo, STableQueryInfo *pTableQueryInfo);
static int32_t setTimestampListJoinInfo(SQueryRuntimeEnv* pRuntimeEnv, STableQueryInfo *pTableQueryInfo);
static void releaseQueryBuf(size_t numOfTables);
static int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order);
static void doRowwiseTimeWindowInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs, int32_t curRowIndex, TSKEY windowKey, int32_t type);
......@@ -176,19 +176,28 @@ static STableIdInfo createTableIdInfo(SQuery* pQuery);
static SOperatorInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime);
static SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime);
static int32_t getNumOfScanTimes(SQuery* pQuery);
static SSDataBlock* createOutputBuf(SQuery* pQuery) {
// setup the output buffer
static char *getArithemicInputSrc(void *param, const char *name, int32_t colId);
static bool isFixedOutputQuery(SQueryRuntimeEnv* pRuntimeEnv);
static SOperatorInfo* createAggOperatorInfo(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo,
SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr);
static SOperatorInfo* createArithOperatorInfo(STableQueryInfo* pTableQueryInfo,
SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr);
static SOperatorInfo* createLimitOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr);
static SOperatorInfo* createOffsetOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr);
// setup the output buffer
static SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput) {
SSDataBlock *res = calloc(1, sizeof(SSDataBlock));
res->info.numOfCols = pQuery->numOfOutput;
res->info.numOfCols = numOfOutput;
res->pDataBlock = taosArrayInit(pQuery->numOfOutput, sizeof(SColumnInfoData));
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
res->pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData));
for (int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData idata = {0};
idata.info.type = pQuery->pExpr1[i].type;
idata.info.bytes = pQuery->pExpr1[i].bytes;
idata.info.colId = pQuery->pExpr1[i].base.resColId;
idata.pData = calloc(4096, idata.info.bytes);
idata.info.type = pExpr[i].type;
idata.info.bytes = pExpr[i].bytes;
idata.info.colId = pExpr[i].base.resColId;
idata.pData = calloc(4096, idata.info.bytes * 4096);
taosArrayPush(res->pDataBlock, &idata);
}
......@@ -1179,19 +1188,19 @@ static void doWindowBorderInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SDataBloc
}
static void setInputSDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SSDataBlock* pSDataBlock) {
SQuery* pQuery = pRuntimeEnv->pQuery;
static void setInputSDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock) {
if (pCtx[0].pInput == NULL && pSDataBlock->pDataBlock != NULL) {
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
pCtx[i].size = pSDataBlock->info.rows;
if (pRuntimeEnv->pCtx[0].pInput == NULL && pSDataBlock->pDataBlock != NULL) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SColIndex *pCol = &pQuery->pExpr1[i].base.colInfo;
SColIndex *pCol = &pOperator->pExpr[i].base.colInfo;
if (TSDB_COL_IS_NORMAL_COL(pCol->flag)) {
SColIndex* pColIndex = &pQuery->pExpr1[i].base.colInfo;
SColIndex* pColIndex = &pOperator->pExpr[i].base.colInfo;
SColumnInfoData *p = taosArrayGet(pSDataBlock->pDataBlock, pColIndex->colIndex);
assert(p->info.colId == pColIndex->colId);
SQLFunctionCtx* pCtx = &pRuntimeEnv->pCtx[i];
pCtx->pInput = p->pData;
SQLFunctionCtx* pCtx1 = &pCtx[i];
pCtx1->pInput = p->pData;
uint32_t status = aAggs[pCtx->functionId].status;
if ((status & (TSDB_FUNCSTATE_SELECTIVITY | TSDB_FUNCSTATE_NEED_TS)) != 0) {
......@@ -1200,17 +1209,18 @@ static void setInputSDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SSDataBlock* pSDat
}
}
}
} else {
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
pCtx[i].size = pSDataBlock->info.rows;
}
}
}
static void aggApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SSDataBlock* pSDataBlock) {
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
SQuery * pQuery = pRuntimeEnv->pQuery;
static void aggApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock) {
SQuery *pQuery = pRuntimeEnv->pQuery;
for (int32_t k = 0; k < pRuntimeEnv->outputBuf->info.numOfCols; ++k) {
setBlockStatisInfo(&pCtx[k], pSDataBlock, &pQuery->pExpr1[k].base.colInfo);
for (int32_t k = 0; k < pOperator->numOfOutput; ++k) {
setBlockStatisInfo(&pCtx[k], pSDataBlock, &pOperator->pExpr[k].base.colInfo);
int32_t functionId = pCtx[k].functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
......@@ -1220,60 +1230,112 @@ static void aggApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SSDataBlock* pSData
}
}
static void arithmeticApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SSDataBlock* pSDataBlock) {
SArithmeticSupport arithSup = {0};
SQuery *pQuery = pRuntimeEnv->pQuery;
static void setArithParams(SArithmeticSupport* sas, SExprInfo *pExprInfo, SSDataBlock* pSDataBlock) {
sas->numOfCols = (int32_t) pSDataBlock->info.numOfCols;
sas->pArithExpr = pExprInfo;
// create the output result buffer
tFilePage **data = calloc(pQuery->numOfExpr2, POINTER_BYTES);
for (int32_t i = 0; i < pQuery->numOfExpr2; ++i) {
int32_t bytes = pQuery->pExpr2[i].bytes;
data[i] = (tFilePage *)malloc((size_t)(bytes * pQuery->rec.rows) + sizeof(tFilePage));
sas->colList = calloc(1, pSDataBlock->info.numOfCols*sizeof(SColumnInfo));
for(int32_t i = 0; i < sas->numOfCols; ++i) {
SColumnInfoData* pColData = taosArrayGet(pSDataBlock->pDataBlock, i);
sas->colList[i] = pColData->info;
}
arithSup.numOfCols = (int32_t)pSDataBlock->info.numOfCols;
arithSup.exprList = pQuery->pExpr1;
arithSup.data = calloc(arithSup.numOfCols, POINTER_BYTES);
sas->data = calloc(sas->numOfCols, POINTER_BYTES);
// set the input column data
for (int32_t f = 0; f < arithSup.numOfCols; ++f) {
for (int32_t f = 0; f < pSDataBlock->info.numOfCols; ++f) {
SColumnInfoData *pColumnInfoData = taosArrayGet(pSDataBlock->pDataBlock, f);
arithSup.data[f] = pColumnInfoData->pData;
sas->data[f] = pColumnInfoData->pData;
}
}
// output result number of columns
for (int32_t k = 0; k < pRuntimeEnv->outputBuf->info.numOfCols; ++k) {
for (int i = 0; i < pQuery->numOfExpr2; ++i) {
SExprInfo *pExpr = &pQuery->pExpr2[i];
static void arithmeticApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, SExprInfo *pExprInfo, int32_t numOfOutput) {
SQuery *pQuery = pRuntimeEnv->pQuery;
// calculate the result from several other columns
SSqlFuncMsg *pSqlFunc = &pExpr->base;
if (pSqlFunc->functionId != TSDB_FUNC_ARITHM) {
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
if (pSqlFunc->functionId == pQuery->pExpr1[j].base.functionId &&
pSqlFunc->colInfo.colId == pQuery->pExpr1[j].base.colInfo.colId) {
memcpy(data[i]->data, pQuery->sdata[j]->data, (size_t)(pQuery->pExpr1[j].bytes * pSDataBlock->info.rows));
break;
for (int32_t k = 0; k < numOfOutput; ++k) {
int32_t functionId = pExprInfo[k].base.functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
pCtx[k].startTs = pQuery->window.skey;
aAggs[functionId].xFunction(&pCtx[k]);
}
}
} else {
arithSup.pArithExpr = pExpr;
arithmeticTreeTraverse(arithSup.pArithExpr->pExpr, (int32_t)pQuery->rec.rows, data[i]->data, &arithSup,
TSDB_ORDER_ASC, getArithemicInputSrc);
}
static void hashIntervalAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock) {
SQuery *pQuery = pRuntimeEnv->pQuery;
SResultRowInfo* pWindowResInfo = &pRuntimeEnv->resultRowInfo;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
int32_t prevIndex = curTimeWindowIndex(pWindowResInfo);
TSKEY ts = getStartTsKey(pQuery, pDataBlockInfo, tsCols, step);
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
SResultRow *pResult = NULL;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult, groupId);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
// goto _end;
}
int32_t forwardStep = 0;
int32_t startPos = pQuery->pos;
TSKEY ekey = reviseWindowEkey(pQuery, &win);
forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, pQuery->pos, ekey, searchFn, true);
// prev time window not interpolation yet.
int32_t curIndex = curTimeWindowIndex(pWindowResInfo);
if (prevIndex != -1 && prevIndex < curIndex && pQuery->timeWindowInterpo) {
for (int32_t j = prevIndex; j < curIndex; ++j) { // previous time window may be all closed already.
SResultRow *pRes = pWindowResInfo->pResult[j];
if (pRes->closed) {
assert(resultRowInterpolated(pRes, RESULT_ROW_START_INTERP) && resultRowInterpolated(pRes, RESULT_ROW_END_INTERP));
continue;
}
for (int32_t i = 0; i < pQuery->numOfExpr2; ++i) {
memcpy(pQuery->sdata[i]->data, data[i]->data, (size_t)(pQuery->pExpr2[i].bytes * pQuery->rec.rows));
STimeWindow w = pRes->win;
ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &w, masterScan, &pResult, groupId);
assert(ret == TSDB_CODE_SUCCESS && !resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
int32_t p = QUERY_IS_ASC_QUERY(pQuery) ? 0 : pDataBlockInfo->rows - 1;
doRowwiseTimeWindowInterpolation(pRuntimeEnv, pDataBlock, *(TSKEY *)pRuntimeEnv->prevRow[0], -1, tsCols[0], p,
w.ekey, RESULT_ROW_END_INTERP);
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP);
doBlockwiseApplyFunctions(pRuntimeEnv, &w, startPos, 0, tsCols, pDataBlockInfo->rows, pDataBlock);
}
for (int32_t i = 0; i < pQuery->numOfExpr2; ++i) {
tfree(data[i]);
// restore current time window
ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult, groupId);
assert(ret == TSDB_CODE_SUCCESS);
}
tfree(data);
tfree(arithSup.data);
// window start key interpolation
doWindowBorderInterpolation(pRuntimeEnv, pDataBlockInfo, pDataBlock, pResult, &win, pQuery->pos, forwardStep);
doBlockwiseApplyFunctions(pRuntimeEnv, &win, startPos, forwardStep, tsCols, pDataBlockInfo->rows, pDataBlock);
STimeWindow nextWin = win;
while (1) {
int32_t prevEndPos = (forwardStep - 1) * step + startPos;
startPos = getNextQualifiedWindow(pQuery, &nextWin, pDataBlockInfo, tsCols, searchFn, prevEndPos);
if (startPos < 0) {
break;
}
// null data, failed to allocate more memory buffer
int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &nextWin, masterScan, &pResult, groupId);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
break;
}
ekey = reviseWindowEkey(pQuery, &nextWin);
forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, startPos, ekey, searchFn, true);
// window start(end) key interpolation
doWindowBorderInterpolation(pRuntimeEnv, pDataBlockInfo, pDataBlock, pResult, &nextWin, startPos, forwardStep);
doBlockwiseApplyFunctions(pRuntimeEnv, &nextWin, startPos, forwardStep, tsCols, pDataBlockInfo->rows, pDataBlock);
}
}
......@@ -1440,7 +1502,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat
}
setResultOutputBuf(pRuntimeEnv, pResultRow);
initCtxOutputBuf(pRuntimeEnv);
initCtxOutputBuf(pRuntimeEnv, pRuntimeEnv->pCtx);
return TSDB_CODE_SUCCESS;
}
......@@ -2015,42 +2077,19 @@ static int32_t setCtxTagColumnInfo(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
return TSDB_CODE_SUCCESS;
}
static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfTables, int16_t order, int32_t vgId) {
qDebug("QInfo:%p setup runtime env", GET_QINFO_ADDR(pRuntimeEnv));
SQuery *pQuery = pRuntimeEnv->pQuery;
pRuntimeEnv->prevGroupId = INT32_MIN;
pQuery->interBufSize = getOutputInterResultBufSize(pQuery);
pRuntimeEnv->pResultRowHashTable = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pRuntimeEnv->keyBuf = malloc(pQuery->maxSrcColumnSize + sizeof(int64_t));
pRuntimeEnv->pool = initResultRowPool(getResultRowSize(pRuntimeEnv));
pRuntimeEnv->prevRow = malloc(POINTER_BYTES * pQuery->numOfCols + pQuery->srcRowSize);
pRuntimeEnv->tagVal = malloc(pQuery->tagLen);
pRuntimeEnv->pCtx = (SQLFunctionCtx *)calloc(pQuery->numOfOutput, sizeof(SQLFunctionCtx));
pRuntimeEnv->offset = calloc(pQuery->numOfOutput, sizeof(int16_t));
pRuntimeEnv->rowCellInfoOffset = calloc(pQuery->numOfOutput, sizeof(int32_t));
pRuntimeEnv->sasArray = calloc(pQuery->numOfOutput, sizeof(SArithmeticSupport));
if (pRuntimeEnv->offset == NULL || pRuntimeEnv->pCtx == NULL || pRuntimeEnv->rowCellInfoOffset == NULL ||
pRuntimeEnv->sasArray == NULL || pRuntimeEnv->pResultRowHashTable == NULL || pRuntimeEnv->keyBuf == NULL ||
pRuntimeEnv->prevRow == NULL || pRuntimeEnv->tagVal == NULL) {
goto _clean;
}
static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, int32_t numOfOutput, SExprInfo* pExpr, int32_t order, int32_t vgId) {
SQuery* pQuery = pRuntimeEnv->pQuery;
char* start = POINTER_BYTES * pQuery->numOfCols + (char*) pRuntimeEnv->prevRow;
pRuntimeEnv->prevRow[0] = start;
for(int32_t i = 1; i < pQuery->numOfCols; ++i) {
pRuntimeEnv->prevRow[i] = pRuntimeEnv->prevRow[i - 1] + pQuery->colList[i-1].bytes;
SQLFunctionCtx *pQCtx = (SQLFunctionCtx *)calloc(numOfOutput, sizeof(SQLFunctionCtx));
if (pQCtx == NULL) {
return NULL;
}
pRuntimeEnv->offset[0] = 0;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SSqlFuncMsg *pSqlFuncMsg = &pQuery->pExpr1[i].base;
for (int32_t i = 0; i < numOfOutput; ++i) {
SSqlFuncMsg *pSqlFuncMsg = &pExpr[i].base;
SQLFunctionCtx* pCtx = &pQCtx[i];
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
SColIndex * pIndex = &pSqlFuncMsg->colInfo;
SColIndex *pIndex = &pSqlFuncMsg->colInfo;
if (TSDB_COL_REQ_NULL(pIndex->flag)) {
pCtx->requireNull = true;
......@@ -2085,13 +2124,13 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
assert(isValidDataType(pCtx->inputType));
pCtx->ptsOutputBuf = NULL;
pCtx->outputBytes = pQuery->pExpr1[i].bytes;
pCtx->outputType = pQuery->pExpr1[i].type;
pCtx->outputBytes = pExpr[i].bytes;
pCtx->outputType = pExpr[i].type;
pCtx->order = pQuery->order.order;
pCtx->order = order;
pCtx->functionId = pSqlFuncMsg->functionId;
pCtx->stableQuery = pQuery->stableQuery;
pCtx->interBufBytes = pQuery->pExpr1[i].interBytes;
pCtx->interBufBytes = pExpr[i].interBytes;
pCtx->start.key = INT64_MIN;
pCtx->end.key = INT64_MIN;
......@@ -2114,10 +2153,10 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
int32_t functionId = pCtx->functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
int32_t f = pQuery->pExpr1[0].base.functionId;
int32_t f = pExpr[0].base.functionId;
assert(f == TSDB_FUNC_TS || f == TSDB_FUNC_TS_DUMMY);
pCtx->param[2].i64 = order;
pCtx->param[2].i64 = pQuery->order.order;
pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT;
pCtx->param[3].i64 = functionId;
pCtx->param[3].nType = TSDB_DATA_TYPE_BIGINT;
......@@ -2152,12 +2191,55 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
}
if (i > 0) {
pRuntimeEnv->offset[i] = pRuntimeEnv->offset[i - 1] + pRuntimeEnv->pCtx[i - 1].outputBytes;
pRuntimeEnv->offset[i] = pRuntimeEnv->offset[i - 1] + pQCtx[i - 1].outputBytes;
pRuntimeEnv->rowCellInfoOffset[i] =
pRuntimeEnv->rowCellInfoOffset[i - 1] + sizeof(SResultRowCellInfo) + pQuery->pExpr1[i - 1].interBytes;
pRuntimeEnv->rowCellInfoOffset[i - 1] + sizeof(SResultRowCellInfo) + pExpr[i - 1].interBytes;
}
}
return pQCtx;
_clean:
tfree(pQCtx);
return NULL;
}
static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfTables, int16_t order, int32_t vgId) {
qDebug("QInfo:%p setup runtime env", GET_QINFO_ADDR(pRuntimeEnv));
SQuery *pQuery = pRuntimeEnv->pQuery;
pRuntimeEnv->prevGroupId = INT32_MIN;
pRuntimeEnv->pQuery = pQuery;
pQuery->interBufSize = getOutputInterResultBufSize(pQuery);
pRuntimeEnv->pResultRowHashTable = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pRuntimeEnv->keyBuf = malloc(pQuery->maxSrcColumnSize + sizeof(int64_t));
pRuntimeEnv->pool = initResultRowPool(getResultRowSize(pRuntimeEnv));
pRuntimeEnv->prevRow = malloc(POINTER_BYTES * pQuery->numOfCols + pQuery->srcRowSize);
pRuntimeEnv->tagVal = malloc(pQuery->tagLen);
pRuntimeEnv->offset = calloc(pQuery->numOfOutput, sizeof(int16_t));
pRuntimeEnv->rowCellInfoOffset = calloc(pQuery->numOfOutput, sizeof(int32_t));
pRuntimeEnv->sasArray = calloc(pQuery->numOfOutput, sizeof(SArithmeticSupport));
if (pRuntimeEnv->offset == NULL || pRuntimeEnv->rowCellInfoOffset == NULL || pRuntimeEnv->sasArray == NULL ||
pRuntimeEnv->pResultRowHashTable == NULL || pRuntimeEnv->keyBuf == NULL || pRuntimeEnv->prevRow == NULL ||
pRuntimeEnv->tagVal == NULL) {
goto _clean;
}
char* start = POINTER_BYTES * pQuery->numOfCols + (char*) pRuntimeEnv->prevRow;
pRuntimeEnv->prevRow[0] = start;
for(int32_t i = 1; i < pQuery->numOfCols; ++i) {
pRuntimeEnv->prevRow[i] = pRuntimeEnv->prevRow[i - 1] + pQuery->colList[i-1].bytes;
}
pRuntimeEnv->offset[0] = 0;
pRuntimeEnv->pCtx = createSQLFunctionCtx(pRuntimeEnv, pQuery->numOfOutput, pQuery->pExpr1, order, vgId);
if (pRuntimeEnv->pCtx == NULL) {
goto _clean;
}
*(int64_t*) pRuntimeEnv->prevRow[0] = INT64_MIN;
// if it is group by normal column, do not set output buffer, the output buffer is pResult
......@@ -2171,6 +2253,28 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
}
qDebug("QInfo:%p init runtime completed", GET_QINFO_ADDR(pRuntimeEnv));
// group by normal column, sliding window query, interval query are handled by interval query processor
if (!pQuery->stableQuery) { // interval (down sampling operation)
if (isFixedOutputQuery(pRuntimeEnv)) {
pRuntimeEnv->proot = createAggOperatorInfo(&pRuntimeEnv->resultRowInfo, pQuery->current, pRuntimeEnv, pRuntimeEnv->pi);
if (pQuery->pExpr2 != NULL) {
pRuntimeEnv->proot = createArithOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot);
}
} else { // diff/add/multiply/subtract/division
assert(pQuery->checkResultBuf == 1);
pRuntimeEnv->proot = createArithOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi);
}
if (pQuery->limit.offset > 0) {
pRuntimeEnv->proot = createOffsetOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot);
}
if (pQuery->limit.limit > 0) {
pRuntimeEnv->proot = createLimitOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot);
}
}
return TSDB_CODE_SUCCESS;
_clean:
......@@ -3429,19 +3533,17 @@ void resetDefaultResInfoOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
memset(pQuery->sdata[i]->data, 0, (size_t)(pQuery->pExpr1[i].bytes * pQuery->rec.capacity));
}
initCtxOutputBuf(pRuntimeEnv);
initCtxOutputBuf(pRuntimeEnv, pRuntimeEnv->pCtx);
}
void resetDefaultResInfoOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
void resetDefaultResInfoOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pSQLCtx, SSDataBlock* pDataBlock) {
int32_t tid = 0;
int64_t uid = 0;
SResultRow* pRow = doPrepareResultRowFromKey(pRuntimeEnv, &pRuntimeEnv->resultRowInfo, (char *)&tid, sizeof(tid), true, uid);
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
SColumnInfoData* pData = taosArrayGet(pRuntimeEnv->outputBuf->pDataBlock, i);
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
SQLFunctionCtx *pCtx = &pSQLCtx[i];
SColumnInfoData* pData = taosArrayGet(pDataBlock->pDataBlock, i);
/*
* set the output buffer information and intermediate buffer
......@@ -3456,11 +3558,11 @@ void resetDefaultResInfoOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv) {
// set the timestamp output buffer for top/bottom/diff query
int32_t functionId = pCtx->functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].pOutput;
pCtx->ptsOutputBuf = pSQLCtx[0].pOutput;
}
}
initCtxOutputBuf(pRuntimeEnv);
initCtxOutputBuf(pRuntimeEnv, pRuntimeEnv->pCtx);
}
void forwardCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, int64_t output) {
......@@ -3491,19 +3593,19 @@ void forwardCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, int64_t output) {
}
}
void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pSQLCtx) {
SQuery *pQuery = pRuntimeEnv->pQuery;
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t functionId = pRuntimeEnv->pCtx[j].functionId;
pRuntimeEnv->pCtx[j].currentStage = 0;
int32_t functionId = pSQLCtx[j].functionId;
pSQLCtx[j].currentStage = 0;
SResultRowCellInfo* pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[j]);
SResultRowCellInfo* pResInfo = GET_RES_INFO(&pSQLCtx[j]);
if (pResInfo->initialized) {
continue;
}
aAggs[functionId].init(&pRuntimeEnv->pCtx[j]);
aAggs[functionId].init(&pSQLCtx[j]);
}
}
......@@ -3985,7 +4087,7 @@ void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) {
// record the current active group id
pRuntimeEnv->prevGroupId = groupIndex;
setResultOutputBuf(pRuntimeEnv, pResultRow);
initCtxOutputBuf(pRuntimeEnv);
initCtxOutputBuf(pRuntimeEnv, pRuntimeEnv->pCtx);
}
void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult) {
......@@ -4039,8 +4141,9 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe
}
}
int32_t setTimestampListJoinInfo(SQInfo *pQInfo, STableQueryInfo *pTableQueryInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
int32_t setTimestampListJoinInfo(SQueryRuntimeEnv* pRuntimeEnv, STableQueryInfo *pTableQueryInfo) {
SQuery* pQuery = pRuntimeEnv->pQuery;
assert(pRuntimeEnv->pTsBuf != NULL);
// both the master and supplement scan needs to set the correct ts comp start position
......@@ -4049,14 +4152,14 @@ int32_t setTimestampListJoinInfo(SQInfo *pQInfo, STableQueryInfo *pTableQueryInf
if (pTableQueryInfo->cur.vgroupIndex == -1) {
tVariantAssign(&pTableQueryInfo->tag, pTag);
STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTsBuf, pQInfo->vgId, &pTableQueryInfo->tag);
STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTsBuf, pQuery->vgId, &pTableQueryInfo->tag);
// failed to find data with the specified tag value and vnodeId
if (!tsBufIsValidElem(&elem)) {
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
qError("QInfo:%p failed to find tag:%s in ts_comp", pQInfo, pTag->pz);
qError("QInfo:%p failed to find tag:%s in ts_comp", pRuntimeEnv->qinfo, pTag->pz);
} else {
qError("QInfo:%p failed to find tag:%" PRId64 " in ts_comp", pQInfo, pTag->i64);
qError("QInfo:%p failed to find tag:%" PRId64 " in ts_comp", pRuntimeEnv->qinfo, pTag->i64);
}
return false;
......@@ -4065,18 +4168,18 @@ int32_t setTimestampListJoinInfo(SQInfo *pQInfo, STableQueryInfo *pTableQueryInf
// keep the cursor info of current meter
pTableQueryInfo->cur = tsBufGetCursor(pRuntimeEnv->pTsBuf);
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
qDebug("QInfo:%p find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", pQInfo, pTag->pz, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex);
qDebug("QInfo:%p find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", pRuntimeEnv->qinfo, pTag->pz, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex);
} else {
qDebug("QInfo:%p find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", pQInfo, pTag->i64, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex);
qDebug("QInfo:%p find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", pRuntimeEnv->qinfo, pTag->i64, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex);
}
} else {
tsBufSetCursor(pRuntimeEnv->pTsBuf, &pTableQueryInfo->cur);
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
qDebug("QInfo:%p find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", pQInfo, pTag->pz, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex);
qDebug("QInfo:%p find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", pRuntimeEnv->qinfo, pTag->pz, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex);
} else {
qDebug("QInfo:%p find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", pQInfo, pTag->i64, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex);
qDebug("QInfo:%p find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", pRuntimeEnv->qinfo, pTag->i64, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex);
}
}
......@@ -4362,9 +4465,9 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pRuntimeEnv->pQuery;
if (pQuery->pExpr2 == NULL) {
SSDataBlock* pRes = pRuntimeEnv->outputBuf;
if (pQuery->pExpr2 == NULL) {
for (int32_t col = 0; col < pQuery->numOfOutput; ++col) {
SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col);
memmove(data, pColRes->pData, pColRes->info.bytes * pRes->info.rows);
......@@ -4372,10 +4475,9 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
}
} else {
for (int32_t col = 0; col < pQuery->numOfExpr2; ++col) {
int32_t bytes = pQuery->pExpr2[col].bytes;
memmove(data, pQuery->sdata[col]->data, bytes * numOfRows);
data += bytes * numOfRows;
SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col);
memmove(data, pColRes->pData, pColRes->info.bytes * numOfRows);
data += pColRes->info.bytes * numOfRows;
}
}
......@@ -4882,7 +4984,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
}
pQuery->tsdb = tsdb;
pQInfo->vgId = vgId;
pQuery->vgId = vgId;
pQInfo->groupResInfo.totalGroup = isSTableQuery? GET_NUM_OF_TABLEGROUP(pRuntimeEnv):0;
pRuntimeEnv->pQuery = pQuery;
......@@ -4952,7 +5054,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
// create runtime environment
int32_t numOfTables = pQuery->tableGroupInfo.numOfTables;
pQInfo->summary.tableInfoSize += (numOfTables * sizeof(STableQueryInfo));
code = setupQueryRuntimeEnv(pRuntimeEnv, (int32_t) pQuery->tableGroupInfo.numOfTables, pQuery->order.order, pQInfo->vgId);
code = setupQueryRuntimeEnv(pRuntimeEnv, (int32_t) pQuery->tableGroupInfo.numOfTables, pQuery->order.order, pQuery->vgId);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -4996,7 +5098,7 @@ static FORCE_INLINE void setEnvForEachBlock(SQInfo* pQInfo, STableQueryInfo* pTa
}
if (pRuntimeEnv->pTsBuf != NULL) {
setTimestampListJoinInfo(pQInfo, pTableQueryInfo);
setTimestampListJoinInfo(pRuntimeEnv, pTableQueryInfo);
}
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
......@@ -5145,7 +5247,7 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
tVariant* pTag = &pRuntimeEnv->pCtx[0].tag;
if (pRuntimeEnv->cur.vgroupIndex == -1) {
STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTsBuf, pQInfo->vgId, pTag);
STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTsBuf, pQuery->vgId, pTag);
// failed to find data with the specified tag value and vnodeId
if (!tsBufIsValidElem(&elem)) {
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
......@@ -5170,7 +5272,7 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
STSElem elem = tsBufGetElem(pRuntimeEnv->pTsBuf);
if (tVariantCompare(elem.tag, &pRuntimeEnv->pCtx[0].tag) != 0) {
STSElem elem1 = tsBufGetElemStartPos(pRuntimeEnv->pTsBuf, pQInfo->vgId, pTag);
STSElem elem1 = tsBufGetElemStartPos(pRuntimeEnv->pTsBuf, pQuery->vgId, pTag);
// failed to find data with the specified tag value and vnodeId
if (!tsBufIsValidElem(&elem1)) {
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
......@@ -5201,7 +5303,7 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
}
}
initCtxOutputBuf(pRuntimeEnv);
initCtxOutputBuf(pRuntimeEnv, pRuntimeEnv->pCtx);
return true;
}
......@@ -5286,7 +5388,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
longjmp(pRuntimeEnv->env, terrno);
}
initCtxOutputBuf(pRuntimeEnv);
initCtxOutputBuf(pRuntimeEnv, pRuntimeEnv->pCtx);
SArray *s = tsdbGetQueriedTableList(pRuntimeEnv->pQueryHandle);
assert(taosArrayGetSize(s) >= 1);
......@@ -5873,7 +5975,9 @@ static SSDataBlock* doScanTableImpl(STableScanInfo *pTableScanInfo) {
}
static SSDataBlock* doTableScan(void* param) {
STableScanInfo * pTableScanInfo = (STableScanInfo *)param;
SOperatorInfo* pOperator = (SOperatorInfo*) param;
STableScanInfo *pTableScanInfo = pOperator->optInfo;
SQueryRuntimeEnv *pRuntimeEnv = pTableScanInfo->pRuntimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery;
......@@ -5947,6 +6051,8 @@ static UNUSED_FUNC SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle
pOptr->name = "SeqScanTableOp";
pOptr->blockingOptr = false;
pOptr->optInfo = pInfo;
pOptr->completed = false;
pOptr->numOfOutput = pRuntimeEnv->pQuery->numOfCols;
pOptr->exec = doTableScan;
return pOptr;
......@@ -5985,26 +6091,33 @@ static UNUSED_FUNC int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) {
// this is a blocking operator
static SSDataBlock* doAggregation(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->completed) {
return NULL;
}
SAggOperatorInfo* pAggInfo = pOperator->optInfo;
SQueryRuntimeEnv* pRuntimeEnv = pAggInfo->pRuntimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx* pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, pQuery->order.order, pQuery->vgId);
SOperatorInfo* upstream = pOperator->upstream;
resetDefaultResInfoOutputBuf_rv(pRuntimeEnv);
pRuntimeEnv->pQuery->pos = 0;
resetDefaultResInfoOutputBuf_rv(pRuntimeEnv, pCtx, pRuntimeEnv->outputBuf);
pQuery->pos = 0;
while(1) {
SSDataBlock* pBlock = upstream->exec(upstream->optInfo);
SSDataBlock* pBlock = upstream->exec(upstream);
if (pBlock == NULL) {
break;
}
// the pDataBlock are always the same one, no need to call this again
setInputSDataBlock(pRuntimeEnv, pBlock);
aggApplyFunctions(pRuntimeEnv, pBlock);
setInputSDataBlock(pOperator, pCtx, pBlock);
aggApplyFunctions(pRuntimeEnv, pOperator, pCtx, pBlock);
}
pOperator->completed = true;
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED);
finalizeQueryResult(pRuntimeEnv);
......@@ -6018,22 +6131,144 @@ static SSDataBlock* doArithmeticOperation(void* param) {
SArithOperatorInfo* pArithInfo = pOperator->optInfo;
SQueryRuntimeEnv* pRuntimeEnv = pArithInfo->pRuntimeEnv;
SOperatorInfo* upstream = pOperator->upstream;
SQuery* pQuery = pRuntimeEnv->pQuery;
SSDataBlock* pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput);
resetDefaultResInfoOutputBuf_rv(pRuntimeEnv);
if (pArithInfo->pCtx == NULL) {
pArithInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, pQuery->order.order, pQuery->vgId);
}
resetDefaultResInfoOutputBuf_rv(pRuntimeEnv, pArithInfo->pCtx, pRes);
SOperatorInfo* upstream = pOperator->upstream;
pRuntimeEnv->pQuery->pos = 0;
while(1) {
SSDataBlock* pBlock = upstream->exec(upstream->optInfo);
SSDataBlock* pBlock = upstream->exec(upstream);
if (pBlock == NULL) {
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED);
break;
}
// the pDataBlock are always the same one, no need to call this again
setInputSDataBlock(pRuntimeEnv, pBlock);
aggApplyFunctions(pRuntimeEnv, pBlock);
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
pArithInfo->pCtx[i].size = pBlock->info.rows;
if (pArithInfo->pCtx[i].functionId == TSDB_FUNC_ARITHM) {
setArithParams((SArithmeticSupport*) pArithInfo->pCtx[i].param[1].pz, pOperator->pExpr, pBlock);
} else {
SColIndex *pCol = &pOperator->pExpr[i].base.colInfo;
if (TSDB_COL_IS_NORMAL_COL(pCol->flag)) {
for (int32_t j = 0; j < pBlock->info.numOfCols; ++j) {
SColumnInfoData *p = taosArrayGet(pBlock->pDataBlock, j);
if (p->info.colId == pCol->colId) {
pArithInfo->pCtx[i].pInput = p->pData;
break;
}
}
}
}
}
arithmeticApplyFunctions(pRuntimeEnv, pArithInfo->pCtx, pOperator->pExpr, pOperator->numOfOutput);
pRes->info.rows += pBlock->info.rows;
if (pRes->info.rows > 4096) {
break;
}
}
pRuntimeEnv->outputBuf = pRes;
return pRes;
}
static SSDataBlock* doLimit(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->completed) {
return NULL;
}
SLimitOperatorInfo* pInfo = pOperator->optInfo;
SOperatorInfo* upstream = pOperator->upstream;
SSDataBlock* pBlock = upstream->exec(upstream);
if (pBlock == NULL) {
setQueryStatus(pInfo->pRuntimeEnv->pQuery, QUERY_COMPLETED);
pOperator->completed = true;
return NULL;
}
if (pInfo->total + pBlock->info.rows >= pInfo->limit) {
pBlock->info.rows = (pInfo->limit - pInfo->total);
setQueryStatus(pInfo->pRuntimeEnv->pQuery, QUERY_COMPLETED);
pOperator->completed = true;
}
return pBlock;
}
static SSDataBlock* doOffset(void* param) {
SOperatorInfo *pOperator = (SOperatorInfo *)param;
SOffsetOperatorInfo *pInfo = pOperator->optInfo;
SOperatorInfo* upstream = pOperator->upstream;
while (1) {
SSDataBlock *pBlock = upstream->exec(upstream);
if (pBlock == NULL) {
setQueryStatus(pInfo->pRuntimeEnv->pQuery, QUERY_COMPLETED);
return NULL;
}
if (pInfo->currentOffset == 0) {
return pBlock;
} else if (pInfo->currentOffset > pBlock->info.rows) {
pInfo->currentOffset -= pBlock->info.rows;
} else {
int32_t remain = pBlock->info.rows - pInfo->currentOffset;
pBlock->info.rows = remain;
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
int16_t bytes = pColInfoData->info.bytes;
memmove(pColInfoData->pData, pColInfoData->pData + bytes * pInfo->currentOffset, remain * bytes);
}
pInfo->currentOffset = 0;
return pBlock;
}
}
}
static SSDataBlock* doHashIntervalAgg(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->completed) {
return NULL;
}
SAggOperatorInfo* pAggInfo = pOperator->optInfo;
SQueryRuntimeEnv* pRuntimeEnv = pAggInfo->pRuntimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx* pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, pQuery->order.order, pQuery->vgId);
SOperatorInfo* upstream = pOperator->upstream;
resetDefaultResInfoOutputBuf_rv(pRuntimeEnv, pCtx, pRuntimeEnv->outputBuf);
pQuery->pos = 0;
while(1) {
SSDataBlock* pBlock = upstream->exec(upstream);
if (pBlock == NULL) {
break;
}
// the pDataBlock are always the same one, no need to call this again
setInputSDataBlock(pOperator, pCtx, pBlock);
hashIntervalAgg(pRuntimeEnv, pOperator, pCtx, pBlock);
}
pOperator->completed = true;
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED);
finalizeQueryResult(pRuntimeEnv);
......@@ -6053,7 +6288,7 @@ static int32_t getNumOfScanTimes(SQuery* pQuery) {
return 1;
}
static UNUSED_FUNC SOperatorInfo* createAggOperatorInfo(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo,
static SOperatorInfo* createAggOperatorInfo(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo,
SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
......@@ -6064,15 +6299,17 @@ static UNUSED_FUNC SOperatorInfo* createAggOperatorInfo(SResultRowInfo* pResultR
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "AggregationOp";
pOperator->blockingOptr = true;
pOperator->completed = false;
pOperator->optInfo = pInfo;
pOperator->upstream = inputOptr;
pOperator->exec = doAggregation;
pOperator->pExpr = pRuntimeEnv->pQuery->pExpr1;
pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfOutput;
return pOperator;
}
static UNUSED_FUNC SOperatorInfo* createArithOperatorInfo(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo,
SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr) {
static SOperatorInfo* createArithOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr) {
SArithOperatorInfo* pInfo = calloc(1, sizeof(SArithOperatorInfo));
pInfo->pTableQueryInfo = pTableQueryInfo;
......@@ -6081,14 +6318,57 @@ static UNUSED_FUNC SOperatorInfo* createArithOperatorInfo(SResultRowInfo* pResul
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "ArithmeticOp";
pOperator->blockingOptr = false;
pOperator->completed = false;
pOperator->optInfo = pInfo;
pOperator->upstream = inputOptr;
pOperator->exec = doArithmeticOperation;
pOperator->pExpr = (pRuntimeEnv->pQuery->pExpr2 == NULL)? pRuntimeEnv->pQuery->pExpr1:pRuntimeEnv->pQuery->pExpr2;
pOperator->numOfOutput = (pRuntimeEnv->pQuery->pExpr2 == NULL)? pRuntimeEnv->pQuery->numOfOutput:pRuntimeEnv->pQuery->numOfExpr2;
return pOperator;
}
static SOperatorInfo* createLimitOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr) {
SLimitOperatorInfo* pInfo = calloc(1, sizeof(SLimitOperatorInfo));
pInfo->limit = pRuntimeEnv->pQuery->limit.limit;
pInfo->pRuntimeEnv = pRuntimeEnv;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "LimitOp";
pOperator->blockingOptr = false;
pOperator->completed = false;
pOperator->upstream = inputOptr;
pOperator->exec = doLimit;
pOperator->pExpr = NULL;
pOperator->numOfOutput = 0;
pOperator->optInfo = pInfo;
return pOperator;
}
static SOperatorInfo* createOffsetOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr) {
SOffsetOperatorInfo* pInfo = calloc(1, sizeof(SOffsetOperatorInfo));
pInfo->offset = pRuntimeEnv->pQuery->limit.offset;
pInfo->currentOffset = pInfo->offset;
pInfo->pRuntimeEnv = pRuntimeEnv;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "OffsetOp";
pOperator->blockingOptr = false;
pOperator->completed = false;
pOperator->upstream = inputOptr;
pOperator->exec = doOffset;
pOperator->pExpr = NULL;
pOperator->numOfOutput = 0;
pOperator->optInfo = pInfo;
return pOperator;
}
static
/*
* in each query, this function will be called only once, no retry for further result.
......@@ -6096,7 +6376,7 @@ static
* select count(*)/top(field,k)/avg(field name) from table_name [where ts>now-1a];
* select count(*) from table_name group by status_column;
*/
static void tableAggregationProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
void tableAggregationProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pRuntimeEnv->pQuery;
......@@ -6104,11 +6384,8 @@ static void tableAggregationProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
return;
}
SOperatorInfo* pAggInfo = createAggOperatorInfo(&pRuntimeEnv->resultRowInfo, pQuery->current, pRuntimeEnv, pRuntimeEnv->pi);
SSDataBlock* pResBlock = pAggInfo->exec(pAggInfo->optInfo);
SSDataBlock* pResBlock = pRuntimeEnv->proot->exec(pRuntimeEnv->proot);
pQuery->rec.rows = pResBlock->info.rows;
// doSecondaryArithmeticProcess(pQuery);
// TODO limit/offset refactor to be one operator
// skipResults(pRuntimeEnv);
......@@ -6125,12 +6402,16 @@ static void tableProjectionProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
}
// skip blocks without load the actual data block from file if no filter condition present
skipBlocks(&pQInfo->runtimeEnv);
if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols == 0) {
setQueryStatus(pQuery, QUERY_COMPLETED);
return;
}
// skipBlocks(&pQInfo->runtimeEnv);
// if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols == 0) {
// setQueryStatus(pQuery, QUERY_COMPLETED);
// return;
// }
SSDataBlock* pResBlock = pRuntimeEnv->proot->exec(pRuntimeEnv->proot);
pQuery->rec.rows = (pResBlock != NULL)? pResBlock->info.rows : 0;
#if 0
while (1) {
scanOneTableDataBlocks(pRuntimeEnv, pQuery->current->lastKey);
finalizeQueryResult(pRuntimeEnv);
......@@ -6166,6 +6447,7 @@ static void tableProjectionProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
if (!isTsCompQuery(pQuery)) {
assert(pQuery->rec.rows <= pQuery->rec.capacity);
}
#endif
}
static void copyAndFillResult(SQInfo* pQInfo) {
......@@ -7125,7 +7407,7 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr
}
doUpdateExprColumnIndex(pQuery);
pQInfo->runtimeEnv.outputBuf = createOutputBuf(pQuery);
pQInfo->runtimeEnv.outputBuf = createOutputBuf(pQuery->pExpr1, pQuery->numOfOutput);
int32_t ret = createFilterInfo(pQInfo, pQuery);
if (ret != TSDB_CODE_SUCCESS) {
......@@ -7622,8 +7904,8 @@ void buildTagQueryResult(SQInfo* pQInfo) {
*(int32_t *)output = id->tid;
output += sizeof(id->tid);
*(int32_t *)output = pQInfo->vgId;
output += sizeof(pQInfo->vgId);
*(int32_t *)output = pQuery->vgId;
output += sizeof(pQuery->vgId);
if (pExprInfo->base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
char* data = tsdbGetTableName(item->pTable);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册