提交 95341223 编写于 作者: X Xiaoyu Wang

Merge branch '3.0_query_integrate' of github.com:taosdata/TDengine into 3.0_query_integrate

......@@ -173,10 +173,8 @@ typedef struct SColumn {
int64_t dataBlockId;
};
union {
int16_t colId;
int16_t slotId;
};
int16_t colId;
int16_t slotId;
char name[TSDB_COL_NAME_LEN];
int8_t flag; // column type: normal column, tag, or user-input column (integer/float/string)
......
......@@ -543,9 +543,10 @@ typedef struct SFillOperatorInfo {
typedef struct SGroupbyOperatorInfo {
SOptrBasicInfo binfo;
int32_t colIndex;
SArray* pGroupCols;
char* prevData; // previous group by value
SGroupResInfo groupResInfo;
SAggSupporter aggSup;
} SGroupbyOperatorInfo;
typedef struct SSessionAggOperatorInfo {
......@@ -643,6 +644,8 @@ SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, int32_t numOfD
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval,
const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
......@@ -650,7 +653,6 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S
SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
int32_t numOfOutput, bool multigroupResult);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
SExprInfo* pExpr, int32_t numOfOutput);
......
......@@ -16,6 +16,7 @@
#include <filter.h>
#include <function.h>
#include <functionMgt.h>
#include <libs/function/function.h>
#include <querynodes.h>
#include <tname.h>
#include "os.h"
......@@ -242,8 +243,8 @@ static void operatorDummyCloseFn(void* param, int32_t numOfCols) {}
static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock, int32_t rowCapacity, int32_t* rowCellOffset);
static int32_t getGroupbyColumnIndex(SGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock);
static int32_t setGroupResultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *binf, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex);
static int32_t setGroupResultOutputBuf_rv(SOptrBasicInfo *binfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupId,
SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup);
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
static void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win);
......@@ -1012,8 +1013,8 @@ static void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, int32_t of
pCtx[k].startTs = pWin->skey;
// keep it temporarialy
int32_t startOffset = pCtx[k].input.startRowIndex;
bool hasAgg = pCtx[k].input.colDataAggIsSet;
int32_t startOffset = pCtx[k].input.startRowIndex;
int32_t numOfRows = pCtx[k].input.numOfRows;
int32_t pos = (order == TSDB_ORDER_ASC) ? offset : offset - (forwardStep - 1);
......@@ -1622,35 +1623,30 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
// updateResultRowInfoActiveIndex(pResultRowInfo, pQueryAttr, pRuntimeEnv->current->lastKey);
}
static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pInfo, SSDataBlock *pBlock) {
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo;
// TODO multiple group by columns
SColumn* pCol = taosArrayGet(pInfo->pGroupCols, 0);
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pInfo, SSDataBlock *pSDataBlock) {
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STableQueryInfo* item = pRuntimeEnv->current;
SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, pInfo->colIndex);
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int16_t bytes = pColInfoData->info.bytes;
int16_t type = pColInfoData->info.type;
int16_t bytes = pColInfoData->info.bytes;
int16_t type = pColInfoData->info.type;
if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
//qError("QInfo:0x%"PRIx64" group by not supported on double/float columns, abort", GET_TASKID(pRuntimeEnv));
return;
}
SColumnInfoData* pFirstColData = taosArrayGet(pSDataBlock->pDataBlock, 0);
int64_t* tsList = (pFirstColData->info.type == TSDB_DATA_TYPE_TIMESTAMP)? (int64_t*) pFirstColData->pData:NULL;
STimeWindow w = TSWINDOW_INITIALIZER;
int32_t num = 0;
for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) {
char* val = ((char*)pColInfoData->pData) + bytes * j;
if (isNull(val, type)) {
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
if (colDataIsNull(pColInfoData, pBlock->info.rows, j, NULL)) { // TODO
continue;
}
char* val = colDataGetData(pColInfoData, j);
// Compare with the previous row of this column, and do not set the output buffer again if they are identical.
if (pInfo->prevData == NULL) {
pInfo->prevData = malloc(bytes);
......@@ -1672,35 +1668,28 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
}
}
if (pQueryAttr->stableQuery && pQueryAttr->stabledev && (pRuntimeEnv->prevResult != NULL)) {
setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, pInfo->prevData, bytes);
}
int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, pInfo->prevData, type, bytes, item->groupIndex);
int32_t ret = setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->prevData, type, bytes, 0,
pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
}
// doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, &w, j - num, num, tsList, pSDataBlock->info.rows, pOperator->numOfOutput);
doApplyFunctions(pInfo->binfo.pCtx, &w, j - num, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC);
num = 1;
memcpy(pInfo->prevData, val, bytes);
}
if (num > 0) {
char* val = ((char*)pColInfoData->pData) + bytes * (pSDataBlock->info.rows - num);
char* val = ((char*)pColInfoData->pData) + bytes * (pBlock->info.rows - num);
memcpy(pInfo->prevData, val, bytes);
if (pQueryAttr->stableQuery && pQueryAttr->stabledev && (pRuntimeEnv->prevResult != NULL)) {
setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, val, bytes);
}
int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, val, type, bytes, item->groupIndex);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
int32_t ret = setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->prevData, type, bytes, 0,
pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
}
// doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, &w, pSDataBlock->info.rows - num, num, tsList, pSDataBlock->info.rows, pOperator->numOfOutput);
doApplyFunctions(pInfo->binfo.pCtx, &w, pBlock->info.rows - num, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC);
}
tfree(pInfo->prevData);
......@@ -1792,9 +1781,8 @@ static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) {
}
}
static int32_t setGroupResultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *binfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex) {
SDiskbasedBuf *pResultBuf = pRuntimeEnv->pResultBuf;
static int32_t setGroupResultOutputBuf_rv(SOptrBasicInfo *binfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupId,
SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup) {
int32_t *rowCellInfoOffset = binfo->rowCellInfoOffset;
SResultRowInfo *pResultRowInfo = &binfo->resultRowInfo;
SqlFunctionCtx *pCtx = binfo->pCtx;
......@@ -1808,19 +1796,12 @@ static int32_t setGroupResultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SOptrBasicI
}
int64_t tid = 0;
SResultRow *pResultRow = doSetResultOutBufByKey(pRuntimeEnv, pResultRowInfo, tid, d, len, true, groupIndex);
SResultRow *pResultRow = doSetResultOutBufByKey_rv(pBuf, pResultRowInfo, groupId, (char *)pData, TSDB_KEYSIZE, true, groupId, pTaskInfo, true, pAggSup);
assert (pResultRow != NULL);
setResultRowKey(pResultRow, pData, type);
if (pResultRow->pageId == -1) {
int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, groupIndex, pRuntimeEnv->pQueryAttr->resultRowSize);
if (ret != 0) {
return -1;
}
}
setResultOutputBuf(pRuntimeEnv, pResultRow, pCtx, numOfCols, rowCellInfoOffset);
initCtxOutputBuffer(pCtx, numOfCols);
setResultRowOutputBufInitCtx_rv(pBuf, pResultRow, pCtx, numOfCols, binfo->rowCellInfoOffset);
return TSDB_CODE_SUCCESS;
}
......@@ -1851,6 +1832,10 @@ static bool functionNeedToExecute(SqlFunctionCtx *pCtx) {
// in case of timestamp column, always generated results.
int32_t functionId = pCtx->functionId;
if (functionId == -1) {
return false;
}
if (functionId == FUNCTION_TS) {
return true;
}
......@@ -2063,9 +2048,13 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SExprInfo* pExprInfo, int32_t num
if (pExpr->pExpr->_function.pFunctNode != NULL) {
SFuncExecEnv env = {0};
fmGetFuncExecFuncs(pExpr->pExpr->_function.pFunctNode->funcId, &pCtx->fpSet);
pCtx->functionId = pExpr->pExpr->_function.pFunctNode->funcId;
fmGetFuncExecFuncs(pCtx->functionId, &pCtx->fpSet);
pCtx->fpSet.getEnv(pExpr->pExpr->_function.pFunctNode, &env);
pCtx->resDataInfo.interBufSize = env.calcMemSize;
} else {
pCtx->functionId = -1;
}
pCtx->input.numOfInputCols = pFunct->numOfParams;
......@@ -3318,8 +3307,6 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t
SResultRow* pRow = doSetResultOutBufByKey_rv(pSup->pResultBuf, pResultRowInfo, tid, (char *)&tid, sizeof(tid), true, groupId, pTaskInfo, false, pSup);
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
SColumnInfoData* pData = taosArrayGet(pDataBlock->pDataBlock, i);
struct SResultRowEntryInfo* pEntry = getResultCell(pRow, i, rowCellInfoOffset);
cleanupResultRowEntry(pEntry);
......@@ -3417,7 +3404,7 @@ void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity) {
void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size) {
for (int32_t j = 0; j < size; ++j) {
struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(&pCtx[j]);
if (isRowEntryInitialized(pResInfo)) {
if (isRowEntryInitialized(pResInfo) || pCtx[j].functionId == -1) {
continue;
}
......@@ -3458,6 +3445,10 @@ static void setupEnvForReverseScan(STableScanInfo *pTableScanInfo, SqlFunctionCt
void finalizeQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
for (int32_t j = 0; j < numOfOutput; ++j) {
if (pCtx[j].functionId == -1) {
continue;
}
pCtx[j].fpSet.finalize(&pCtx[j]);
}
}
......@@ -3480,7 +3471,9 @@ void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SD
continue;
}
pCtx[j].fpSet.finalize(&pCtx[j]);
if (pCtx[j].fpSet.process) { // TODO set the dummy function.
pCtx[j].fpSet.finalize(&pCtx[j]);
}
if (pRow->numOfRows < pResInfo->numOfRes) {
pRow->numOfRows = pResInfo->numOfRes;
......@@ -3586,20 +3579,13 @@ void setResultRowOutputBufInitCtx(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pRes
void setResultRowOutputBufInitCtx_rv(SDiskbasedBuf * pBuf, SResultRow *pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset) {
// Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group
SFilePage* bufPage = getBufPage(pBuf, pResult->pageId);
// int32_t offset = 0;
for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset);
struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo;
if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) {
// offset += pCtx[i].resDataInfo.bytes;
continue;
}
// offset += pCtx[i].resDataInfo.bytes;
// int32_t functionId = pCtx[i].functionId;
// if (functionId < 0) {
// continue;
......@@ -3608,7 +3594,7 @@ void setResultRowOutputBufInitCtx_rv(SDiskbasedBuf * pBuf, SResultRow *pResult,
// if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i - 1].pOutput;
// }
if (!pResInfo->initialized) {
if (!pResInfo->initialized && pCtx[i].functionId != -1) {
pCtx[i].fpSet.init(&pCtx[i], pResInfo);
}
}
......@@ -6943,15 +6929,15 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo *pOperator, bool* newgrou
SGroupbyOperatorInfo *pInfo = pOperator->info;
if (pOperator->status == OP_RES_TO_RETURN) {
// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes);
if (pInfo->binfo.pRes->info.rows == 0/*|| !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)*/) {
toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity, pInfo->binfo.rowCellInfoOffset);
if (pInfo->binfo.pRes->info.rows == 0|| !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
return pInfo->binfo.pRes;
}
SOperatorInfo* downstream = pOperator->pDownstream[0];
int32_t order = TSDB_ORDER_ASC;
SOperatorInfo* downstream = pOperator->pDownstream[0];
while(1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
......@@ -6964,29 +6950,23 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo *pOperator, bool* newgrou
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order);
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput);
if (pInfo->colIndex == -1) {
pInfo->colIndex = getGroupbyColumnIndex(NULL/*pInfo->pGroupbyExpr*/, pBlock);
}
doHashGroupbyAgg(pOperator, pInfo, pBlock);
}
pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pInfo->binfo.resultRowInfo);
finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
// if (!pRuntimeEnv->pQueryAttr->stableQuery) { // finalize include the update of result rows
finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput);
// finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput);
// } else {
// updateNumOfRowsInResultRows(pInfo->binfo.pCtx, pOperator->numOfOutput, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
// }
blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity);
initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo);
// if (!pRuntimeEnv->pQueryAttr->stableQuery) {
// sortGroupResByOrderList(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes);
// }
// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes);
if (pInfo->binfo.pRes->info.rows == 0/* || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)*/) {
toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity, pInfo->binfo.rowCellInfoOffset);
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
......@@ -7665,25 +7645,25 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun
return pOperator;
}
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) {
SGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SGroupbyOperatorInfo));
pInfo->colIndex = -1; // group by column index
pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
// pQueryAttr->resultRowSize = (pQueryAttr->resultRowSize *
// (int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)));
pInfo->pGroupCols = pGroupColList;
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, 4096, pResultBlock, pTaskInfo->id.str);
pInfo->binfo.pRes = pResBlock;
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "GroupbyAggOperator";
pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED;
// pOperator->operatorType = OP_Groupby;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->pExpr = pExprInfo;
pOperator->numOfOutput = numOfCols;
pOperator->info = pInfo;
pOperator->_openFn = operatorDummyOpenFn;
pOperator->getNextFn = hashGroupbyAggregate;
......@@ -7691,6 +7671,11 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
_error:
tfree(pInfo);
tfree(pOperator);
return NULL;
}
SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult) {
......@@ -8154,17 +8139,29 @@ static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, i
return s;
}
SExprInfo* createExprInfo(SNodeList* pNodeList, int32_t* numOfExprs) {
SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs) {
int32_t numOfFuncs = LIST_LENGTH(pNodeList);
int32_t numOfGroupKeys = 0;
if (pGroupKeys != NULL) {
numOfGroupKeys = LIST_LENGTH(pGroupKeys);
}
*numOfExprs = numOfFuncs;
SExprInfo* pExprs = calloc(numOfFuncs, sizeof(SExprInfo));
*numOfExprs = numOfFuncs + numOfGroupKeys;
SExprInfo* pExprs = calloc(*numOfExprs, sizeof(SExprInfo));
for(int32_t i = 0; i < numOfFuncs; ++i) {
SExprInfo* pExp = &pExprs[i];
for(int32_t i = 0; i < (*numOfExprs); ++i) {
STargetNode* pTargetNode = NULL;
if (i < numOfFuncs) {
pTargetNode = (STargetNode*)nodesListGetNode(pNodeList, i);
} else {
pTargetNode = (STargetNode*)nodesListGetNode(pGroupKeys, i - numOfFuncs);
}
SExprInfo* pExp = &pExprs[pTargetNode->slotId];
pExp->pExpr = calloc(1, sizeof(tExprNode));
pExp->pExpr->_function.num = 1;
pExp->pExpr->_function.functionId = -1;
pExp->base.pParam = calloc(1, sizeof(SFunctParam));
pExp->base.numOfParams = 1;
......@@ -8172,10 +8169,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, int32_t* numOfExprs) {
pExp->base.pParam[0].pCol = calloc(1, sizeof(SColumn));
SColumn* pCol = pExp->base.pParam[0].pCol;
STargetNode* pTargetNode = (STargetNode*)nodesListGetNode(pNodeList, i);
ASSERT(pTargetNode->slotId == i);
// it is a project query
// it is a project query, or group by column
if (nodeType(pTargetNode->pExpr) == QUERY_NODE_COLUMN) {
SColumnNode* pColNode = (SColumnNode*) pTargetNode->pExpr;
......@@ -8192,6 +8186,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, int32_t* numOfExprs) {
SDataType* pType = &pFuncNode->node.resType;
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pFuncNode->node.aliasName);
pExp->pExpr->_function.functionId = pFuncNode->funcId;
pExp->pExpr->_function.pFunctNode = pFuncNode;
strncpy(pExp->pExpr->_function.functionName, pFuncNode->functionName, tListLen(pExp->pExpr->_function.functionName));
......@@ -8201,11 +8196,11 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, int32_t* numOfExprs) {
SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j);
SColumnNode* pcn = (SColumnNode*)p1;
pCol->slotId = pcn->slotId;
pCol->bytes = pcn->node.resType.bytes;
pCol->type = pcn->node.resType.type;
pCol->scale = pcn->node.resType.scale;
pCol->precision = pcn->node.resType.precision;
pCol->slotId = pcn->slotId;
pCol->bytes = pcn->node.resType.bytes;
pCol->type = pcn->node.resType.type;
pCol->scale = pcn->node.resType.scale;
pCol->precision = pcn->node.resType.precision;
pCol->dataBlockId = pcn->dataBlockId;
}
}
......@@ -8233,6 +8228,7 @@ static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SRead
static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId);
static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo);
static SArray* extractScanColumnId(SNodeList* pNodeList);
static SArray* extractColumnInfo(SNodeList* pNodeList);
SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
// if (nodeType(pPhyNode) == QUERY_NODE_PHYSICAL_PLAN_PROJECT) { // ignore the project node
......@@ -8291,7 +8287,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(((SProjectPhysiNode*)pPhyNode)->pProjections, &num);
SExprInfo* pExprInfo = createExprInfo(((SProjectPhysiNode*)pPhyNode)->pProjections, NULL, &num);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
return createProjectOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo);
}
......@@ -8304,9 +8300,17 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(((SAggPhysiNode*)pPhyNode)->pAggFuncs, &num);
SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
return createAggregateOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo);
if (pAggNode->pGroupKeys != NULL) {
SArray* pColList = extractColumnInfo(pAggNode->pGroupKeys);
return createGroupOperatorInfo(op, pExprInfo, num, pResBlock, pColList, pTaskInfo, NULL);
} else {
return createAggregateOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo);
}
}
} else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == nodeType(pPhyNode)) {
size_t size = LIST_LENGTH(pPhyNode->pChildren);
......@@ -8319,7 +8323,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->pFuncs, &num);
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->pFuncs, NULL, &num);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
SInterval interval = {.interval = pIntervalPhyNode->interval, .sliding = pIntervalPhyNode->sliding, .intervalUnit = 'a', .slidingUnit = 'a'};
......@@ -8387,6 +8391,32 @@ SArray* extractScanColumnId(SNodeList* pNodeList) {
return pList;
}
SArray* extractColumnInfo(SNodeList* pNodeList) {
size_t numOfCols = LIST_LENGTH(pNodeList);
SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
if (pList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
for(int32_t i = 0; i < numOfCols; ++i) {
STargetNode* pNode = (STargetNode*) nodesListGetNode(pNodeList, i);
SColumnNode* pColNode = (SColumnNode*) pNode->pExpr;
SColumn c = {0};
c.slotId = pColNode->slotId;
c.colId = pColNode->colId;
c.type = pColNode->node.resType.type;
c.bytes = pColNode->node.resType.bytes;
c.precision = pColNode->node.resType.precision;
c.scale = pColNode->node.resType.scale;
taosArrayPush(pList, &c);
}
return pList;
}
int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId) {
int32_t code = 0;
if (tableType == TSDB_SUPER_TABLE) {
......
......@@ -456,7 +456,7 @@ void firstFunction(SqlFunctionCtx *pCtx) {
int32_t numOfElems = 0;
struct SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
char* buf = GET_ROWCELL_INTERBUF(pResInfo);
SInputColumnInfoData* pInput = &pCtx->input;
......@@ -500,7 +500,7 @@ void lastFunction(SqlFunctionCtx *pCtx) {
int32_t numOfElems = 0;
struct SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
char* buf = GET_ROWCELL_INTERBUF(pResInfo);
SInputColumnInfoData* pInput = &pCtx->input;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册