diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 9b5cd8fa0ff1912609b777c1e5e2391c6894d338..a4bf1d90052854d13146e2e420184e16ed859a26 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -230,7 +230,8 @@ typedef struct SQueryInfo { char* buf; struct SQueryInfo *sibling; // sibling - SArray *pUpstream; //SArray + SArray *pUpstream; // SArray + SArray *pDownstream; // SArray } SQueryInfo; typedef struct { diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 2e918c09f202a82f836109b4539a18e76459f235..0b1c062c0e3f7b1d864fb1dcffd6140507d16524 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -6677,8 +6677,11 @@ int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t ind tscInitQueryInfo(pQueryInfo1); pQueryInfo1->pUpstream = taosArrayInit(4, POINTER_BYTES); + pQueryInfo1->pDownstream = taosArrayInit(4, POINTER_BYTES); + for(int32_t x = 0; x < pCmd->numOfClause; ++x) { taosArrayPush(pQueryInfo1->pUpstream, &pCmd->pQueryInfo[x]); + taosArrayPush(pCmd->pQueryInfo[x]->pDownstream, &pQueryInfo1); } pQueryInfo1->numOfTables = 1; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index ae58d676280cc6c6d206c4f9515f3eb38385857e..652d34eee5938c6ce5204c7b3d52f17a7ee9db5d 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -2331,6 +2331,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { } prepareInputDataFromUpstream(pRes, pQueryInfo); + if (pSql->pSubscription != NULL) { int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index b12eaeb4942427bc2be239a6d8536c90a9212449..7bfe1754df81ee72fdfdf530969fbf93842c3ba8 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -372,7 +372,11 @@ void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo) { } void prepareInputDataFromUpstream(SSqlRes* pRes, SQueryInfo* pQueryInfo) { - printf("abc\n"); + if (pQueryInfo->pDownstream != NULL && taosArrayGetSize(pQueryInfo->pDownstream) > 0) { + // handle the following query process + SQueryInfo* px = taosArrayGetP(pQueryInfo->pDownstream, 0); + printf("%d\n", px->type); + } } static void tscDestroyResPointerInfo(SSqlRes* pRes) { @@ -1772,6 +1776,8 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo) { pQueryInfo->slimit.limit = -1; pQueryInfo->slimit.offset = 0; pQueryInfo->pUpstream = taosArrayInit(4, POINTER_BYTES); + pQueryInfo->pDownstream = taosArrayInit(4, POINTER_BYTES); + } int32_t tscAddQueryInfo(SSqlCmd* pCmd) { diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index e0e18a7cc851fc8fdec8a23e47b808a9f1b2f6ed..8a1c86c389d25d6361d03490de3f508eb4f85cd2 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -5770,7 +5770,7 @@ _cleanup: return code; } -static int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, SQueryTableMsg *pQueryMsg) { +static int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, void *pQueryMsg) { qDebug("qmsg:%p create arithmetic expr from binary", pQueryMsg); tExprNode* pExprNode = NULL; @@ -5812,17 +5812,25 @@ static int32_t updateOutputBufForTopBotQuery(SQueryTableMsg* pQueryMsg, SColumnI } // TODO tag length should be passed from client +typedef struct { + int32_t numOfOutput; + int32_t numOfTags; + int32_t numOfCols; + SColumnInfo* colList; + int32_t queryTest; +} SQueriedTableMeta; + int32_t createQueryFuncExprFromMsg(SQueryTableMsg* pQueryMsg, int32_t numOfOutput, SExprInfo** pExprInfo, - SSqlFuncMsg** pExprMsg, SColumnInfo* pTagCols) { + SSqlFuncMsg** pExprMsg, SColumnInfo* pTagCols, bool isSuperTable) { *pExprInfo = NULL; int32_t code = TSDB_CODE_SUCCESS; - SExprInfo *pExprs = (SExprInfo *)calloc(pQueryMsg->numOfOutput, sizeof(SExprInfo)); + SExprInfo *pExprs = (SExprInfo *)calloc(numOfOutput, sizeof(SExprInfo)); if (pExprs == NULL) { return TSDB_CODE_QRY_OUT_OF_MEMORY; } - bool isSuperTable = QUERY_IS_STABLE_QUERY(pQueryMsg->queryType); +// bool isSuperTable = QUERY_IS_STABLE_QUERY(pQueryMsg->queryType); int16_t tagLen = 0; for (int32_t i = 0; i < numOfOutput; ++i) {