提交 e6e42400 编写于 作者: H Haojun Liao

[td-4151]refactor code.

上级 dfcb2f8e
...@@ -154,7 +154,7 @@ bool tscMultiRoundQuery(SQueryInfo* pQueryInfo, int32_t tableIndex); ...@@ -154,7 +154,7 @@ bool tscMultiRoundQuery(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscQueryBlockInfo(SQueryInfo* pQueryInfo); bool tscQueryBlockInfo(SQueryInfo* pQueryInfo);
SExprInfo* tscAddFuncInSelectClause(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId, SExprInfo* tscAddFuncInSelectClause(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId,
SColumnIndex* pIndex, SSchema* pColSchema, int16_t colType); SColumnIndex* pIndex, SSchema* pColSchema, int16_t colType, int16_t colId);
int32_t tscSetTableFullName(SName* pName, SStrToken* pzTableName, SSqlObj* pSql); int32_t tscSetTableFullName(SName* pName, SStrToken* pzTableName, SSqlObj* pSql);
void tscClearInterpInfo(SQueryInfo* pQueryInfo); void tscClearInterpInfo(SQueryInfo* pQueryInfo);
...@@ -290,7 +290,7 @@ void registerSqlObj(SSqlObj* pSql); ...@@ -290,7 +290,7 @@ void registerSqlObj(SSqlObj* pSql);
SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t fp, void* param, int32_t cmd, SSqlObj* pPrevSql); SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t fp, void* param, int32_t cmd, SSqlObj* pPrevSql);
void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClauseIndex, int32_t tableIndex); void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClauseIndex, int32_t tableIndex);
void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex); void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex, SSqlCmd* pCmd);
int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid); int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid);
int16_t tscGetTagColIndexById(STableMeta* pTableMeta, int16_t colId); int16_t tscGetTagColIndexById(STableMeta* pTableMeta, int16_t colId);
......
...@@ -203,7 +203,7 @@ typedef struct SQueryInfo { ...@@ -203,7 +203,7 @@ typedef struct SQueryInfo {
SInterval interval; // tumble time window SInterval interval; // tumble time window
SSessionWindow sessionWindow; // session time window SSessionWindow sessionWindow; // session time window
SGroupbyExpr groupbyExpr; // groupby tags info SGroupbyExpr groupbyExpr; // groupby tags info
SArray * colList; // SArray<SColumn*> SArray * colList; // SArray<SColumn*>
SFieldInfo fieldsInfo; SFieldInfo fieldsInfo;
SArray * exprList; // SArray<SExprInfo*> SArray * exprList; // SArray<SExprInfo*>
...@@ -251,6 +251,7 @@ typedef struct { ...@@ -251,6 +251,7 @@ typedef struct {
SVgroupsInfo *pVgroupInfo; SVgroupsInfo *pVgroupInfo;
} STableMetaVgroupInfo; } STableMetaVgroupInfo;
// TODO extract sql parser supporter
typedef struct { typedef struct {
int command; int command;
uint8_t msgType; uint8_t msgType;
...@@ -293,6 +294,7 @@ typedef struct { ...@@ -293,6 +294,7 @@ typedef struct {
SHashObj *pTableBlockHashList; // data block for each table SHashObj *pTableBlockHashList; // data block for each table
SArray *pDataBlocks; // SArray<STableDataBlocks*>. Merged submit block for each vgroup SArray *pDataBlocks; // SArray<STableDataBlocks*>. Merged submit block for each vgroup
int32_t resColumnId;
} SSqlCmd; } SSqlCmd;
typedef struct SResRec { typedef struct SResRec {
...@@ -514,7 +516,7 @@ extern int tscNumOfObj; // number of existed sqlObj in current process. ...@@ -514,7 +516,7 @@ extern int tscNumOfObj; // number of existed sqlObj in current process.
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo); extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables); void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables);
int16_t getNewResColId(SQueryInfo* pQueryInfo); int16_t getNewResColId(SSqlCmd* pCmd);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -59,6 +59,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para ...@@ -59,6 +59,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
tscDebugL("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr); tscDebugL("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr);
pCmd->curSql = pSql->sqlstr; pCmd->curSql = pSql->sqlstr;
pCmd->resColumnId = TSDB_RES_COL_ID;
int32_t code = tsParseSql(pSql, true); int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return; if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;
......
...@@ -871,7 +871,7 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { ...@@ -871,7 +871,7 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
} }
SMultiwayMergeInfo *pAggInfo = pOperator->info; SMultiwayMergeInfo *pAggInfo = pOperator->info;
SOperatorInfo *upstream = pOperator->upstream; SOperatorInfo *upstream = pOperator->upstream[0];
*newgroup = false; *newgroup = false;
bool handleData = false; bool handleData = false;
...@@ -974,7 +974,7 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -974,7 +974,7 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) {
SSDataBlock* pBlock = NULL; SSDataBlock* pBlock = NULL;
if (pInfo->currentGroupOffset == 0) { if (pInfo->currentGroupOffset == 0) {
pBlock = pOperator->upstream->exec(pOperator->upstream, newgroup); pBlock = pOperator->upstream[0]->exec(pOperator->upstream, newgroup);
if (pBlock == NULL) { if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
...@@ -982,7 +982,7 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -982,7 +982,7 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) {
if (*newgroup == false && pInfo->limit.limit > 0 && pInfo->rowsTotal >= pInfo->limit.limit) { if (*newgroup == false && pInfo->limit.limit > 0 && pInfo->rowsTotal >= pInfo->limit.limit) {
while ((*newgroup) == false) { // ignore the remain blocks while ((*newgroup) == false) { // ignore the remain blocks
pBlock = pOperator->upstream->exec(pOperator->upstream, newgroup); pBlock = pOperator->upstream[0]->exec(pOperator->upstream, newgroup);
if (pBlock == NULL) { if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
...@@ -994,7 +994,7 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -994,7 +994,7 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) {
return pBlock; return pBlock;
} }
pBlock = pOperator->upstream->exec(pOperator->upstream, newgroup); pBlock = pOperator->upstream[0]->exec(pOperator->upstream, newgroup);
if (pBlock == NULL) { if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
...@@ -1008,7 +1008,7 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -1008,7 +1008,7 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) {
} }
while ((*newgroup) == false) { while ((*newgroup) == false) {
pBlock = pOperator->upstream->exec(pOperator->upstream, newgroup); pBlock = pOperator->upstream[0]->exec(pOperator->upstream, newgroup);
if (pBlock == NULL) { if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
......
此差异已折叠。
...@@ -1644,8 +1644,6 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) { ...@@ -1644,8 +1644,6 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
qTableQuery(pQueryInfo->pQInfo, &localQueryId); qTableQuery(pQueryInfo->pQInfo, &localQueryId);
convertQueryResult(pRes, pQueryInfo); convertQueryResult(pRes, pQueryInfo);
// handleDownstreamOperator(pRes, pQueryInfo);
code = pRes->code; code = pRes->code;
if (pRes->code == TSDB_CODE_SUCCESS) { if (pRes->code == TSDB_CODE_SUCCESS) {
(*pSql->fp)(pSql->param, pSql, pRes->numOfRows); (*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
...@@ -2189,7 +2187,7 @@ int tscProcessShowRsp(SSqlObj *pSql) { ...@@ -2189,7 +2187,7 @@ int tscProcessShowRsp(SSqlObj *pSql) {
SInternalField* pInfo = tscFieldInfoAppend(pFieldInfo, &f); SInternalField* pInfo = tscFieldInfoAppend(pFieldInfo, &f);
pInfo->pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, pInfo->pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index,
pTableSchema[i].type, pTableSchema[i].bytes, getNewResColId(pQueryInfo), pTableSchema[i].bytes, false); pTableSchema[i].type, pTableSchema[i].bytes, getNewResColId(pCmd), pTableSchema[i].bytes, false);
} }
pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutput; pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
......
...@@ -614,7 +614,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { ...@@ -614,7 +614,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
int16_t functionId = tscIsProjectionQuery(pQueryInfo)? TSDB_FUNC_PRJ : TSDB_FUNC_TS; int16_t functionId = tscIsProjectionQuery(pQueryInfo)? TSDB_FUNC_PRJ : TSDB_FUNC_TS;
tscAddFuncInSelectClause(pQueryInfo, 0, functionId, &index, s, TSDB_COL_NORMAL); tscAddFuncInSelectClause(pQueryInfo, 0, functionId, &index, s, TSDB_COL_NORMAL, getNewResColId(&pNew->cmd));
tscPrintSelNodeList(pNew, 0); tscPrintSelNodeList(pNew, 0);
tscFieldInfoUpdateOffset(pQueryInfo); tscFieldInfoUpdateOffset(pQueryInfo);
...@@ -817,7 +817,7 @@ static void issueTsCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* ...@@ -817,7 +817,7 @@ static void issueTsCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj*
SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1}; SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1};
SColumnIndex index = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX}; SColumnIndex index = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscAddFuncInSelectClause(pQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL); tscAddFuncInSelectClause(pQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL, getNewResColId(pCmd));
// set the tags value for ts_comp function // set the tags value for ts_comp function
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
...@@ -1906,7 +1906,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter ...@@ -1906,7 +1906,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
// set get tags query type // set get tags query type
TSDB_QUERY_SET_TYPE(pNewQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY); TSDB_QUERY_SET_TYPE(pNewQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY);
tscAddFuncInSelectClause(pNewQueryInfo, 0, TSDB_FUNC_TID_TAG, &colIndex, &s1, TSDB_COL_TAG); tscAddFuncInSelectClause(pNewQueryInfo, 0, TSDB_FUNC_TID_TAG, &colIndex, &s1, TSDB_COL_TAG, getNewResColId(pCmd));
size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList); size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);
tscDebug( tscDebug(
...@@ -1917,7 +1917,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter ...@@ -1917,7 +1917,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
} else { } else {
SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1}; SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1};
SColumnIndex colIndex = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX}; SColumnIndex colIndex = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscAddFuncInSelectClause(pNewQueryInfo, 0, TSDB_FUNC_TS_COMP, &colIndex, &colSchema, TSDB_COL_NORMAL); tscAddFuncInSelectClause(pNewQueryInfo, 0, TSDB_FUNC_TS_COMP, &colIndex, &colSchema, TSDB_COL_NORMAL, getNewResColId(pCmd));
// set the tags value for ts_comp function // set the tags value for ts_comp function
SExprInfo *pExpr = tscExprGet(pNewQueryInfo, 0); SExprInfo *pExpr = tscExprGet(pNewQueryInfo, 0);
...@@ -2332,7 +2332,7 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) { ...@@ -2332,7 +2332,7 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
SColumnIndex colIndex = {.tableIndex = 0, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; SColumnIndex colIndex = {.tableIndex = 0, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
SSchema* schema = tscGetColumnSchemaById(pTableMetaInfo1->pTableMeta, pExpr->base.colInfo.colId); SSchema* schema = tscGetColumnSchemaById(pTableMetaInfo1->pTableMeta, pExpr->base.colInfo.colId);
SExprInfo* p = tscAddFuncInSelectClause(pNewQueryInfo, index++, TSDB_FUNC_TS, &colIndex, schema, TSDB_COL_NORMAL); SExprInfo* p = tscAddFuncInSelectClause(pNewQueryInfo, index++, TSDB_FUNC_TS, &colIndex, schema, TSDB_COL_NORMAL, getNewResColId(pCmd));
p->base.resColId = pExpr->base.resColId; // update the result column id p->base.resColId = pExpr->base.resColId; // update the result column id
} else if (pExpr->base.functionId == TSDB_FUNC_STDDEV_DST) { } else if (pExpr->base.functionId == TSDB_FUNC_STDDEV_DST) {
taosArrayPush(pSup->pColsInfo, &pExpr->base.resColId); taosArrayPush(pSup->pColsInfo, &pExpr->base.resColId);
...@@ -2341,7 +2341,7 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) { ...@@ -2341,7 +2341,7 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
SSchema schema = {.type = TSDB_DATA_TYPE_DOUBLE, .bytes = sizeof(double)}; SSchema schema = {.type = TSDB_DATA_TYPE_DOUBLE, .bytes = sizeof(double)};
tstrncpy(schema.name, pExpr->base.aliasName, tListLen(schema.name)); tstrncpy(schema.name, pExpr->base.aliasName, tListLen(schema.name));
SExprInfo* p = tscAddFuncInSelectClause(pNewQueryInfo, index++, TSDB_FUNC_AVG, &colIndex, &schema, TSDB_COL_NORMAL); SExprInfo* p = tscAddFuncInSelectClause(pNewQueryInfo, index++, TSDB_FUNC_AVG, &colIndex, &schema, TSDB_COL_NORMAL, getNewResColId(pCmd));
p->base.resColId = pExpr->base.resColId; // update the result column id p->base.resColId = pExpr->base.resColId; // update the result column id
} else if (pExpr->base.functionId == TSDB_FUNC_TAG) { } else if (pExpr->base.functionId == TSDB_FUNC_TAG) {
pSup->tagLen += pExpr->base.resBytes; pSup->tagLen += pExpr->base.resBytes;
...@@ -2354,7 +2354,7 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) { ...@@ -2354,7 +2354,7 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
schema = tGetTbnameColumnSchema(); schema = tGetTbnameColumnSchema();
} }
SExprInfo* p = tscAddFuncInSelectClause(pNewQueryInfo, index++, TSDB_FUNC_TAG, &colIndex, schema, TSDB_COL_TAG); SExprInfo* p = tscAddFuncInSelectClause(pNewQueryInfo, index++, TSDB_FUNC_TAG, &colIndex, schema, TSDB_COL_TAG, getNewResColId(pCmd));
p->base.resColId = pExpr->base.resColId; p->base.resColId = pExpr->base.resColId;
} else if (pExpr->base.functionId == TSDB_FUNC_PRJ) { } else if (pExpr->base.functionId == TSDB_FUNC_PRJ) {
int32_t num = (int32_t) taosArrayGetSize(pNewQueryInfo->groupbyExpr.columnInfo); int32_t num = (int32_t) taosArrayGetSize(pNewQueryInfo->groupbyExpr.columnInfo);
...@@ -2368,7 +2368,7 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) { ...@@ -2368,7 +2368,7 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
SSchema* schema = tscGetColumnSchemaById(pTableMetaInfo1->pTableMeta, pExpr->base.colInfo.colId); SSchema* schema = tscGetColumnSchemaById(pTableMetaInfo1->pTableMeta, pExpr->base.colInfo.colId);
//doLimitOutputNormalColOfGroupby //doLimitOutputNormalColOfGroupby
SExprInfo* p = tscAddFuncInSelectClause(pNewQueryInfo, index++, TSDB_FUNC_PRJ, &colIndex, schema, TSDB_COL_NORMAL); SExprInfo* p = tscAddFuncInSelectClause(pNewQueryInfo, index++, TSDB_FUNC_PRJ, &colIndex, schema, TSDB_COL_NORMAL, getNewResColId(pCmd));
p->base.numOfParams = 1; p->base.numOfParams = 1;
p->base.param[0].i64 = 1; p->base.param[0].i64 = 1;
p->base.param[0].nType = TSDB_DATA_TYPE_INT; p->base.param[0].nType = TSDB_DATA_TYPE_INT;
......
...@@ -14,10 +14,11 @@ ...@@ -14,10 +14,11 @@
*/ */
#include "tscUtil.h" #include "tscUtil.h"
#include "tsched.h"
#include "hash.h" #include "hash.h"
#include "os.h" #include "os.h"
#include "texpr.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "texpr.h"
#include "tkey.h" #include "tkey.h"
#include "tmd5.h" #include "tmd5.h"
#include "tscLocalMerge.h" #include "tscLocalMerge.h"
...@@ -634,18 +635,12 @@ void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBloc ...@@ -634,18 +635,12 @@ void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBloc
} }
} }
static SColumnInfo* extractColumnInfoFromResult(STableMeta* pTableMeta, SArray* pTableCols) { static SColumnInfo* extractColumnInfoFromResult(SArray* pTableCols) {
int32_t numOfCols = (int32_t) taosArrayGetSize(pTableCols); int32_t numOfCols = (int32_t) taosArrayGetSize(pTableCols);
SColumnInfo* pColInfo = calloc(numOfCols, sizeof(SColumnInfo)); SColumnInfo* pColInfo = calloc(numOfCols, sizeof(SColumnInfo));
SSchema *pSchema = pTableMeta->schema;
for(int32_t i = 0; i < numOfCols; ++i) { for(int32_t i = 0; i < numOfCols; ++i) {
SColumn* pCol = taosArrayGetP(pTableCols, i); SColumn* pCol = taosArrayGetP(pTableCols, i);
int32_t index = pCol->columnIndex; pColInfo[i] = pCol->info;//[index].type;
pColInfo[i].type = pSchema[index].type;
pColInfo[i].bytes = pSchema[index].bytes;
pColInfo[i].colId = pSchema[index].colId;
} }
return pColInfo; return pColInfo;
...@@ -656,6 +651,10 @@ typedef struct SDummyInputInfo { ...@@ -656,6 +651,10 @@ typedef struct SDummyInputInfo {
SSqlRes *pRes; // refactor: remove it SSqlRes *pRes; // refactor: remove it
} SDummyInputInfo; } SDummyInputInfo;
typedef struct SJoinOperatorInfo {
int32_t a;
} SJoinOperatorInfo;
SSDataBlock* doGetDataBlock(void* param, bool* newgroup) { SSDataBlock* doGetDataBlock(void* param, bool* newgroup) {
SOperatorInfo *pOperator = (SOperatorInfo*) param; SOperatorInfo *pOperator = (SOperatorInfo*) param;
...@@ -729,6 +728,39 @@ SOperatorInfo* createDummyInputOperator(char* pResult, SSchema* pSchema, int32_t ...@@ -729,6 +728,39 @@ SOperatorInfo* createDummyInputOperator(char* pResult, SSchema* pSchema, int32_t
return pOptr; return pOptr;
} }
SOperatorInfo* createJoinOperator(SOperatorInfo** pUpstream, int32_t numOfUpstream, SExprInfo* pExprInfo, int32_t numOfOutput) {
SJoinInfo* pInfo = calloc(1, sizeof(SJoinInfo));
/*
pInfo->pRes = (SSqlRes*) pResult;
pInfo->block = calloc(numOfCols, sizeof(SSDataBlock));
pInfo->block->info.numOfCols = numOfCols;
pInfo->block->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData colData = {{0}};
colData.info.bytes = pSchema[i].bytes;
colData.info.type = pSchema[i].type;
colData.info.colId = pSchema[i].colId;
taosArrayPush(pInfo->block->pDataBlock, &colData);
}
*/
SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo));
pOptr->name = "JoinOperator";
pOptr->operatorType = OP_Join;
pOptr->numOfOutput = numOfOutput;
pOptr->blockingOptr = false;
pOptr->info = pInfo;
pOptr->exec = doGetDataBlock;
pOptr->cleanup = destroyDummyInputOperator;
for(int32_t i = 0; i < numOfUpstream; ++i) {
appendUpstream(pOptr, pUpstream[0]);
}
return pOptr;
}
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo) { void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
// set the correct result // set the correct result
SSDataBlock* p = pQueryInfo->pQInfo->runtimeEnv.outputBuf; SSDataBlock* p = pQueryInfo->pQInfo->runtimeEnv.outputBuf;
...@@ -743,40 +775,51 @@ void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo) { ...@@ -743,40 +775,51 @@ void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
pRes->completed = (pRes->numOfRows == 0); pRes->completed = (pRes->numOfRows == 0);
} }
void handleDownstreamOperator(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSqlRes* pOutput) { void handleDownstreamOperator(SSqlRes* pRes, SQueryInfo* px, SSqlRes* pOutput) {
if (pQueryInfo->pDownstream != NULL) { // handle the following query process
// handle the following query process if (px->pQInfo == NULL) {
SQueryInfo *px = pQueryInfo->pDownstream; SColumnInfo* pColumnInfo = extractColumnInfoFromResult(px->colList);
if (px->pQInfo == NULL) {
SColumnInfo* pColumnInfo = extractColumnInfoFromResult(px->pTableMetaInfo[0]->pTableMeta, px->colList); int32_t numOfOutput = (int32_t)tscNumOfExprs(px);
int32_t numOfOutput = (int32_t) tscNumOfExprs(px); int32_t numOfCols = (int32_t)taosArrayGetSize(px->colList);
SQueriedTableInfo info = {
.colList = pColumnInfo,
.numOfCols = numOfCols,
};
SSchema* pSchema = tscGetTableSchema(px->pTableMetaInfo[0]->pTableMeta);
int32_t numOfCols = (int32_t) taosArrayGetSize(px->colList); STableGroupInfo tableGroupInfo = {
SQueriedTableInfo info = {.colList = pColumnInfo, .numOfCols = numOfCols,}; .numOfTables = 1,
SSchema* pSchema = tscGetTableSchema(px->pTableMetaInfo[0]->pTableMeta); .pGroupList = taosArrayInit(1, POINTER_BYTES),
};
STableGroupInfo tableGroupInfo = {.numOfTables = 1, .pGroupList = taosArrayInit(1, POINTER_BYTES),}; tableGroupInfo.map = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
tableGroupInfo.map = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
STableKeyInfo tableKeyInfo = {.pTable = NULL, .lastKey = INT64_MIN}; STableKeyInfo tableKeyInfo = {.pTable = NULL, .lastKey = INT64_MIN};
SArray* group = taosArrayInit(1, sizeof(STableKeyInfo)); SArray* group = taosArrayInit(1, sizeof(STableKeyInfo));
taosArrayPush(group, &tableKeyInfo); taosArrayPush(group, &tableKeyInfo);
taosArrayPush(tableGroupInfo.pGroupList, &group); taosArrayPush(tableGroupInfo.pGroupList, &group);
SOperatorInfo* pSourceOptr = createDummyInputOperator((char*)pRes, pSchema, numOfCols); // if it is a join query, create join operator here
SOperatorInfo* pSourceOperator = createDummyInputOperator((char*)pRes, pSchema, numOfCols);
if (px->numOfTables > 1) {
SExprInfo *exprInfo = NULL; pSourceOperator = createJoinOperator(&pSourceOperator, 1, NULL, pSourceOperator->numOfOutput);
/*int32_t code = */createQueryFunc(&info, numOfOutput, &exprInfo, px->exprList->pData, NULL, px->type, NULL);
px->pQInfo = createQueryInfoFromQueryNode(px, exprInfo, &tableGroupInfo, pSourceOptr, NULL, NULL, MASTER_SCAN);
tfree(pColumnInfo);
} }
uint64_t qId = 0; SExprInfo* exprInfo = NULL;
qTableQuery(px->pQInfo, &qId); /*int32_t code = */ createQueryFunc(&info, numOfOutput, &exprInfo, px->exprList->pData, NULL, px->type, NULL);
convertQueryResult(pOutput, px); px->pQInfo = createQueryInfoFromQueryNode(px, exprInfo, &tableGroupInfo, pSourceOperator, NULL, NULL, MASTER_SCAN);
tfree(pColumnInfo);
} }
uint64_t qId = 0;
qTableQuery(px->pQInfo, &qId);
convertQueryResult(pOutput, px);
} }
static void tscDestroyResPointerInfo(SSqlRes* pRes) { static void tscDestroyResPointerInfo(SSqlRes* pRes) {
...@@ -2340,7 +2383,6 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo) { ...@@ -2340,7 +2383,6 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo) {
pQueryInfo->exprList = taosArrayInit(4, POINTER_BYTES); pQueryInfo->exprList = taosArrayInit(4, POINTER_BYTES);
pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES); pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
pQueryInfo->udColumnId = TSDB_UD_COLUMN_INDEX; pQueryInfo->udColumnId = TSDB_UD_COLUMN_INDEX;
pQueryInfo->resColumnId = TSDB_RES_COL_ID;
pQueryInfo->limit.limit = -1; pQueryInfo->limit.limit = -1;
pQueryInfo->limit.offset = 0; pQueryInfo->limit.offset = 0;
...@@ -2949,6 +2991,40 @@ void doExecuteQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -2949,6 +2991,40 @@ void doExecuteQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
} }
} }
static void doRetrieveSubqueryData(SSchedMsg *pMsg) {
SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)pMsg->ahandle);
if (pSql == NULL || pSql->signature != pSql) {
tscDebug("%p SqlObj is freed, not add into queue async res", pMsg->ahandle);
return;
}
int32_t numOfRows = 0;
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
/*TAOS_ROW row = */taos_fetch_row(pSub);
// SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSub->cmd);
// int32_t rows = taos_fetch_block(pSub, &row);
if (numOfRows == 0) {
numOfRows = pSub->res.numOfRows;
}
}
if (numOfRows > 0) {
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd);
SSqlObj *pSub = pSql->pSubs[0];
handleDownstreamOperator(&pSub->res, pQueryInfo, &pSql->res);
}
// int32_t code = pSql->res.code;
pSql->res.qId = -1;
if (pSql->res.code == TSDB_CODE_SUCCESS) {
(*pSql->fp)(pSql->param, pSql, pSql->res.numOfRows);
} else {
tscAsyncResultOnError(pSql);
}
}
// NOTE: the blocking query can not be executed in the rpc message handler thread
static void tscSubqueryRetrieveCallback(void* param, TAOS_RES* tres, int code) { static void tscSubqueryRetrieveCallback(void* param, TAOS_RES* tres, int code) {
// handle the pDownStream process // handle the pDownStream process
SRetrieveSupport* ps = param; SRetrieveSupport* ps = param;
...@@ -2960,27 +3036,37 @@ static void tscSubqueryRetrieveCallback(void* param, TAOS_RES* tres, int code) { ...@@ -2960,27 +3036,37 @@ static void tscSubqueryRetrieveCallback(void* param, TAOS_RES* tres, int code) {
return; return;
} }
pParentSql->cmd.active = pParentSql->cmd.pQueryInfo;
SSchedMsg schedMsg = {0};
schedMsg.fp = doRetrieveSubqueryData;
schedMsg.ahandle = (void *)pParentSql->self;
schedMsg.thandle = (void *)1;
schedMsg.msg = 0;
taosScheduleTask(tscQhandle, &schedMsg);
// merge all subquery result // merge all subquery result
SSqlCmd* pCmd = &pSql->cmd; // SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res; // SSqlRes* pRes = &pSql->res;
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); // add it to the message queue
/*TAOS_ROW* pRow = */taos_fetch_row(pSql);
if (pSql->res.numOfRows > 0) {
handleDownstreamOperator(pRes, pQueryInfo, &pParentSql->res);
}
code = pParentSql->res.code; // SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
pParentSql->res.qId = -1; // /*TAOS_ROW* pRow = */taos_fetch_row(pSql);
if (pParentSql->res.code == TSDB_CODE_SUCCESS) { // if (pSql->res.numOfRows > 0) {
(*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.numOfRows); // handleDownstreamOperator(pRes, pQueryInfo, &pParentSql->res);
} else { // }
tscAsyncResultOnError(pParentSql); //
} // code = pParentSql->res.code;
// pParentSql->res.qId = -1;
// if (pParentSql->res.code == TSDB_CODE_SUCCESS) {
// (*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.numOfRows);
// } else {
// tscAsyncResultOnError(pParentSql);
// }
} }
static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) { static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
printf("123\n");
taos_fetch_rows_a(tres, tscSubqueryRetrieveCallback, param); taos_fetch_rows_a(tres, tscSubqueryRetrieveCallback, param);
} }
...@@ -3000,43 +3086,44 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -3000,43 +3086,44 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES); pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES);
pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(int8_t)); pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(int8_t));
SQueryInfo* pSub = taosArrayGetP(pQueryInfo->pUpstream, 0); for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SQueryInfo* pSub = taosArrayGetP(pQueryInfo->pUpstream, i);
pSql->cmd.active = pSub; pSql->cmd.active = pSub;
pSql->cmd.command = TSDB_SQL_SELECT; pSql->cmd.command = TSDB_SQL_SELECT;
SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj)); SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj));
if (pNew == NULL) { if (pNew == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
// return NULL; // return NULL;
} }
pNew->pTscObj = pSql->pTscObj;
pNew->signature = pNew;
pNew->sqlstr = strdup(pSql->sqlstr); // todo refactor
pNew->fp = tscSubqueryCompleteCallback;
SRetrieveSupport* ps = calloc(1, sizeof(SRetrieveSupport));// todo use object id pNew->pTscObj = pSql->pTscObj;
ps->pParentSql = pSql; pNew->signature = pNew;
ps->subqueryIndex = 0; pNew->sqlstr = strdup(pSql->sqlstr); // todo refactor
pNew->fp = tscSubqueryCompleteCallback;
pNew->param = ps; SRetrieveSupport* ps = calloc(1, sizeof(SRetrieveSupport)); // todo use object id
pSql->pSubs[0] = pNew; ps->pParentSql = pSql;
registerSqlObj(pNew); ps->subqueryIndex = i;
SSqlCmd* pCmd = &pNew->cmd; pNew->param = ps;
pCmd->command = TSDB_SQL_SELECT; pSql->pSubs[i] = pNew;
pCmd->parseFinished = 1; registerSqlObj(pNew);
if (tscAddQueryInfo(pCmd) != TSDB_CODE_SUCCESS) { SSqlCmd* pCmd = &pNew->cmd;
} pCmd->command = TSDB_SQL_SELECT;
pCmd->parseFinished = 1;
SQueryInfo* pNewQueryInfo = tscGetQueryInfo(pCmd); if (tscAddQueryInfo(pCmd) != TSDB_CODE_SUCCESS) {
tscQueryInfoCopy(pNewQueryInfo, pSub); }
// create sub query to handle the sub query. SQueryInfo* pNewQueryInfo = tscGetQueryInfo(pCmd);
executeQuery(pNew, pSub); tscQueryInfoCopy(pNewQueryInfo, pSub);
// create sub query to handle the sub query.
executeQuery(pNew, pSub);
}
// merge sub query result and generate final results // merge sub query result and generate final results
return; return;
} }
......
...@@ -302,6 +302,7 @@ enum OPERATOR_TYPE_E { ...@@ -302,6 +302,7 @@ enum OPERATOR_TYPE_E {
OP_GlobalAggregate = 18, // global merge for the multi-way data sources. OP_GlobalAggregate = 18, // global merge for the multi-way data sources.
OP_Filter = 19, OP_Filter = 19,
OP_Distinct = 20, OP_Distinct = 20,
OP_Join = 21,
}; };
typedef struct SOperatorInfo { typedef struct SOperatorInfo {
...@@ -314,7 +315,8 @@ typedef struct SOperatorInfo { ...@@ -314,7 +315,8 @@ typedef struct SOperatorInfo {
SExprInfo *pExpr; SExprInfo *pExpr;
SQueryRuntimeEnv *pRuntimeEnv; SQueryRuntimeEnv *pRuntimeEnv;
struct SOperatorInfo *upstream; struct SOperatorInfo **upstream; // upstream pointer list
int32_t numOfUpstream; // number of upstream. The value is always ONE expect for join operator
__operator_fn_t exec; __operator_fn_t exec;
__optr_cleanup_fn_t cleanup; __optr_cleanup_fn_t cleanup;
} SOperatorInfo; } SOperatorInfo;
...@@ -494,6 +496,8 @@ typedef struct SMultiwayMergeInfo { ...@@ -494,6 +496,8 @@ typedef struct SMultiwayMergeInfo {
bool groupMix; bool groupMix;
} SMultiwayMergeInfo; } SMultiwayMergeInfo;
void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream);
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime); SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime);
SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime); SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime);
SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv); SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
...@@ -517,6 +521,8 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator ...@@ -517,6 +521,8 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr,
int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter); int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter);
SOperatorInfo* createJoinOperator(SOperatorInfo** pUpstream, int32_t numOfUpstream, SExprInfo* pExprInfo, int32_t numOfOutput);
SSDataBlock* doGlobalAggregate(void* param, bool* newgroup); SSDataBlock* doGlobalAggregate(void* param, bool* newgroup);
SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup); SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup);
SSDataBlock* doSLimit(void* param, bool* newgroup); SSDataBlock* doSLimit(void* param, bool* newgroup);
......
...@@ -1710,38 +1710,38 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -1710,38 +1710,38 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
case OP_MultiTableTimeInterval: { case OP_MultiTableTimeInterval: {
pRuntimeEnv->proot = pRuntimeEnv->proot =
createMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); createMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream->info, pRuntimeEnv->proot); setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
break; break;
} }
case OP_TimeWindow: { case OP_TimeWindow: {
pRuntimeEnv->proot = pRuntimeEnv->proot =
createTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); createTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream->info, pRuntimeEnv->proot); setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
break; break;
} }
case OP_Groupby: { case OP_Groupby: {
pRuntimeEnv->proot = pRuntimeEnv->proot =
createGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); createGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream->info, pRuntimeEnv->proot); setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
break; break;
} }
case OP_SessionWindow: { case OP_SessionWindow: {
pRuntimeEnv->proot = pRuntimeEnv->proot =
createSWindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); createSWindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream->info, pRuntimeEnv->proot); setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
break; break;
} }
case OP_MultiTableAggregate: { case OP_MultiTableAggregate: {
pRuntimeEnv->proot = pRuntimeEnv->proot =
createMultiTableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); createMultiTableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream->info, pRuntimeEnv->proot); setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
break; break;
} }
case OP_Aggregate: { case OP_Aggregate: {
pRuntimeEnv->proot = pRuntimeEnv->proot =
createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
if (pRuntimeEnv->proot->upstream->operatorType != OP_DummyInput) { if (pRuntimeEnv->proot->upstream[0]->operatorType != OP_DummyInput) {
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream->info, pRuntimeEnv->proot); setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
} }
break; break;
} }
...@@ -3927,6 +3927,15 @@ void queryCostStatis(SQInfo *pQInfo) { ...@@ -3927,6 +3927,15 @@ void queryCostStatis(SQInfo *pQInfo) {
// return true; // return true;
//} //}
void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream) {
if (p->upstream == NULL) {
assert(p->numOfOutput == 0);
}
p->upstream = realloc(p->upstream, POINTER_BYTES * (p->numOfOutput + 1));
p->upstream[p->numOfOutput++] = pUpstream;
}
static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo); static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo);
static int32_t setupQueryHandle(void* tsdb, SQueryRuntimeEnv* pRuntimeEnv, int64_t qId, bool isSTableQuery) { static int32_t setupQueryHandle(void* tsdb, SQueryRuntimeEnv* pRuntimeEnv, int64_t qId, bool isSTableQuery) {
...@@ -4604,13 +4613,14 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, ...@@ -4604,13 +4613,14 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->upstream = upstream;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doGlobalAggregate; pOperator->exec = doGlobalAggregate;
pOperator->cleanup = destroyGlobalAggOperatorInfo; pOperator->cleanup = destroyGlobalAggOperatorInfo;
appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
} }
...@@ -4675,7 +4685,7 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) { ...@@ -4675,7 +4685,7 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) {
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t order = pQueryAttr->order.order; int32_t order = pQueryAttr->order.order;
SOperatorInfo* upstream = pOperator->upstream; SOperatorInfo* upstream = pOperator->upstream[0];
while(1) { while(1) {
SSDataBlock* pBlock = upstream->exec(upstream, newgroup); SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
...@@ -4730,7 +4740,7 @@ static SSDataBlock* doSTableAggregate(void* param, bool* newgroup) { ...@@ -4730,7 +4740,7 @@ static SSDataBlock* doSTableAggregate(void* param, bool* newgroup) {
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t order = pQueryAttr->order.order; int32_t order = pQueryAttr->order.order;
SOperatorInfo* upstream = pOperator->upstream; SOperatorInfo* upstream = pOperator->upstream[0];
while(1) { while(1) {
SSDataBlock* pBlock = upstream->exec(upstream, newgroup); SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
...@@ -4814,7 +4824,7 @@ static SSDataBlock* doArithmeticOperation(void* param, bool* newgroup) { ...@@ -4814,7 +4824,7 @@ static SSDataBlock* doArithmeticOperation(void* param, bool* newgroup) {
bool prevVal = *newgroup; bool prevVal = *newgroup;
// The upstream exec may change the value of the newgroup, so use a local variable instead. // The upstream exec may change the value of the newgroup, so use a local variable instead.
SSDataBlock* pBlock = pOperator->upstream->exec(pOperator->upstream, newgroup); SSDataBlock* pBlock = pOperator->upstream[0]->exec(pOperator->upstream, newgroup);
if (pBlock == NULL) { if (pBlock == NULL) {
assert(*newgroup == false); assert(*newgroup == false);
...@@ -4868,7 +4878,7 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) { ...@@ -4868,7 +4878,7 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) {
SSDataBlock* pBlock = NULL; SSDataBlock* pBlock = NULL;
while (1) { while (1) {
pBlock = pOperator->upstream->exec(pOperator->upstream, newgroup); pBlock = pOperator->upstream[0]->exec(pOperator->upstream, newgroup);
if (pBlock == NULL) { if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
...@@ -4939,7 +4949,7 @@ static SSDataBlock* doFilter(void* param, bool* newgroup) { ...@@ -4939,7 +4949,7 @@ static SSDataBlock* doFilter(void* param, bool* newgroup) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
while (1) { while (1) {
SSDataBlock *pBlock = pOperator->upstream->exec(pOperator->upstream, newgroup); SSDataBlock *pBlock = pOperator->upstream[0]->exec(pOperator->upstream, newgroup);
if (pBlock == NULL) { if (pBlock == NULL) {
break; break;
} }
...@@ -4981,7 +4991,7 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) { ...@@ -4981,7 +4991,7 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) {
int32_t order = pQueryAttr->order.order; int32_t order = pQueryAttr->order.order;
STimeWindow win = pQueryAttr->window; STimeWindow win = pQueryAttr->window;
SOperatorInfo* upstream = pOperator->upstream; SOperatorInfo* upstream = pOperator->upstream[0];
while(1) { while(1) {
SSDataBlock* pBlock = upstream->exec(upstream, newgroup); SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
...@@ -5034,7 +5044,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { ...@@ -5034,7 +5044,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t order = pQueryAttr->order.order; int32_t order = pQueryAttr->order.order;
SOperatorInfo* upstream = pOperator->upstream; SOperatorInfo* upstream = pOperator->upstream[0];
while(1) { while(1) {
SSDataBlock* pBlock = upstream->exec(upstream, newgroup); SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
...@@ -5089,7 +5099,7 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) { ...@@ -5089,7 +5099,7 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) {
int32_t order = pQueryAttr->order.order; int32_t order = pQueryAttr->order.order;
STimeWindow win = pQueryAttr->window; STimeWindow win = pQueryAttr->window;
SOperatorInfo* upstream = pOperator->upstream; SOperatorInfo* upstream = pOperator->upstream[0];
while(1) { while(1) {
SSDataBlock* pBlock = upstream->exec(upstream, newgroup); SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
...@@ -5140,7 +5150,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) { ...@@ -5140,7 +5150,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) {
return pInfo->binfo.pRes; return pInfo->binfo.pRes;
} }
SOperatorInfo* upstream = pOperator->upstream; SOperatorInfo* upstream = pOperator->upstream[0];
while(1) { while(1) {
SSDataBlock* pBlock = upstream->exec(upstream, newgroup); SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
...@@ -5209,7 +5219,7 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { ...@@ -5209,7 +5219,7 @@ static SSDataBlock* doFill(void* param, bool* newgroup) {
} }
while(1) { while(1) {
SSDataBlock* pBlock = pOperator->upstream->exec(pOperator->upstream, newgroup); SSDataBlock* pBlock = pOperator->upstream[0]->exec(pOperator->upstream, newgroup);
if (*newgroup) { if (*newgroup) {
assert(pBlock != NULL); assert(pBlock != NULL);
} }
...@@ -5285,7 +5295,7 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { ...@@ -5285,7 +5295,7 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
pOperator->cleanup(pOperator->info, pOperator->numOfOutput); pOperator->cleanup(pOperator->info, pOperator->numOfOutput);
} }
destroyOperatorInfo(pOperator->upstream); destroyOperatorInfo(pOperator->upstream[0]);
tfree(pOperator->info); tfree(pOperator->info);
tfree(pOperator); tfree(pOperator);
} }
...@@ -5310,13 +5320,14 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera ...@@ -5310,13 +5320,14 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->upstream = upstream;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doAggregate; pOperator->exec = doAggregate;
pOperator->cleanup = destroyBasicOperatorInfo; pOperator->cleanup = destroyBasicOperatorInfo;
appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
} }
...@@ -5383,13 +5394,13 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SO ...@@ -5383,13 +5394,13 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SO
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->upstream = upstream;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doSTableAggregate; pOperator->exec = doSTableAggregate;
pOperator->cleanup = destroyBasicOperatorInfo; pOperator->cleanup = destroyBasicOperatorInfo;
appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
} }
...@@ -5413,13 +5424,13 @@ SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI ...@@ -5413,13 +5424,13 @@ SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->upstream = upstream;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doArithmeticOperation; pOperator->exec = doArithmeticOperation;
pOperator->cleanup = destroyArithOperatorInfo; pOperator->cleanup = destroyArithOperatorInfo;
appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
} }
...@@ -5464,11 +5475,11 @@ SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator ...@@ -5464,11 +5475,11 @@ SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->upstream = upstream;
pOperator->exec = doFilter; pOperator->exec = doFilter;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->cleanup = destroyConditionOperatorInfo; pOperator->cleanup = destroyConditionOperatorInfo;
appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
} }
...@@ -5483,10 +5494,10 @@ SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI ...@@ -5483,10 +5494,10 @@ SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
pOperator->operatorType = OP_Limit; pOperator->operatorType = OP_Limit;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->upstream = upstream;
pOperator->exec = doLimit; pOperator->exec = doLimit;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
} }
...@@ -5504,7 +5515,6 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp ...@@ -5504,7 +5515,6 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp
pOperator->operatorType = OP_TimeWindow; pOperator->operatorType = OP_TimeWindow;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->upstream = upstream;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo; pOperator->info = pInfo;
...@@ -5512,6 +5522,7 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp ...@@ -5512,6 +5522,7 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp
pOperator->exec = doIntervalAgg; pOperator->exec = doIntervalAgg;
pOperator->cleanup = destroyBasicOperatorInfo; pOperator->cleanup = destroyBasicOperatorInfo;
appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
} }
...@@ -5529,7 +5540,6 @@ SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato ...@@ -5529,7 +5540,6 @@ SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
pOperator->operatorType = OP_SessionWindow; pOperator->operatorType = OP_SessionWindow;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->upstream = upstream;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo; pOperator->info = pInfo;
...@@ -5537,6 +5547,7 @@ SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato ...@@ -5537,6 +5547,7 @@ SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
pOperator->exec = doSessionWindowAgg; pOperator->exec = doSessionWindowAgg;
pOperator->cleanup = destroyBasicOperatorInfo; pOperator->cleanup = destroyBasicOperatorInfo;
appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
} }
...@@ -5552,7 +5563,6 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRunti ...@@ -5552,7 +5563,6 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRunti
pOperator->operatorType = OP_MultiTableTimeInterval; pOperator->operatorType = OP_MultiTableTimeInterval;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->upstream = upstream;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo; pOperator->info = pInfo;
...@@ -5561,6 +5571,7 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRunti ...@@ -5561,6 +5571,7 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRunti
pOperator->exec = doSTableIntervalAgg; pOperator->exec = doSTableIntervalAgg;
pOperator->cleanup = destroyBasicOperatorInfo; pOperator->cleanup = destroyBasicOperatorInfo;
appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
} }
...@@ -5577,7 +5588,6 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato ...@@ -5577,7 +5588,6 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->operatorType = OP_Groupby; pOperator->operatorType = OP_Groupby;
pOperator->upstream = upstream;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo; pOperator->info = pInfo;
...@@ -5585,6 +5595,7 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato ...@@ -5585,6 +5595,7 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
pOperator->exec = hashGroupbyAggregate; pOperator->exec = hashGroupbyAggregate;
pOperator->cleanup = destroyGroupbyOperatorInfo; pOperator->cleanup = destroyGroupbyOperatorInfo;
appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
} }
...@@ -5614,8 +5625,6 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn ...@@ -5614,8 +5625,6 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->operatorType = OP_Fill; pOperator->operatorType = OP_Fill;
pOperator->upstream = upstream;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo; pOperator->info = pInfo;
...@@ -5624,6 +5633,7 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn ...@@ -5624,6 +5633,7 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn
pOperator->exec = doFill; pOperator->exec = doFill;
pOperator->cleanup = destroySFillOperatorInfo; pOperator->cleanup = destroySFillOperatorInfo;
appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
} }
...@@ -5662,11 +5672,12 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator ...@@ -5662,11 +5672,12 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
pOperator->operatorType = OP_SLimit; pOperator->operatorType = OP_SLimit;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->upstream = upstream;
pOperator->exec = doSLimit; pOperator->exec = doSLimit;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->cleanup = destroySlimitOperatorInfo; pOperator->cleanup = destroySlimitOperatorInfo;
appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
} }
...@@ -5833,7 +5844,7 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { ...@@ -5833,7 +5844,7 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
pRes->info.rows = 0; pRes->info.rows = 0;
SSDataBlock* pBlock = NULL; SSDataBlock* pBlock = NULL;
while(1) { while(1) {
pBlock = pOperator->upstream->exec(pOperator->upstream, newgroup); pBlock = pOperator->upstream[0]->exec(pOperator->upstream, newgroup);
if (pBlock == NULL) { if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
...@@ -5894,12 +5905,13 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat ...@@ -5894,12 +5905,13 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->operatorType = OP_Distinct; pOperator->operatorType = OP_Distinct;
pOperator->upstream = upstream;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = hashDistinct; pOperator->exec = hashDistinct;
pOperator->cleanup = destroyDistinctOperatorInfo; pOperator->cleanup = destroyDistinctOperatorInfo;
appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册