From 5295955b72f995fd6ab192066cb3cdca54b4a17e Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Mon, 6 Jan 2020 10:49:14 +0800 Subject: [PATCH] add the union support in sql parser: refactor some codes. #1032. [TBASE-1140] --- src/client/inc/tscUtil.h | 9 +- src/client/inc/tsclient.h | 2 +- src/client/src/tscAsync.c | 11 +- src/client/src/tscJoinProcess.c | 10 +- src/client/src/tscParseInsert.c | 18 +- src/client/src/tscPrepare.c | 5 +- src/client/src/tscSQLParser.c | 44 +++-- src/client/src/tscSecondaryMerge.c | 8 +- src/client/src/tscServer.c | 69 +++++--- src/client/src/tscSql.c | 258 +++++++++++++++++++---------- src/client/src/tscUtil.c | 133 ++++++++------- 11 files changed, 346 insertions(+), 221 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 3190fd7057..fd0973f319 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -97,10 +97,10 @@ SMeterSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx); */ bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo); bool tscIsTWAQuery(SQueryInfo* pQueryInfo); -bool tscProjectionQueryOnSTable(SSqlCmd* pCmd, int32_t subClauseIndex); +bool tscProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex); bool tscProjectionQueryOnTable(SQueryInfo* pQueryInfo); -bool tscIsTwoStageMergeMetricQuery(SSqlCmd* pCmd); +bool tscIsTwoStageMergeMetricQuery(SQueryInfo* pQueryInfo, int32_t tableIndex); bool tscQueryOnMetric(SSqlCmd* pCmd); bool tscQueryMetricTags(SQueryInfo* pQueryInfo); bool tscIsSelectivityWithTagQuery(SSqlCmd* pCmd); @@ -126,7 +126,7 @@ void tscFieldInfoSetValue(SFieldInfo* pFieldInfo, int32_t index, int8_t type, co void tscFieldInfoUpdateVisible(SFieldInfo* pFieldInfo, int32_t index, bool visible); void tscFieldInfoCalOffset(SQueryInfo* pQueryInfo); -void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo); +void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo); void tscFieldInfoCopy(SFieldInfo* src, SFieldInfo* dst, const int32_t* indexList, int32_t size); void tscFieldInfoCopyAll(SFieldInfo* dst, SFieldInfo* src); @@ -135,6 +135,7 @@ int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index); int32_t tscGetResRowLength(SQueryInfo* pQueryInfo); void tscClearFieldInfo(SFieldInfo* pFieldInfo); int32_t tscNumOfFields(SQueryInfo* pQueryInfo); +int32_t tscFieldInfoCompare(SFieldInfo* pFieldInfo1, SFieldInfo* pFieldInfo2); void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes, int16_t tableIndex); @@ -163,7 +164,7 @@ int32_t tscValidateName(SSQLToken* pToken); void tscIncStreamExecutionCount(void* pStream); -bool tscValidateColumnId(SSqlCmd* pCmd, int32_t colId); +bool tscValidateColumnId(SMeterMetaInfo* pMeterMetaInfo, int32_t colId); // get starter position of metric query condition (query on tags) in SSqlCmd.payload SCond* tsGetMetricQueryCondPos(STagCond* pCond, uint64_t tableIndex); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 2de9819d92..a3e1e624c1 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -253,9 +253,9 @@ typedef struct { union { int32_t count; int32_t numOfTablesInSubmit; - int32_t clauseIndex; // index of multiple subclause query }; + int32_t clauseIndex; // index of multiple subclause query short numOfCols; uint32_t allocSize; char * payload; diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 22613386a5..8b0edaf9fe 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -121,7 +121,7 @@ static void tscProcessAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOf SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); // sequentially retrieve data from remain vnodes first, query vnode specified by vnodeIdx - if (numOfRows == 0 && tscProjectionQueryOnSTable(pCmd, 0)) { + if (numOfRows == 0 && tscProjectionQueryOnSTable(pQueryInfo, 0)) { // vnode is denoted by vnodeIdx, continue to query vnode specified by vnodeIdx SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); assert(pMeterMetaInfo->vnodeIndex >= 0); @@ -133,7 +133,6 @@ static void tscProcessAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOf } /* update the limit value according to current retrieval results */ - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); pQueryInfo->limit.limit = pCmd->globalLimit - pRes->numOfTotal; pQueryInfo->limit.offset = pRes->offset; @@ -269,14 +268,14 @@ void tscProcessAsyncRetrieve(void *param, TAOS_RES *tres, int numOfRows) { SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); if (numOfRows == 0) { // sequentially retrieve data from remain vnodes. - if (tscProjectionQueryOnSTable(pCmd, 0)) { + if (tscProjectionQueryOnSTable(pQueryInfo, 0)) { /* * vnode is denoted by vnodeIdx, continue to query vnode specified by vnodeIdx till all vnode have been retrieved */ - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0); + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); assert(pMeterMetaInfo->vnodeIndex >= 0); /* reach the maximum number of output rows, abort */ @@ -527,7 +526,7 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) { } } else { // stream computing - SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0); + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); code = tscGetMeterMeta(pSql, pMeterMetaInfo); pRes->code = code; diff --git a/src/client/src/tscJoinProcess.c b/src/client/src/tscJoinProcess.c index 4e0ce14a6c..1f26e2a0e5 100644 --- a/src/client/src/tscJoinProcess.c +++ b/src/client/src/tscJoinProcess.c @@ -438,7 +438,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { taos_fetch_rows_a(tres, joinRetrieveCallback, param); } else if (numOfRows == 0) { // no data from this vnode anymore - if (tscProjectionQueryOnSTable(&pParentSql->cmd, 0)) { + if (tscProjectionQueryOnSTable(pQueryInfo, 0)) { SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); assert(pQueryInfo->numOfTables == 1); @@ -494,9 +494,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { pSql->res.numOfTotal += pSql->res.numOfRows; } - SSqlCmd* pCmd = &pSql->cmd; - - if (tscProjectionQueryOnSTable(pCmd, 0) && numOfRows == 0) { + if (tscProjectionQueryOnSTable(pQueryInfo, 0) && numOfRows == 0) { SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); assert(pQueryInfo->numOfTables == 1); @@ -541,7 +539,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); - if (tscProjectionQueryOnSTable(&pSql->cmd, 0)) { + if (tscProjectionQueryOnSTable(pQueryInfo, 0)) { if (pRes->row >= pRes->numOfRows && pMeterMetaInfo->vnodeIndex < pMeterMetaInfo->pMetricMeta->numOfVnodes && (!tscHasReachLimitation(pSql->pSubs[i]))) { numOfFetch++; @@ -709,7 +707,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { * if the query is a continue query (vnodeIndex > 0 for projection query) for next vnode, do the retrieval of * data instead of returning to its invoker */ - if (pMeterMetaInfo->vnodeIndex > 0 && tscProjectionQueryOnSTable(&pSql->cmd, 0)) { + if (pMeterMetaInfo->vnodeIndex > 0 && tscProjectionQueryOnSTable(pQueryInfo, 0)) { assert(pMeterMetaInfo->vnodeIndex < pMeterMetaInfo->pMetricMeta->numOfVnodes); pSupporter->pState->numOfCompleted = 0; // reset the record value diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 7af27aa386..f67319bc43 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -655,7 +655,7 @@ void sortRemoveDuplicates(STableDataBlocks *dataBuf) { static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char **str, SParsedDataColInfo *spd, int32_t *totalNum) { SSqlCmd * pCmd = &pSql->cmd; - SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0); + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); SMeterMeta * pMeterMeta = pMeterMetaInfo->pMeterMeta; STableDataBlocks *dataBuf = NULL; @@ -1143,7 +1143,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { strcpy(pDataBlock->filename, fname); } else if (sToken.type == TK_LP) { /* insert into tablename(col1, col2,..., coln) values(v1, v2,... vn); */ - SMeterMeta *pMeterMeta = tscGetMeterMetaInfo(pCmd, 0, 0)->pMeterMeta; + SMeterMeta *pMeterMeta = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0)->pMeterMeta; SSchema * pSchema = tsGetSchema(pMeterMeta); if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) { @@ -1275,7 +1275,7 @@ int tsParseInsertSql(SSqlObj *pSql) { pCmd->command = TSDB_SQL_INSERT; SQueryInfo *pQueryInfo = NULL; - tscGetQueryInfoDetailSafely(pCmd, 0, &pQueryInfo); + tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo); if (sToken.type == TK_INSERT) { TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT); @@ -1343,7 +1343,8 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock int32_t code = TSDB_CODE_SUCCESS; SSqlCmd *pCmd = &pSql->cmd; - SMeterMeta *pMeterMeta = tscGetMeterMetaInfo(pCmd, 0, 0)->pMeterMeta; + assert(pCmd->numOfClause == 1); + SMeterMeta *pMeterMeta = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0)->pMeterMeta; SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)(pTableDataBlocks->pData); tsSetBlockInfo(pBlocks, pMeterMeta, numOfRows); @@ -1375,8 +1376,11 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) { int numOfRows = 0; int32_t code = 0; int nrows = 0; - SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0); + + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); SMeterMeta * pMeterMeta = pMeterMetaInfo->pMeterMeta; + assert(pCmd->numOfClause == 1); + int32_t rowSize = pMeterMeta->rowSize; pCmd->pDataBlocks = tscCreateBlockArrayList(); @@ -1465,7 +1469,9 @@ void tscProcessMultiVnodesInsert(SSqlObj *pSql) { } STableDataBlocks *pDataBlock = NULL; - SMeterMetaInfo * pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0); + SMeterMetaInfo * pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); + assert(pCmd->numOfClause == 1); + int32_t code = TSDB_CODE_SUCCESS; /* the first block has been sent to server in processSQL function */ diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index 51c43fd3f7..744ffd5b2b 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -408,7 +408,7 @@ static int insertStmtReset(STscStmt* pStmt) { } pCmd->batchSize = 0; - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0); + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); pMeterMetaInfo->vnodeIndex = 0; return TSDB_CODE_SUCCESS; } @@ -422,7 +422,8 @@ static int insertStmtExecute(STscStmt* stmt) { ++pCmd->batchSize; } - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0); + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); + assert(pCmd->numOfClause == 1); if (pCmd->pDataBlocks->nSize > 0) { // merge according to vgid diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index f6ef076b27..49d2678ea5 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -101,7 +101,7 @@ static bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField); static bool hasTimestampForPointInterpQuery(SQueryInfo* pQueryInfo); static void updateTagColumnIndex(SQueryInfo* pQueryInfo, int32_t tableIndex); -static int32_t parseLimitClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql, SSqlObj* pSql); +static int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t index, SQuerySQL* pQuerySql, SSqlObj* pSql); static int32_t parseCreateDBOptions(SSqlCmd* pCmd, SCreateDBInfo* pCreateDbSql); static int32_t getColumnIndexByNameEx(SSQLToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex); static int32_t getTableIndexByName(SSQLToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex); @@ -205,7 +205,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), pInfo->pzErrMsg); } - int32_t code = tscGetQueryInfoDetailSafely(pCmd, 0, &pQueryInfo); + int32_t code = tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo); assert(pQueryInfo->numOfTables == 0); SMeterMetaInfo* pMeterMetaInfo = tscAddEmptyMeterMetaInfo(pQueryInfo); @@ -502,7 +502,8 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { case TSDB_SQL_SELECT: { assert(pCmd->numOfClause == 1); - + const char* msg1 = "columns in select caluse not identical"; + for (int32_t i = pCmd->numOfClause; i < pInfo->subclauseInfo.numOfClause; ++i) { SQueryInfo* pqi = NULL; if ((code = tscGetQueryInfoDetailSafely(pCmd, i, &pqi)) != TSDB_CODE_SUCCESS) { @@ -526,6 +527,17 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { // if there is only one element, the limit of clause is the limit of global result. if (pCmd->numOfClause == 1) { pCmd->globalLimit = pQueryInfo1->clauseLimit; + } else { // check the output fields information, column name and column type + pCmd->globalLimit = -1; + + for(int32_t i = 1; i < pCmd->numOfClause; ++i) { + SQueryInfo* pQueryInfo2 = tscGetQueryInfoDetail(pCmd, i); + + int32_t ret = tscFieldInfoCompare(&pQueryInfo1->fieldsInfo, &pQueryInfo2->fieldsInfo); + if (ret != 0) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); + } + } } return TSDB_CODE_SUCCESS; // do not build query message here @@ -852,7 +864,9 @@ bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField) { const char* msg5 = "invalid binary/nchar tag length"; const char* msg6 = "invalid data type in tags"; - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0); + assert(pCmd->numOfClause == 1); + + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); SMeterMeta* pMeterMeta = pMeterMetaInfo->pMeterMeta; // no more than 6 tags @@ -921,7 +935,8 @@ bool validateOneColumn(SSqlCmd* pCmd, TAOS_FIELD* pColField) { const char* msg5 = "invalid column name"; const char* msg6 = "invalid column length"; - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0); + assert(pCmd->numOfClause == 1); + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); SMeterMeta* pMeterMeta = pMeterMetaInfo->pMeterMeta; // no more max columns @@ -1975,8 +1990,9 @@ int32_t changeFunctionID(int32_t optr, int16_t* functionId) { int32_t setShowInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { SSqlCmd* pCmd = &pSql->cmd; - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0); - + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); + assert(pCmd->numOfClause == 1); + pCmd->command = TSDB_SQL_SHOW; const char* msg1 = "invalid name"; @@ -2125,7 +2141,7 @@ int32_t tscTansformSQLFunctionForSTableQuery(SQueryInfo* pQueryInfo) { } } - tscFieldInfoUpdateOffset(pQueryInfo); + tscFieldInfoUpdateOffsetForInterResult(pQueryInfo); return TSDB_CODE_SUCCESS; } @@ -2370,7 +2386,7 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd* void setColumnOffsetValueInResultset(SQueryInfo* pQueryInfo) { if (QUERY_IS_STABLE_QUERY(pQueryInfo->type)) { - tscFieldInfoUpdateOffset(pQueryInfo); + tscFieldInfoUpdateOffsetForInterResult(pQueryInfo); } else { tscFieldInfoCalOffset(pQueryInfo); } @@ -4410,7 +4426,7 @@ bool hasTimestampForPointInterpQuery(SQueryInfo* pQueryInfo) { return (pQueryInfo->stime == pQueryInfo->etime) && (pQueryInfo->stime != 0); } -int32_t parseLimitClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql, SSqlObj* pSql) { +int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL* pQuerySql, SSqlObj* pSql) { SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); const char* msg0 = "soffset/offset can not be less than 0"; @@ -4443,7 +4459,7 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql, SSqlObj* if (queryOnTags == true) { // local handle the metric tag query pQueryInfo->command = TSDB_SQL_RETRIEVE_TAGS; } else { - if (tscProjectionQueryOnSTable(&pSql->cmd, 0) && + if (tscProjectionQueryOnSTable(pQueryInfo, 0) && (pQueryInfo->slimit.limit > 0 || pQueryInfo->slimit.offset > 0)) { return invalidSqlErrMsg(pQueryInfo->msg, msg3); } @@ -4461,7 +4477,7 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql, SSqlObj* * And then launching multiple async-queries against all qualified virtual nodes, during the first-stage * query operation. */ - int32_t code = tscGetMetricMeta(pSql, 0); + int32_t code = tscGetMetricMeta(pSql, clauseIndex); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -4983,7 +4999,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { } // projection query on metric does not compatible with "group by" syntax - if (tscProjectionQueryOnSTable(pCmd, 0)) { + if (tscProjectionQueryOnSTable(pQueryInfo, 0)) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); } @@ -5564,7 +5580,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { pQueryInfo->limit = pQuerySql->limit; // temporarily save the original limitation value - if ((code = parseLimitClause(pQueryInfo, pQuerySql, pSql)) != TSDB_CODE_SUCCESS) { + if ((code = parseLimitClause(pQueryInfo, index, pQuerySql, pSql)) != TSDB_CODE_SUCCESS) { return code; } diff --git a/src/client/src/tscSecondaryMerge.c b/src/client/src/tscSecondaryMerge.c index ebf1a7a8b7..e9a63b028b 100644 --- a/src/client/src/tscSecondaryMerge.c +++ b/src/client/src/tscSecondaryMerge.c @@ -311,7 +311,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd pRes->pLocalReducer = pReducer; pRes->numOfGroups = 0; - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0); + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); int16_t prec = pMeterMetaInfo->pMeterMeta->precision; int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime; @@ -582,7 +582,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr tColModel *pModel = NULL; *pFinalModel = NULL; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); (*pMemBuffer) = (tExtMemBuffer **)malloc(POINTER_BYTES * pMeterMetaInfo->pMetricMeta->numOfVnodes); @@ -871,7 +871,7 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo functions[i] = tscSqlExprGet(pQueryInfo, i)->functionId; } - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0); + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); int8_t precision = pMeterMetaInfo->pMeterMeta->precision; while (1) { @@ -1212,7 +1212,7 @@ static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer pQueryInfo->limit.offset = pLocalReducer->offset; - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0); + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); int16_t precision = pMeterMetaInfo->pMeterMeta->precision; // for group result interpolation, do not return if not data is generated diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 358c7d4692..ee66599f0f 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -129,6 +129,11 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { pSql->fp = tscProcessHeartBeatRsp; pSql->cmd.command = TSDB_SQL_HB; + + SQueryInfo *pQueryInfo = NULL; + tscGetQueryInfoDetailSafely(&pSql->cmd, 0, &pQueryInfo); + pQueryInfo->command = TSDB_SQL_HB; + if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) { tfree(pSql); return; @@ -223,7 +228,7 @@ void tscGetConnToVnode(SSqlObj *pSql, uint8_t *pCode) { pSql->thandle = NULL; SSqlCmd * pCmd = &pSql->cmd; - SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0); + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); if (UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) { // multiple vnode query SVnodeSidList *vnodeList = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, pMeterMetaInfo->vnodeIndex); @@ -423,7 +428,7 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) { return ahandle; } - SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0); + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); if (msg == NULL) { tscTrace("%p no response from ip:0x%x", pSql, pSql->ip); @@ -783,6 +788,9 @@ int tscProcessSql(SSqlObj *pSql) { } type = pQueryInfo->type; + + // for hearbeat, numOfTables == 0; + assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0); } tscTrace("%p SQL cmd:%d will be processed, name:%s, type:%d", pSql, pCmd->command, name, type); @@ -796,6 +804,7 @@ int tscProcessSql(SSqlObj *pSql) { if (pMeterMetaInfo == NULL) { // the pMeterMetaInfo cannot be NULL pSql->res.code = TSDB_CODE_OTHERS; + assert(0); return pSql->res.code; } @@ -862,7 +871,7 @@ int tscProcessSql(SSqlObj *pSql) { } } - if (tscIsTwoStageMergeMetricQuery(pCmd)) { + if (tscIsTwoStageMergeMetricQuery(pQueryInfo, 0)) { /* * (ref. line: 964) * Before this function returns from tscLaunchMetricSubQueries and continues, pSql may have been released at user @@ -932,7 +941,7 @@ int tscLaunchMetricSubQueries(SSqlObj *pSql) { const uint32_t nBufferSize = (1 << 16); // 64KB - SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); int32_t numOfSubQueries = pMeterMetaInfo->pMetricMeta->numOfVnodes; assert(numOfSubQueries > 0); @@ -1308,7 +1317,10 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { } void tscKillMetricQuery(SSqlObj *pSql) { - if (!tscIsTwoStageMergeMetricQuery(&pSql->cmd)) { + SSqlCmd* pCmd = &pSql->cmd; + + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + if (!tscIsTwoStageMergeMetricQuery(pQueryInfo, 0)) { return; } @@ -1351,14 +1363,17 @@ void tscKillMetricQuery(SSqlObj *pSql) { static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int retCode); static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) { - SSqlObj *pNew = createSubqueryObj(pSql, 0, tscRetrieveDataRes, trsupport, prevSqlObj); + const int32_t table_index = 0; + + SSqlObj *pNew = createSubqueryObj(pSql, table_index, tscRetrieveDataRes, trsupport, prevSqlObj); if (pNew != NULL) { // the sub query of two-stage super table query SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); pQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY; - assert(pQueryInfo->numOfTables == 1); + + assert(pQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1); // launch subquery for each vnode, so the subquery index equals to the vnodeIndex. - SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, table_index); pMeterMetaInfo->vnodeIndex = trsupport->subqueryIndex; pSql->pSubs[trsupport->subqueryIndex] = pNew; @@ -1471,7 +1486,7 @@ int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) { void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql, char *buf) { SShellSubmitMsg *pShellMsg; char * pMsg; - SMeterMetaInfo * pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0); + SMeterMetaInfo * pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, pSql->cmd.clauseIndex, 0); SMeterMeta *pMeterMeta = pMeterMetaInfo->pMeterMeta; @@ -1506,13 +1521,13 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pMeterMeta->vpeerDesc[pMeterMeta->index].ip), htons(pShellMsg->vnode)); - - return msgLen; + + return TSDB_CODE_SUCCESS; } void tscUpdateVnodeInQueryMsg(SSqlObj *pSql, char *buf) { SSqlCmd * pCmd = &pSql->cmd; - SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0); + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); char * pStart = buf + tsRpcHeadSize; SQueryMeterMsg *pQueryMsg = (SQueryMeterMsg *)pStart; @@ -1561,7 +1576,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) { } static char *doSerializeTableInfo(SSqlObj *pSql, int32_t numOfMeters, int32_t vnodeId, char *pMsg) { - SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0); + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, pSql->cmd.clauseIndex, 0); SMeterMeta * pMeterMeta = pMeterMetaInfo->pMeterMeta; SMetricMeta *pMetricMeta = pMeterMetaInfo->pMetricMeta; @@ -1611,8 +1626,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); + char * pStart = pCmd->payload + tsRpcHeadSize; SMeterMeta * pMeterMeta = pMeterMetaInfo->pMeterMeta; @@ -1772,7 +1787,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { hasArithmeticFunction = true; } - if (!tscValidateColumnId(pCmd, pExpr->colInfo.colId)) { + if (!tscValidateColumnId(pMeterMetaInfo, pExpr->colInfo.colId)) { /* column id is not valid according to the cached metermeta, the meter meta is expired */ tscError("%p table schema is not matched with parsed sql", pSql); return -1; @@ -1913,7 +1928,9 @@ int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pMsg = doBuildMsgHeader(pSql, &pStart); pCreateDbMsg = (SCreateDbMsg *)pMsg; - SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0); + assert(pCmd->numOfClause == 1); + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); + strncpy(pCreateDbMsg->db, pMeterMetaInfo->name, tListLen(pCreateDbMsg->db)); pMsg += sizeof(SCreateDbMsg); @@ -2063,7 +2080,7 @@ int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pMsg = doBuildMsgHeader(pSql, &pStart); pDropDbMsg = (SDropDbMsg *)pMsg; - SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0); + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); strncpy(pDropDbMsg->db, pMeterMetaInfo->name, tListLen(pDropDbMsg->db)); pDropDbMsg->ignoreNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0; @@ -2085,7 +2102,7 @@ int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pMsg = doBuildMsgHeader(pSql, &pStart); pDropTableMsg = (SDropTableMsg *)pMsg; - SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0); + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); strcpy(pDropTableMsg->meterId, pMeterMetaInfo->name); pDropTableMsg->igNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0; @@ -2103,7 +2120,7 @@ int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) { char * pMsg, *pStart; SSqlCmd * pCmd = &pSql->cmd; - SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0); + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); pMsg = doBuildMsgHeader(pSql, &pStart); pDrop = (SDropDnodeMsg *)pMsg; @@ -2128,7 +2145,7 @@ int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pMsg = doBuildMsgHeader(pSql, &pStart); pDropMsg = (SDropUserMsg *)pMsg; - SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0); + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); strcpy(pDropMsg->user, pMeterMetaInfo->name); pMsg += sizeof(SDropUserMsg); @@ -2148,7 +2165,7 @@ int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pMsg = doBuildMsgHeader(pSql, &pStart); pUseDbMsg = (SUseDbMsg *)pMsg; - SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0); + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); strcpy(pUseDbMsg->db, pMeterMetaInfo->name); pMsg += sizeof(SUseDbMsg); @@ -2178,7 +2195,7 @@ int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SMgmtHead *pMgmt = (SMgmtHead *)pMsg; - SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0); + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); size_t nameLen = strlen(pMeterMetaInfo->name); if (nameLen > 0) { @@ -2414,7 +2431,7 @@ int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd * pCmd = &pSql->cmd; STscObj * pObj = pSql->pTscObj; - SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0); + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); pStart = pCmd->payload + tsRpcHeadSize; pMsg = pStart; @@ -2532,7 +2549,7 @@ static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) { int tscProcessDescribeTableRsp(SSqlObj *pSql) { SSqlCmd * pCmd = &pSql->cmd; - SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0); + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); int32_t numOfRes = pMeterMetaInfo->pMeterMeta->numOfColumns + pMeterMetaInfo->pMeterMeta->numOfTags; @@ -2772,7 +2789,7 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pMsg += sizeof(int16_t); for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { - pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, i); + pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, i); uint64_t uid = pMeterMetaInfo->pMeterMeta->uid; offset = pMsg - (char *)pMetaMsg; @@ -3431,7 +3448,7 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) { * If the query result is exhausted, or current query is to free resource at server side, * the connection will be recycled. */ - if ((pRes->numOfRows == 0 && !(tscProjectionQueryOnSTable(pCmd, 0) && pRes->offset > 0)) || + if ((pRes->numOfRows == 0 && !(tscProjectionQueryOnSTable(pQueryInfo, 0) && pRes->offset > 0)) || ((pQueryInfo->type & TSDB_QUERY_TYPE_FREE_RESOURCE) == TSDB_QUERY_TYPE_FREE_RESOURCE)) { tscTrace("%p no result or free resource, recycle connection", pSql); taosAddConnIntoCache(tscConnCache, pSql->thandle, pSql->ip, pSql->vnode, pObj->user); diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 6aa6cfaf5a..2175d017ad 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "hash.h" #include "os.h" #include "tcache.h" #include "tlog.h" @@ -28,7 +29,6 @@ #include "tsocket.h" #include "ttimer.h" #include "tutil.h" -#include "hash.h" TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int), void *param, void **taos) { @@ -205,7 +205,7 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) { taosCleanUpHashTable(pSql->pTableHashList); pSql->pTableHashList = NULL; } - + tscTrace("%p SQL: %s pObj:%p", pSql, pSql->sqlstr, pObj); pRes->code = (uint8_t)tsParseSql(pSql, false); @@ -293,11 +293,11 @@ int taos_num_fields(TAOS_RES *res) { SSqlObj *pSql = (SSqlObj *)res; if (pSql == NULL || pSql->signature != pSql) return 0; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); if (pQueryInfo == NULL) { return 0; } - + SFieldInfo *pFieldsInfo = &pQueryInfo->fieldsInfo; return (pFieldsInfo->numOfOutputCols - pFieldsInfo->numOfHiddenCols); } @@ -319,8 +319,8 @@ int taos_affected_rows(TAOS *taos) { TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) { SSqlObj *pSql = (SSqlObj *)res; if (pSql == NULL || pSql->signature != pSql) return 0; - - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); return pQueryInfo->fieldsInfo.pFields; } @@ -370,7 +370,7 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) { pRes->numOfTotal += pRes->numOfRows; } - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i, pQueryInfo->order) + pRes->bytes[i] * (1 - pQueryInfo->order.order) * (pRes->numOfRows - 1); @@ -384,9 +384,9 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) { static void **doSetResultRowData(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - + int32_t num = 0; for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i, pQueryInfo->order) + pRes->bytes[i] * pRes->row; @@ -439,16 +439,17 @@ static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) { bool hasData = true; SSqlCmd *pCmd = &pSql->cmd; - if (tscProjectionQueryOnSTable(pCmd, 0)) { + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + if (tscProjectionQueryOnSTable(pQueryInfo, 0)) { bool allSubqueryExhausted = true; for (int32_t i = 0; i < pSql->numOfSubs; ++i) { SSqlRes *pRes1 = &pSql->pSubs[i]->res; SSqlCmd *pCmd1 = &pSql->pSubs[i]->cmd; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd1, 0); - SMeterMetaInfo *pMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); - + SQueryInfo * pQueryInfo1 = tscGetQueryInfoDetail(pCmd1, pCmd1->clauseIndex); + SMeterMetaInfo *pMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo1, 0); + assert(pQueryInfo->numOfTables == 1); /* @@ -465,12 +466,12 @@ static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) { hasData = !allSubqueryExhausted; } else { // otherwise, in case inner join, if any subquery exhausted, query completed. for (int32_t i = 0; i < pSql->numOfSubs; ++i) { - SSqlRes *pRes1 = &pSql->pSubs[i]->res; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->pSubs[i]->cmd, 0); - + SSqlRes * pRes1 = &pSql->pSubs[i]->res; + SQueryInfo *pQueryInfo1 = tscGetQueryInfoDetail(&pSql->pSubs[i]->cmd, 0); + if ((pRes1->row >= pRes1->numOfRows && tscHasReachLimitation(pSql->pSubs[i]) && - tscProjectionQueryOnTable(pQueryInfo)) || (pRes1->numOfRows == 0)) { - + tscProjectionQueryOnTable(pQueryInfo1)) || + (pRes1->numOfRows == 0)) { hasData = false; break; } @@ -501,9 +502,9 @@ static void **tscJoinResultsetFromBuf(SSqlObj *pSql) { free(pState); return NULL; } - - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); - + + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + if (pRes->tsrow == NULL) { pRes->tsrow = malloc(POINTER_BYTES * pQueryInfo->exprsInfo.numOfExprs); } @@ -578,9 +579,82 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) { pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; } - tscProcessSql(pSql); - if (pRes->numOfRows == 0) { - return NULL; + tscProcessSql(pSql); // retrieve data from virtual node + + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + + /* + * no result returned from the current virtual node anymore, try the next vnode if exists + * if case of: multi-vnode super table projection query + */ + if (pRes->numOfRows == 0 && tscProjectionQueryOnSTable(pQueryInfo, 0)) { + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); + int32_t totalVnode = pMeterMetaInfo->pMetricMeta->numOfVnodes; + + while (++pMeterMetaInfo->vnodeIndex < totalVnode) { + tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql, + pMeterMetaInfo->vnodeIndex - 1, pMeterMetaInfo->vnodeIndex, totalVnode, pRes->numOfTotal); + + // reach the maximum number of output rows, abort + if (tscHasReachLimitation(pSql)) { + return NULL; + } + + /* + * update the limit and offset value for the query on the next vnode, + * according to current retrieval results + * + * NOTE: + * if the pRes->offset is larger than 0, the start returned position has not reached yet. + * Therefore, the pRes->numOfRows, as well as pRes->numOfTotal, must be 0. + * The pRes->offset value will be updated by virtual node, during query execution. + */ + if (pQueryInfo->clauseLimit >= 0) { + pQueryInfo->limit.limit = pQueryInfo->clauseLimit - pRes->numOfTotal; + } + + pQueryInfo->limit.offset = pRes->offset; + + assert((pRes->offset >= 0 && pRes->numOfRows == 0) || (pRes->offset == 0 && pRes->numOfRows >= 0)); + tscTrace("%p new query to next vnode, vnode index:%d, limit:%" PRId64 ", offset:%" PRId64 ", glimit:%" PRId64, + pSql, pMeterMetaInfo->vnodeIndex, pQueryInfo->limit.limit, pQueryInfo->limit.offset, + pQueryInfo->clauseLimit); + + /* + * For project query with super table join, the numOfSub is equalled to the number of all subqueries. + * Therefore, we need to reset the value of numOfSubs to be 0. + * + * For super table join with projection query, if anyone of the subquery is exhausted, the query completed. + */ + pSql->numOfSubs = 0; + + pCmd->command = TSDB_SQL_SELECT; + assert(pSql->fp == NULL); + + int32_t ret = tscProcessSql(pSql); // todo check for failure + if (ret != TSDB_CODE_SUCCESS) { + pSql->res.code = ret; + return NULL; + } + + // retrieve data + assert(pCmd->command == TSDB_SQL_SELECT); + pCmd->command = TSDB_SQL_FETCH; + + if ((ret = tscProcessSql(pSql)) != TSDB_CODE_SUCCESS) { + pSql->res.code = ret; + return NULL; + } + + // if the result from current virtual node are empty, try next if exists. otherwise, return the results. + if (pRes->numOfRows > 0) { + break; + } + } + + if (pRes->numOfRows == 0) { + tscTrace("%p all vnodes exhausted, prj query completed. total res:%d", pSql, totalVnode, pRes->numOfTotal); + } } /* @@ -590,6 +664,10 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) { if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC) { pRes->numOfTotal += pRes->numOfRows; } + + if (pRes->numOfRows == 0) { + return NULL; + } } return getOneRowFromBuf(pSql); @@ -597,8 +675,8 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) { TAOS_ROW taos_fetch_row(TAOS_RES *res) { SSqlObj *pSql = (SSqlObj *)res; - SSqlCmd *pCmd = &pSql->cmd; - SSqlRes *pRes = &pSql->res; +// SSqlCmd *pCmd = &pSql->cmd; +// SSqlRes *pRes = &pSql->res; if (pSql == NULL || pSql->signature != pSql) { globalCode = TSDB_CODE_DISCONNECTED; @@ -607,63 +685,64 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { // projection query on metric, pipeline retrieve data from vnode list, instead of two-stage merge TAOS_ROW rows = taos_fetch_row_impl(res); - - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); - while (rows == NULL && tscProjectionQueryOnSTable(pCmd, 0)) { - SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); - - // reach the maximum number of output rows, abort - if (tscHasReachLimitation(pSql)) { - return NULL; - } - - /* - * update the limit and offset value according to current retrieval results - * Note: if pRes->offset > 0, pRes->numOfRows = 0, pRes->numOfTotal = 0; - */ - pQueryInfo->limit.limit = pCmd->globalLimit - pRes->numOfTotal; - pQueryInfo->limit.offset = pRes->offset; - - assert((pRes->offset >= 0 && pRes->numOfRows == 0) || (pRes->offset == 0 && pRes->numOfRows >= 0)); - /* - * For project query with super table join, the numOfSub is equalled to the number of all subqueries, so - * we need to reset the value of numOfSubs to be 0. - * - * For super table join with projection query, if anyone of the subquery is exhausted, the query completed. - */ - pSql->numOfSubs = 0; - - if ((++pMeterMetaInfo->vnodeIndex) < pMeterMetaInfo->pMetricMeta->numOfVnodes) { - pCmd->command = TSDB_SQL_SELECT; - assert(pSql->fp == NULL); - tscProcessSql(pSql); - rows = taos_fetch_row_impl(res); - } - - // check!!! - if (rows != NULL || pMeterMetaInfo->vnodeIndex >= pMeterMetaInfo->pMetricMeta->numOfVnodes) { - break; - } - } - - // current subclause is completed, try the next subclause - if (rows == NULL && pCmd->clauseIndex < pCmd->numOfClause - 1) { - pSql->cmd.command = TSDB_SQL_SELECT; - pCmd->clauseIndex++; - - assert(pSql->fp == NULL); - tscProcessSql(pSql); - - rows = taos_fetch_row_impl(res); - } + // SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + // while (rows == NULL && tscProjectionQueryOnSTable(pQueryInfo, 0)) { + // SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); + // + // // reach the maximum number of output rows, abort + // if (tscHasReachLimitation(pSql)) { + // return NULL; + // } + // + // /* + // * update the limit and offset value according to current retrieval results + // * Note: if pRes->offset > 0, pRes->numOfRows = 0, pRes->numOfTotal = 0; + // */ + // pQueryInfo->limit.limit = pCmd->globalLimit - pRes->numOfTotal; + // pQueryInfo->limit.offset = pRes->offset; + // + // assert((pRes->offset >= 0 && pRes->numOfRows == 0) || (pRes->offset == 0 && pRes->numOfRows >= 0)); + // + // /* + // * For project query with super table join, the numOfSub is equalled to the number of all subqueries, so + // * we need to reset the value of numOfSubs to be 0. + // * + // * For super table join with projection query, if anyone of the subquery is exhausted, the query completed. + // */ + // pSql->numOfSubs = 0; + // + // if ((++pMeterMetaInfo->vnodeIndex) < pMeterMetaInfo->pMetricMeta->numOfVnodes) { + // pCmd->command = TSDB_SQL_SELECT; + // assert(pSql->fp == NULL); + // tscProcessSql(pSql); + // rows = taos_fetch_row_impl(res); + // } + // + // // check!!! + // if (rows != NULL || pMeterMetaInfo->vnodeIndex >= pMeterMetaInfo->pMetricMeta->numOfVnodes) { + // break; + // } + // } + // + // // current subclause is completed, try the next subclause + // if (rows == NULL && pCmd->clauseIndex < pCmd->numOfClause - 1) { + // pSql->cmd.command = TSDB_SQL_SELECT; + // pCmd->clauseIndex++; + // + // assert(pSql->fp == NULL); + // + // tscTrace("%p start next subclause:%d, total subclause:%d", pSql, pCmd->clauseIndex, pCmd->numOfClause); + // tscProcessSql(pSql); + // + // rows = taos_fetch_row_impl(res); + // } return rows; } int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { SSqlObj *pSql = (SSqlObj *)res; - SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; int nRows = 0; @@ -677,9 +756,9 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { // projection query on metric, pipeline retrieve data from vnode list, // instead of two-stage mergevnodeProcessMsgFromShell free qhandle nRows = taos_fetch_block_impl(res, rows); - - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); - while (*rows == NULL && tscProjectionQueryOnSTable(pCmd, 0)) { + + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); + while (*rows == NULL && tscProjectionQueryOnSTable(pQueryInfo, 0)) { /* reach the maximum number of output rows, abort */ if (tscHasReachLimitation(pSql)) { return 0; @@ -746,7 +825,7 @@ void taos_free_result(TAOS_RES *res) { } // set freeFlag to 1 in retrieve message if there are un-retrieved results - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE; SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); @@ -863,12 +942,15 @@ void taos_stop_query(TAOS_RES *res) { if (res == NULL) return; SSqlObj *pSql = (SSqlObj *)res; + SSqlCmd *pCmd = &pSql->cmd; + if (pSql->signature != pSql) return; tscTrace("%p start to cancel query", res); pSql->res.code = TSDB_CODE_QUERY_CANCELLED; - if (tscIsTwoStageMergeMetricQuery(&pSql->cmd)) { + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + if (tscIsTwoStageMergeMetricQuery(pQueryInfo, 0)) { tscKillMetricQuery(pSql); return; } @@ -915,15 +997,13 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) float fv = 0; fv = GET_FLOAT_VAL(row[i]); len += sprintf(str + len, "%f ", fv); - } - break; + } break; - case TSDB_DATA_TYPE_DOUBLE:{ + case TSDB_DATA_TYPE_DOUBLE: { double dv = 0; dv = GET_DOUBLE_VAL(row[i]); len += sprintf(str + len, "%lf ", dv); - } - break; + } break; case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_NCHAR: { @@ -1011,9 +1091,9 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t int code = TSDB_CODE_INVALID_METER_ID; char *str = (char *)tblNameList; - SQueryInfo* pQueryInfo = NULL; - tscGetQueryInfoDetailSafely(pCmd, 0, &pQueryInfo); - + SQueryInfo *pQueryInfo = NULL; + tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo); + SMeterMetaInfo *pMeterMetaInfo = tscAddEmptyMeterMetaInfo(pQueryInfo); if ((code = tscAllocPayload(pCmd, tblListLen + 16)) != TSDB_CODE_SUCCESS) { diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index ea08901d6b..c456324de5 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -208,47 +208,38 @@ SMeterSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx) { return (SMeterSidExtInfo*)(pSidList->pSidExtInfoList[idx] + (char*)pSidList); } -bool tscIsTwoStageMergeMetricQuery(SSqlCmd* pCmd) { - assert(pCmd != NULL); - - int32_t subClauseIndex = 0; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, subClauseIndex); +bool tscIsTwoStageMergeMetricQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) { if (pQueryInfo == NULL) { return false; } - - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); + + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, tableIndex); if (pMeterMetaInfo == NULL || pMeterMetaInfo->pMetricMeta == NULL) { return false; } // for projection query, iterate all qualified vnodes sequentially - if (tscProjectionQueryOnSTable(pCmd, subClauseIndex)) { + if (tscProjectionQueryOnSTable(pQueryInfo, tableIndex)) { return false; } if (((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_SUBQUERY) != TSDB_QUERY_TYPE_STABLE_SUBQUERY) && - pCmd->command == TSDB_SQL_SELECT) { + pQueryInfo->command == TSDB_SQL_SELECT) { return UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo); } return false; } -bool tscProjectionQueryOnSTable(SSqlCmd* pCmd, int32_t subClauseIndex) { - assert(pCmd != NULL); - - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, subClauseIndex); - assert(pQueryInfo->numOfTables > 0); - - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); +bool tscProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex) { + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, tableIndex); /* * In following cases, return false for project query on metric * 1. failed to get metermeta from server; 2. not a metric; 3. limit 0; 4. show query, instead of a select query */ if (pMeterMetaInfo == NULL || !UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo) || - pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pQueryInfo->exprsInfo.numOfExprs == 0) { + pQueryInfo->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pQueryInfo->exprsInfo.numOfExprs == 0) { return false; } @@ -538,14 +529,16 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { assert(pDataBlock->pMeterMeta != NULL); pCmd->numOfTablesInSubmit = pDataBlock->numOfMeters; - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0); + + assert(pCmd->numOfClause == 1); + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); // set the correct metermeta object, the metermeta has been locked in pDataBlocks, so it must be in the cache if (pMeterMetaInfo->pMeterMeta != pDataBlock->pMeterMeta) { strcpy(pMeterMetaInfo->name, pDataBlock->meterId); taosRemoveDataFromCache(tscCacheHandle, (void**)&(pMeterMetaInfo->pMeterMeta), false); - - pMeterMetaInfo->pMeterMeta = taosTransferDataInCache(tscCacheHandle, (void**) &pDataBlock->pMeterMeta); + + pMeterMetaInfo->pMeterMeta = taosTransferDataInCache(tscCacheHandle, (void**)&pDataBlock->pMeterMeta); } else { assert(strncmp(pMeterMetaInfo->name, pDataBlock->meterId, tListLen(pDataBlock->meterId)) == 0); } @@ -654,9 +647,9 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi STableDataBlocks* pOneTableBlock = pTableDataBlockList->pData[i]; STableDataBlocks* dataBuf = NULL; - int32_t ret = tscGetDataBlockFromList(pVnodeDataBlockHashList, pVnodeDataBlockList, pOneTableBlock->vgid, - TSDB_PAYLOAD_SIZE, tsInsertHeadSize, 0, pOneTableBlock->meterId, - pOneTableBlock->pMeterMeta, &dataBuf); + int32_t ret = + tscGetDataBlockFromList(pVnodeDataBlockHashList, pVnodeDataBlockList, pOneTableBlock->vgid, TSDB_PAYLOAD_SIZE, + tsInsertHeadSize, 0, pOneTableBlock->meterId, pOneTableBlock->pMeterMeta, &dataBuf); if (ret != TSDB_CODE_SUCCESS) { tscError("%p failed to prepare the data block buffer for merging table data, code:%d", pSql, ret); taosCleanUpHashTable(pVnodeDataBlockHashList); @@ -859,7 +852,7 @@ void tscFieldInfoCalOffset(SQueryInfo* pQueryInfo) { } } -void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo) { +void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo) { SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo; if (pFieldInfo->numOfOutputCols == 0) { return; @@ -922,6 +915,26 @@ int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index) { return pQueryInfo->fieldsInfo.pOffset[index]; } +int32_t tscFieldInfoCompare(SFieldInfo* pFieldInfo1, SFieldInfo* pFieldInfo2) { + assert(pFieldInfo1 != NULL && pFieldInfo2 != NULL); + + if (pFieldInfo1->numOfOutputCols != pFieldInfo2->numOfOutputCols) { + return pFieldInfo1->numOfOutputCols - pFieldInfo2->numOfOutputCols; + } + + for (int32_t i = 0; i < pFieldInfo1->numOfOutputCols; ++i) { + TAOS_FIELD* pField1 = &pFieldInfo1->pFields[i]; + TAOS_FIELD* pField2 = &pFieldInfo2->pFields[i]; + + if (pField1->type != pField2->type || pField1->bytes != pField2->bytes || + strcasecmp(pField1->name, pField2->name) != 0) { + return 1; + } + } + + return 0; +} + int32_t tscGetResRowLength(SQueryInfo* pQueryInfo) { SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo; if (pFieldInfo->numOfOutputCols <= 0) { @@ -1394,8 +1407,7 @@ void tscIncStreamExecutionCount(void* pStream) { ps->num += 1; } -bool tscValidateColumnId(SSqlCmd* pCmd, int32_t colId) { - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0); +bool tscValidateColumnId(SMeterMetaInfo* pMeterMetaInfo, int32_t colId) { if (pMeterMetaInfo->pMeterMeta == NULL) { return false; } @@ -1592,7 +1604,7 @@ SMeterMetaInfo* tscGetMeterMetaInfoFromQueryInfo(SQueryInfo* pQueryInfo, int32_t SQueryInfo* tscGetQueryInfoDetail(SSqlCmd* pCmd, int32_t subClauseIndex) { assert(pCmd != NULL && subClauseIndex >= 0 && subClauseIndex < TSDB_MAX_UNION_CLAUSE); - + if (pCmd->pQueryInfo == NULL || subClauseIndex >= pCmd->numOfClause) { return NULL; } @@ -1600,19 +1612,19 @@ SQueryInfo* tscGetQueryInfoDetail(SSqlCmd* pCmd, int32_t subClauseIndex) { return pCmd->pQueryInfo[subClauseIndex]; } -int32_t tscGetQueryInfoDetailSafely(SSqlCmd *pCmd, int32_t subClauseIndex, SQueryInfo** pQueryInfo) { +int32_t tscGetQueryInfoDetailSafely(SSqlCmd* pCmd, int32_t subClauseIndex, SQueryInfo** pQueryInfo) { int32_t ret = TSDB_CODE_SUCCESS; - + *pQueryInfo = tscGetQueryInfoDetail(pCmd, subClauseIndex); - + while ((*pQueryInfo) == NULL) { if ((ret = tscAddSubqueryInfo(pCmd)) != TSDB_CODE_SUCCESS) { return ret; } - + (*pQueryInfo) = tscGetQueryInfoDetail(pCmd, subClauseIndex); } - + return TSDB_CODE_SUCCESS; } @@ -1654,20 +1666,20 @@ int32_t tscAddSubqueryInfo(SSqlCmd* pCmd) { static void doClearSubqueryInfo(SQueryInfo* pQueryInfo) { tscTagCondRelease(&pQueryInfo->tagCond); tscClearFieldInfo(&pQueryInfo->fieldsInfo); - + tfree(pQueryInfo->exprsInfo.pExprs); memset(&pQueryInfo->exprsInfo, 0, sizeof(pQueryInfo->exprsInfo)); - + tscColumnBaseInfoDestroy(&pQueryInfo->colList); memset(&pQueryInfo->colList, 0, sizeof(pQueryInfo->colList)); - + pQueryInfo->tsBuf = tsBufDestory(pQueryInfo->tsBuf); - + tfree(pQueryInfo->defaultVal); } void tscClearSubqueryInfo(SSqlCmd* pCmd) { - for(int32_t i = 0; i < pCmd->numOfClause; ++i) { + for (int32_t i = 0; i < pCmd->numOfClause; ++i) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, i); doClearSubqueryInfo(pQueryInfo); } @@ -1679,27 +1691,21 @@ void tscFreeSubqueryInfo(SSqlCmd* pCmd) { } for (int32_t i = 0; i < pCmd->numOfClause; ++i) { - char *addr = (char *) pCmd - offsetof(SSqlObj, cmd); - - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, i); - + char* addr = (char*)pCmd - offsetof(SSqlObj, cmd); + + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, i); + doClearSubqueryInfo(pQueryInfo); - tscRemoveAllMeterMetaInfo(pQueryInfo, (const char *) addr, false); + tscRemoveAllMeterMetaInfo(pQueryInfo, (const char*)addr, false); tfree(pQueryInfo); } - + pCmd->numOfClause = 0; tfree(pCmd->pQueryInfo); } SMeterMetaInfo* tscAddMeterMetaInfo(SQueryInfo* pQueryInfo, const char* name, SMeterMeta* pMeterMeta, SMetricMeta* pMetricMeta, int16_t numOfTags, int16_t* tags) { -// while (pCmd->numOfClause <= subClauseIndex) { -// tscAddSubqueryInfo(pCmd); -// } - -// SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); - void* pAlloc = realloc(pQueryInfo->pMeterInfo, (pQueryInfo->numOfTables + 1) * POINTER_BYTES); if (pAlloc == NULL) { return NULL; @@ -1777,7 +1783,7 @@ void tscResetForNextRetrieve(SSqlRes* pRes) { SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, SSqlObj* pPrevSql) { SSqlCmd* pCmd = &pSql->cmd; - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, tableIndex); + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, tableIndex); SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj)); if (pNew == NULL) { @@ -1804,6 +1810,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void pNew->cmd.pQueryInfo = NULL; pNew->cmd.numOfClause = 0; + pNew->cmd.clauseIndex = 0; if (tscAddSubqueryInfo(&pNew->cmd) != TSDB_CODE_SUCCESS) { tscFreeSqlObj(pNew); @@ -1811,25 +1818,25 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void } SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); memcpy(pNewQueryInfo, pQueryInfo, sizeof(SQueryInfo)); memset(&pNewQueryInfo->colList, 0, sizeof(pNewQueryInfo->colList)); memset(&pNewQueryInfo->fieldsInfo, 0, sizeof(SFieldInfo)); - + pNewQueryInfo->pMeterInfo = NULL; pNewQueryInfo->defaultVal = NULL; pNewQueryInfo->numOfTables = 0; pNewQueryInfo->tsBuf = NULL; tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond); - + if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) { pNewQueryInfo->defaultVal = malloc(pQueryInfo->fieldsInfo.numOfOutputCols * sizeof(int64_t)); memcpy(pNewQueryInfo->defaultVal, pQueryInfo->defaultVal, pQueryInfo->fieldsInfo.numOfOutputCols * sizeof(int64_t)); } - + if (tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE) != TSDB_CODE_SUCCESS) { tscError("%p new subquery failed, tableIndex:%d, vnodeIndex:%d", pSql, tableIndex, pMeterMetaInfo->vnodeIndex); tscFreeSqlObj(pNew); @@ -1840,7 +1847,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void // set the correct query type if (pPrevSql != NULL) { - SQueryInfo* pPrevQueryInfo = tscGetQueryInfoDetail(&pPrevSql->cmd, 0); + SQueryInfo* pPrevQueryInfo = tscGetQueryInfoDetail(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex); pNewQueryInfo->type = pPrevQueryInfo->type; } else { pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY; // it must be the subquery @@ -1863,14 +1870,14 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void tscFieldInfoCopy(&pQueryInfo->fieldsInfo, &pNewQueryInfo->fieldsInfo, indexList, numOfOutputCols); free(indexList); - tscFieldInfoUpdateOffset(pNewQueryInfo); + tscFieldInfoUpdateOffsetForInterResult(pNewQueryInfo); } pNew->fp = fp; pNew->param = param; char key[TSDB_MAX_TAGS_LEN + 1] = {0}; - tscGetMetricMetaCacheKey(pCmd, 0, key, uid); + tscGetMetricMetaCacheKey(pCmd, pCmd->clauseIndex, key, uid); #ifdef _DEBUG_VIEW printf("the metricmeta key is:%s\n", key); @@ -1886,12 +1893,12 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void pFinalInfo = tscAddMeterMetaInfo(pNewQueryInfo, name, pMeterMeta, pMetricMeta, pMeterMetaInfo->numOfTags, pMeterMetaInfo->tagColumnIndex); - } else { // transfer the ownership of pMeterMeta/pMetricMeta to the newly create sql object. - SMeterMetaInfo* pPrevInfo = tscGetMeterMetaInfo(&pPrevSql->cmd, 0, 0); - - SMeterMeta* pPrevMeterMeta = taosTransferDataInCache(tscCacheHandle, (void**) &pPrevInfo->pMeterMeta); - SMetricMeta* pPrevMetricMeta = taosTransferDataInCache(tscCacheHandle, (void**) &pPrevInfo->pMetricMeta); - + } else { // transfer the ownership of pMeterMeta/pMetricMeta to the newly create sql object. + SMeterMetaInfo* pPrevInfo = tscGetMeterMetaInfo(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0); + + SMeterMeta* pPrevMeterMeta = taosTransferDataInCache(tscCacheHandle, (void**)&pPrevInfo->pMeterMeta); + SMetricMeta* pPrevMetricMeta = taosTransferDataInCache(tscCacheHandle, (void**)&pPrevInfo->pMetricMeta); + pFinalInfo = tscAddMeterMetaInfo(pNewQueryInfo, name, pPrevMeterMeta, pPrevMetricMeta, pMeterMetaInfo->numOfTags, pMeterMetaInfo->tagColumnIndex); } -- GitLab