From b93fbf7c9f05ee6eeded784f3c1e0e01d35845fc Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Mon, 27 Apr 2020 00:42:31 +0800 Subject: [PATCH] [td-186] --- src/client/inc/tscSubquery.h | 3 + src/client/inc/tscUtil.h | 9 +- src/client/inc/tsclient.h | 2 +- src/client/src/tscAsync.c | 24 +- src/client/src/tscLocal.c | 18 +- src/client/src/tscSQLParser.c | 50 +- src/client/src/tscSecondaryMerge.c | 11 +- src/client/src/tscServer.c | 22 +- src/client/src/tscSql.c | 243 +--------- src/client/src/tscSubquery.c | 709 +++++++++++++++++++---------- src/client/src/tscUtil.c | 105 +++-- src/common/src/tname.c | 9 +- src/inc/taosdef.h | 2 +- src/query/inc/queryLog.h | 4 +- src/query/src/queryExecutor.c | 26 +- src/util/inc/tarray.h | 4 +- src/util/src/tarray.c | 4 +- src/vnode/src/vnodeRead.c | 2 +- 18 files changed, 631 insertions(+), 616 deletions(-) diff --git a/src/client/inc/tscSubquery.h b/src/client/inc/tscSubquery.h index f8a6fbf5b1..1d4194d707 100644 --- a/src/client/inc/tscSubquery.h +++ b/src/client/inc/tscSubquery.h @@ -38,6 +38,9 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql); int32_t tscHandleMultivnodeInsert(SSqlObj *pSql); +void tscBuildResFromSubqueries(SSqlObj *pSql); +void **doSetResultRowData(SSqlObj *pSql, bool finalResult); + #ifdef __cplusplus } #endif diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index d46c32d73d..2181b718e2 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -57,8 +57,8 @@ typedef struct SJoinSubquerySupporter { int64_t interval; // interval time SLimitVal limit; // limit info uint64_t uid; // query meter uid - SArray* colList; // previous query information - SArray* exprsInfo; + SArray* colList; // previous query information, no need to use this attribute, and the corresponding attribution + SArray* exprList; SFieldInfo fieldsInfo; STagCond tagCond; SSqlGroupbyExpr groupbyExpr; @@ -159,7 +159,7 @@ SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functi int32_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo); SSqlExpr* tscSqlExprGet(SQueryInfo* pQueryInfo, int32_t index); -SArray* tscSqlExprCopy(const SArray* src, uint64_t uid, bool deepcopy); +void tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy); void tscSqlExprInfoDestroy(SArray* pExprInfo); SColumn* tscColumnClone(const SColumn* src); @@ -203,7 +203,10 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST STableMetaInfo* tscAddEmptyMetaInfo(SQueryInfo *pQueryInfo); int32_t tscAddSubqueryInfo(SSqlCmd *pCmd); + void tscFreeQueryInfo(SSqlCmd* pCmd); +void tscInitQueryInfo(SQueryInfo* pQueryInfo); + void tscClearSubqueryInfo(SSqlCmd* pCmd); int tscGetSTableVgroupInfo(SSqlObj* pSql, int32_t clauseIndex); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index e8278ea145..fead73d2a4 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -206,7 +206,7 @@ typedef struct SQueryInfo { SArray * colList; // SArray SFieldInfo fieldsInfo; - SArray * exprsInfo; // SArray + SArray * exprList; // SArray SLimitVal limit; SLimitVal slimit; STagCond tagCond; diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index b954db0734..32fde5f7fa 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -14,17 +14,18 @@ */ #include "os.h" +#include "tutil.h" + +#include "tnote.h" #include "trpc.h" #include "tscLog.h" #include "tscProfile.h" +#include "tscSubquery.h" #include "tscSecondaryMerge.h" #include "tscUtil.h" -#include "tsclient.h" -#include "tsocket.h" -#include "tutil.h" -#include "tnote.h" #include "tsched.h" #include "tschemautil.h" +#include "tsclient.h" static void tscProcessFetchRow(SSchedMsg *pMsg); static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows); @@ -219,12 +220,17 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi pSql->param = param; tscResetForNextRetrieve(pRes); - - if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC && pCmd->command < TSDB_SQL_LOCAL) { - pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; + + // handle the sub queries of join query + if (pCmd->command == TSDB_SQL_METRIC_JOIN_RETRIEVE) { + tscFetchDatablockFromSubquery(pSql); + } else { + if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC && pCmd->command < TSDB_SQL_LOCAL) { + pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; + } + + tscProcessSql(pSql); } - - tscProcessSql(pSql); } void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW), void *param) { diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index 7ace940a1e..5cb4be6fa9 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -330,7 +330,7 @@ static int tscBuildMetricTagProjectionResult(SSqlObj *pSql) { } int32_t totalNumOfResults = pMetricMeta->numOfTables; - int32_t rowLen = tscGetResRowLength(pQueryInfo->exprsInfo); + int32_t rowLen = tscGetResRowLength(pQueryInfo->exprList); tscInitResObjForLocalQuery(pSql, totalNumOfResults, rowLen); @@ -370,7 +370,7 @@ static int tscBuildMetricTagSqlFunctionResult(SSqlObj *pSql) { #if 0 SSuperTableMeta *pMetricMeta = tscGetMetaInfo(pQueryInfo, 0)->pMetricMeta; int32_t totalNumOfResults = 1; // count function only produce one result - int32_t rowLen = tscGetResRowLength(pQueryInfo->exprsInfo); + int32_t rowLen = tscGetResRowLength(pQueryInfo->exprList); tscInitResObjForLocalQuery(pSql, totalNumOfResults, rowLen); @@ -408,7 +408,7 @@ static int tscProcessQueryTags(SSqlObj *pSql) { return pSql->res.code; } - SSqlExpr *pExpr = taosArrayGetP(pQueryInfo->exprsInfo, 0); + SSqlExpr *pExpr = taosArrayGetP(pQueryInfo->exprList, 0); if (pExpr->functionId == TSDB_FUNC_COUNT) { return tscBuildMetricTagSqlFunctionResult(pSql); } else { @@ -419,7 +419,7 @@ static int tscProcessQueryTags(SSqlObj *pSql) { static void tscProcessCurrentUser(SSqlObj *pSql) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); - SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprsInfo, 0); + SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0); tscSetLocalQueryResult(pSql, pSql->pTscObj->user, pExpr->aliasName, TSDB_USER_LEN); } @@ -434,7 +434,7 @@ static void tscProcessCurrentDB(SSqlObj *pSql) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); - SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprsInfo, 0); + SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0); tscSetLocalQueryResult(pSql, db, pExpr->aliasName, TSDB_DB_NAME_LEN); } @@ -442,14 +442,14 @@ static void tscProcessServerVer(SSqlObj *pSql) { const char* v = pSql->pTscObj->sversion; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); - SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprsInfo, 0); + SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0); tscSetLocalQueryResult(pSql, v, pExpr->aliasName, tListLen(pSql->pTscObj->sversion)); } static void tscProcessClientVer(SSqlObj *pSql) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); - SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprsInfo, 0); + SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0); tscSetLocalQueryResult(pSql, version, pExpr->aliasName, strlen(version)); } @@ -469,7 +469,7 @@ static void tscProcessServStatus(SSqlObj *pSql) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); - SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprsInfo, 0); + SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0); tscSetLocalQueryResult(pSql, "1", pExpr->aliasName, 2); } @@ -491,7 +491,7 @@ void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnNa TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 0); SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, 0); - pInfo->pSqlExpr = taosArrayGetP(pQueryInfo->exprsInfo, 0); + pInfo->pSqlExpr = taosArrayGetP(pQueryInfo->exprList, 0); strncpy(pRes->data, val, pField->bytes); } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 4682e4306a..ccc0f85d67 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -105,7 +105,7 @@ static int32_t getColumnIndexByName(const SSQLToken* pToken, SQueryInfo* pQueryI static int32_t getTableIndexByName(SSQLToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex); static int32_t optrToString(tSQLExpr* pExpr, char** exprString); -static int32_t getMeterIndex(SSQLToken* pTableToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex); +static int32_t getTableIndexImpl(SSQLToken* pTableToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex); static int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo); static int32_t doLocalQueryProcess(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql); static int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCMCreateDbMsg* pCreate); @@ -1179,7 +1179,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel tExprNode* pNode = NULL; SArray* colList = taosArrayInit(10, sizeof(SColIndex)); - int32_t ret = exprTreeFromSqlExpr(&pNode, pItem->pNode, pQueryInfo->exprsInfo, pQueryInfo, colList); + int32_t ret = exprTreeFromSqlExpr(&pNode, pItem->pNode, pQueryInfo->exprList, pQueryInfo, colList); if (ret != TSDB_CODE_SUCCESS) { tExprTreeDestroy(&pNode, NULL); return invalidSqlErrMsg(pQueryInfo->msg, "invalid arithmetic expression in select clause"); @@ -1218,7 +1218,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel tExprNode* pNode = NULL; // SArray* colList = taosArrayInit(10, sizeof(SColIndex)); - int32_t ret = exprTreeFromSqlExpr(&pNode, pItem->pNode, pQueryInfo->exprsInfo, pQueryInfo, NULL); + int32_t ret = exprTreeFromSqlExpr(&pNode, pItem->pNode, pQueryInfo->exprList, pQueryInfo, NULL); if (ret != TSDB_CODE_SUCCESS) { tExprTreeDestroy(&pNode, NULL); return invalidSqlErrMsg(pQueryInfo->msg, "invalid expression in select clause"); @@ -1548,7 +1548,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr int32_t size = tDataTypeDesc[TSDB_DATA_TYPE_BIGINT].nSize; pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, size, false); } else { - // count the number of meters created according to the metric + // count the number of meters created according to the super table if (getColumnIndexByName(pToken, pQueryInfo, &index) != TSDB_CODE_SUCCESS) { return invalidSqlErrMsg(pQueryInfo->msg, msg3); } @@ -2002,7 +2002,7 @@ int32_t doGetColumnIndexByName(SSQLToken* pToken, SQueryInfo* pQueryInfo, SColum } } -int32_t getMeterIndex(SSQLToken* pTableToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex) { +int32_t getTableIndexImpl(SSQLToken* pTableToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex) { if (pTableToken->n == 0) { // only one table and no table name prefix in column name if (pQueryInfo->numOfTables == 1) { pIndex->tableIndex = 0; @@ -2035,7 +2035,7 @@ int32_t getTableIndexByName(SSQLToken* pToken, SQueryInfo* pQueryInfo, SColumnIn SSQLToken tableToken = {0}; extractTableNameFromToken(pToken, &tableToken); - if (getMeterIndex(&tableToken, pQueryInfo, pIndex) != TSDB_CODE_SUCCESS) { + if (getTableIndexImpl(&tableToken, pQueryInfo, pIndex) != TSDB_CODE_SUCCESS) { return TSDB_CODE_INVALID_SQL; } @@ -3199,7 +3199,6 @@ static bool validateJoinExprNode(SQueryInfo* pQueryInfo, tSQLExpr* pExpr, SColum const char* msg3 = "join column must have same type"; const char* msg4 = "self join is not allowed"; const char* msg5 = "join table must be the same type(table to table, super table to super table)"; - const char* msg6 = "tags in join condition not support binary/nchar types"; tSQLExpr* pRight = pExpr->pRight; @@ -3234,9 +3233,6 @@ static bool validateJoinExprNode(SQueryInfo* pQueryInfo, tSQLExpr* pExpr, SColum } else if (pLeftIndex->tableIndex == rightIndex.tableIndex) { invalidSqlErrMsg(pQueryInfo->msg, msg4); return false; - } else if (leftType == TSDB_DATA_TYPE_BINARY || leftType == TSDB_DATA_TYPE_NCHAR) { - invalidSqlErrMsg(pQueryInfo->msg, msg6); - return false; } // table to table/ super table to super table are allowed @@ -4044,7 +4040,7 @@ int32_t parseFillClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySQL) { } } - size_t size = taosArrayGetSize(pQueryInfo->exprsInfo); + size_t size = taosArrayGetSize(pQueryInfo->exprList); if ((pFillToken->nExpr < size) || ((pFillToken->nExpr - 1 < size) && (tscIsPointInterpQuery(pQueryInfo)))) { @@ -4079,7 +4075,7 @@ static void setDefaultOrderInfo(SQueryInfo* pQueryInfo) { pQueryInfo->order.orderColId = -1; } - /* for metric query, set default ascending order for group output */ + /* for super table query, set default ascending order for group output */ if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { pQueryInfo->groupbyExpr.orderType = TSDB_ORDER_ASC; } @@ -4127,7 +4123,7 @@ int32_t parseOrderbyClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql, SSchema SSQLToken columnName = {pVar->nLen, pVar->nType, pVar->pz}; SColumnIndex index = {0}; - if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { // metric query + if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { // super table query if (getColumnIndexByName(&columnName, pQueryInfo, &index) != TSDB_CODE_SUCCESS) { return invalidSqlErrMsg(pQueryInfo->msg, msg1); } @@ -4460,7 +4456,7 @@ int32_t validateSqlFunctionInStreamSql(SQueryInfo* pQueryInfo) { return invalidSqlErrMsg(pQueryInfo->msg, msg0); } - size_t size = taosArrayGetSize(pQueryInfo->exprsInfo); + size_t size = taosArrayGetSize(pQueryInfo->exprList); for (int32_t i = 0; i < size; ++i) { int32_t functId = tscSqlExprGet(pQueryInfo, i)->functionId; if (!IS_STREAM_QUERY_VALID(aAggs[functId].nStatus)) { @@ -4476,7 +4472,7 @@ int32_t validateFunctionsInIntervalOrGroupbyQuery(SQueryInfo* pQueryInfo) { const char* msg1 = "column projection is not compatible with interval"; // multi-output set/ todo refactor - size_t size = taosArrayGetSize(pQueryInfo->exprsInfo); + size_t size = taosArrayGetSize(pQueryInfo->exprList); for (int32_t k = 0; k < size; ++k) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, k); @@ -4736,7 +4732,7 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL* return invalidSqlErrMsg(pQueryInfo->msg, msg1); } - size_t size = taosArrayGetSize(pQueryInfo->exprsInfo); + size_t size = taosArrayGetSize(pQueryInfo->exprList); // filter the query functions operating on "tbname" column that are not supported by normal columns. for (int32_t i = 0; i < size; ++i) { @@ -4849,7 +4845,7 @@ void tscAddTimestampColumn(SQueryInfo* pQueryInfo, int16_t functionId, int16_t t if (pExpr == NULL || pExpr->colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX || pExpr->functionId != functionId) { SColumnIndex index = {tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX}; - pExpr = tscSqlExprAppend(pQueryInfo, functionId, &index, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, TSDB_KEYSIZE, false); + pExpr = tscSqlExprInsert(pQueryInfo, 0, functionId, &index, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, TSDB_KEYSIZE, false); pExpr->colInfo.flag = TSDB_COL_NORMAL; // NOTE: tag column does not add to source column list @@ -4864,7 +4860,7 @@ void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClau if (pParentQueryInfo->groupbyExpr.numOfGroupCols > 0) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, subClauseIndex); - size_t size = taosArrayGetSize(pQueryInfo->exprsInfo); + size_t size = taosArrayGetSize(pQueryInfo->exprList); SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, size - 1); @@ -4933,7 +4929,7 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex) { static void doUpdateSqlFunctionForTagPrj(SQueryInfo* pQueryInfo) { int32_t tagLength = 0; - size_t size = taosArrayGetSize(pQueryInfo->exprsInfo); + size_t size = taosArrayGetSize(pQueryInfo->exprList); for (int32_t i = 0; i < size; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); @@ -4960,7 +4956,7 @@ static void doUpdateSqlFunctionForTagPrj(SQueryInfo* pQueryInfo) { } static void doUpdateSqlFunctionForColPrj(SQueryInfo* pQueryInfo) { - size_t size = taosArrayGetSize(pQueryInfo->exprsInfo); + size_t size = taosArrayGetSize(pQueryInfo->exprList); for (int32_t i = 0; i < size; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); @@ -4998,7 +4994,7 @@ static bool onlyTagPrjFunction(SQueryInfo* pQueryInfo) { bool hasTagPrj = false; bool hasColumnPrj = false; - size_t size = taosArrayGetSize(pQueryInfo->exprsInfo); + size_t size = taosArrayGetSize(pQueryInfo->exprList); for (int32_t i = 0; i < size; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); if (pExpr->functionId == TSDB_FUNC_PRJ) { @@ -5033,7 +5029,7 @@ static bool allTagPrjInGroupby(SQueryInfo* pQueryInfo) { } static void updateTagPrjFunction(SQueryInfo* pQueryInfo) { - size_t size = taosArrayGetSize(pQueryInfo->exprsInfo); + size_t size = taosArrayGetSize(pQueryInfo->exprList); for (int32_t i = 0; i < size; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); @@ -5057,9 +5053,9 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo) { int16_t numOfSelectivity = 0; int16_t numOfAggregation = 0; - size_t numOfExprs = taosArrayGetSize(pQueryInfo->exprsInfo); + size_t numOfExprs = taosArrayGetSize(pQueryInfo->exprList); for (int32_t i = 0; i < numOfExprs; ++i) { - SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprsInfo, i); + SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, i); if (pExpr->functionId == TSDB_FUNC_TAGPRJ || (pExpr->functionId == TSDB_FUNC_PRJ && pExpr->colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX)) { tagColExists = true; @@ -5068,7 +5064,7 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo) { } for (int32_t i = 0; i < numOfExprs; ++i) { - SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprsInfo, i); + SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, i); int16_t functionId = pExpr->functionId; if (functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TS || @@ -5275,7 +5271,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { return TSDB_CODE_INVALID_SQL; } - // projection query on metric does not compatible with "group by" syntax + // projection query on super table does not compatible with "group by" syntax if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); } @@ -5491,7 +5487,7 @@ int32_t doCheckForCreateTable(SSqlObj* pSql, int32_t subClauseIndex, SSqlInfo* p pCmd->numOfCols = (int16_t)pFieldList->nField; - if (pTagList != NULL) { // create metric[optional] + if (pTagList != NULL) { // create super table[optional] for (int32_t i = 0; i < pTagList->nField; ++i) { tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &pTagList->p[i]); } diff --git a/src/client/src/tscSecondaryMerge.c b/src/client/src/tscSecondaryMerge.c index 84f14abf4c..5e91742678 100644 --- a/src/client/src/tscSecondaryMerge.c +++ b/src/client/src/tscSecondaryMerge.c @@ -282,7 +282,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd pReducer->nResultBufSize = pMemBuffer[0]->pageSize * 16; pReducer->pResultBuf = (tFilePage *)calloc(1, pReducer->nResultBufSize + sizeof(tFilePage)); - int32_t finalRowLength = tscGetResRowLength(pQueryInfo->exprsInfo); + int32_t finalRowLength = tscGetResRowLength(pQueryInfo->exprList); pReducer->resColModel = finalmodel; pReducer->resColModel->capacity = pReducer->nResultBufSize / finalRowLength; assert(finalRowLength <= pReducer->rowSize); @@ -804,7 +804,7 @@ void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo // todo merge with following function // static void reversedCopyResultToDstBuf(SQueryInfo* pQueryInfo, SSqlRes *pRes, tFilePage *pFinalDataPage) { // -// for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) { +// for (int32_t i = 0; i < pQueryInfo->exprList.numOfExprs; ++i) { // TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); // // int32_t offset = tscFieldInfoGetOffset(pQueryInfo, i); @@ -901,7 +901,7 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo savePrevRecordAndSetupInterpoInfo(pLocalReducer, pQueryInfo, &pLocalReducer->interpolationInfo); } - int32_t rowSize = tscGetResRowLength(pQueryInfo->exprsInfo); + int32_t rowSize = tscGetResRowLength(pQueryInfo->exprList); memcpy(pRes->data, pFinalDataPage->data, pRes->numOfRows * rowSize); pFinalDataPage->numOfElems = 0; @@ -1423,8 +1423,7 @@ int32_t tscDoLocalreduce(SSqlObj *pSql) { tscResetForNextRetrieve(pRes); if (pSql->signature != pSql || pRes == NULL || pRes->pLocalReducer == NULL) { // all data has been processed - tscTrace("%s call the drop local reducer", __FUNCTION__); - + tscTrace("%p %s call the drop local reducer", pSql, __FUNCTION__); tscDestroyLocalReducer(pSql); return 0; } @@ -1435,7 +1434,7 @@ int32_t tscDoLocalreduce(SSqlObj *pSql) { // set the data merge in progress int32_t prevStatus = atomic_val_compare_exchange_32(&pLocalReducer->status, TSC_LOCALREDUCE_READY, TSC_LOCALREDUCE_IN_PROGRESS); - if (prevStatus != TSC_LOCALREDUCE_READY || pLocalReducer == NULL) { + if (prevStatus != TSC_LOCALREDUCE_READY) { assert(prevStatus == TSC_LOCALREDUCE_TOBE_FREED); // it is in tscDestroyLocalReducer function already return TSDB_CODE_SUCCESS; } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index ce941086c0..b1558862e1 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -140,7 +140,7 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId)); } } else { - tscTrace("heart beat failed, code:%d", code); + tscTrace("heart beat failed, code:%s", tstrerror(code)); } taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer); @@ -326,11 +326,6 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { pRes->pRsp = NULL; } - // ignore the error information returned from mnode when set ignore flag in sql - if (pRes->code == TSDB_CODE_DB_ALREADY_EXIST && pCmd->existsCheck && pRes->rspType == TSDB_MSG_TYPE_CM_CREATE_DB_RSP) { - pRes->code = TSDB_CODE_SUCCESS; - } - /* * There is not response callback function for submit response. * The actual inserted number of points is the first number. @@ -427,8 +422,9 @@ int tscProcessSql(SSqlObj *pSql) { type = pQueryInfo->type; - // for heartbeat, numOfTables == 0; - assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0); + // while numOfTables equals to 0, it must be Heartbeat + assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || + pQueryInfo->numOfTables > 0); } tscTrace("%p SQL cmd:%d will be processed, name:%s, type:%d", pSql, pCmd->command, name, type); @@ -1474,12 +1470,10 @@ int tscProcessRetrieveMetricRsp(SSqlObj *pSql) { pRes->row = 0; uint8_t code = pRes->code; - if (pSql->fp) { // async retrieve metric data - if (pRes->code == TSDB_CODE_SUCCESS) { - (*pSql->fp)(pSql->param, pSql, pRes->numOfRows); - } else { - tscQueueAsyncRes(pSql); - } + if (pRes->code == TSDB_CODE_SUCCESS) { + (*pSql->fp)(pSql->param, pSql, pRes->numOfRows); + } else { + tscQueueAsyncRes(pSql); } return code; diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index bcc231c3f2..12e90713ce 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -435,30 +435,7 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) { return (pQueryInfo->order.order == TSDB_ORDER_DESC) ? pRes->numOfRows : -pRes->numOfRows; } -static void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pField) { - SSqlRes *pRes = &pSql->res; - - if (isNull(pRes->tsrow[columnIndex], pField->type)) { - pRes->tsrow[columnIndex] = NULL; - } else if (pField->type == TSDB_DATA_TYPE_NCHAR) { - // convert unicode to native code in a temporary buffer extra one byte for terminated symbol - if (pRes->buffer[columnIndex] == NULL) { - pRes->buffer[columnIndex] = malloc(pField->bytes + TSDB_NCHAR_SIZE); - } - - /* string terminated char for binary data*/ - memset(pRes->buffer[columnIndex], 0, pField->bytes + TSDB_NCHAR_SIZE); - - if (taosUcs4ToMbs(pRes->tsrow[columnIndex], pField->bytes, pRes->buffer[columnIndex])) { - pRes->tsrow[columnIndex] = pRes->buffer[columnIndex]; - } else { - tscError("%p charset:%s to %s. val:%ls convert failed.", pSql, DEFAULT_UNICODE_ENCODEC, tsCharset, pRes->tsrow); - pRes->tsrow[columnIndex] = NULL; - } - } -} - -static char *getArithemicInputSrc(void *param, const char *name, int32_t colId) { +static UNUSED_FUNC char *getArithemicInputSrc(void *param, const char *name, int32_t colId) { // SArithmeticSupport *pSupport = (SArithmeticSupport *)param; // SArithExprInfo * pExpr = pSupport->pArithExpr; @@ -475,210 +452,6 @@ static char *getArithemicInputSrc(void *param, const char *name, int32_t colId) return 0; } -static void **doSetResultRowData(SSqlObj *pSql) { - SSqlCmd *pCmd = &pSql->cmd; - SSqlRes *pRes = &pSql->res; - - assert(pRes->row >= 0 && pRes->row <= pRes->numOfRows); - - if (pRes->row >= pRes->numOfRows) { // all the results has returned to invoker - tfree(pRes->tsrow); - return pRes->tsrow; - } - - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - - //todo refactor move away - size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo); - for(int32_t k = 0; k < numOfExprs; ++k) { - SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, k); - - if (k > 0) { - SSqlExpr* pPrev = tscSqlExprGet(pQueryInfo, k - 1); - pExpr->offset = pPrev->offset + pPrev->resBytes; - } - } - - int32_t num = 0; - for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) { - SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, i); - if (pInfo->pSqlExpr != NULL) { - pRes->tsrow[i] = tscGetResultColumnChr(pRes, pQueryInfo, i) + pInfo->pSqlExpr->resBytes * pRes->row; - } else { - assert(0); - } - - // primary key column cannot be null in interval query, no need to check - if (i == 0 && pQueryInfo->intervalTime > 0) { - continue; - } - - TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); - transferNcharData(pSql, i, pField); - - // calculate the result from several other columns - if (pInfo->pArithExprInfo != NULL) { - SArithmeticSupport *sas = (SArithmeticSupport *)calloc(1, sizeof(SArithmeticSupport)); - sas->offset = 0; - sas->pArithExpr = pInfo->pArithExprInfo; - -// sas->numOfCols = sas->pArithExpr->numOfCols; - - if (pRes->buffer[i] == NULL) { - pRes->buffer[i] = malloc(tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i)->bytes); - } - - for(int32_t k = 0; k < sas->numOfCols; ++k) { -// int32_t columnIndex = sas->pArithExpr->colList[k].colIndex; -// SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, columnIndex); -// -// sas->elemSize[k] = pExpr->resBytes; -// sas->data[k] = (pRes->data + pRes->numOfRows* pExpr->offset) + pRes->row*pExpr->resBytes; - } - - tExprTreeCalcTraverse(sas->pArithExpr->pExpr, 1, pRes->buffer[i], sas, TSDB_ORDER_ASC, getArithemicInputSrc); - pRes->tsrow[i] = pRes->buffer[i]; - - free(sas); //todo optimization - } - } - - assert(num <= pQueryInfo->fieldsInfo.numOfOutput); - - pRes->row++; // index increase one-step - return pRes->tsrow; -} - -static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) { - bool hasData = true; - SSqlCmd *pCmd = &pSql->cmd; - - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { - bool allSubqueryExhausted = true; - - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { - if (pSql->pSubs[i] == NULL) { - continue; - } - -// SSqlRes *pRes1 = &pSql->pSubs[i]->res; - SSqlCmd *pCmd1 = &pSql->pSubs[i]->cmd; - - SQueryInfo * pQueryInfo1 = tscGetQueryInfoDetail(pCmd1, pCmd1->clauseIndex); -// STableMetaInfo *pMetaInfo = tscGetMetaInfo(pQueryInfo1, 0); - - assert(pQueryInfo1->numOfTables == 1); - - /* - * if the global limitation is not reached, and current result has not exhausted, or next more vnodes are - * available, goes on - */ -// if (pMetaInfo->vnodeIndex < pMetaInfo->pMetricMeta->numOfVnodes && pRes1->row < pRes1->numOfRows && -// (!tscHasReachLimitation(pQueryInfo1, pRes1))) { -// allSubqueryExhausted = false; -// break; -// } - } - - hasData = !allSubqueryExhausted; - } else { // otherwise, in case inner join, if any subquery exhausted, query completed. - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { - if (pSql->pSubs[i] == 0) { - continue; - } - - SSqlRes * pRes1 = &pSql->pSubs[i]->res; - SQueryInfo *pQueryInfo1 = tscGetQueryInfoDetail(&pSql->pSubs[i]->cmd, 0); - - if ((pRes1->row >= pRes1->numOfRows && tscHasReachLimitation(pQueryInfo1, pRes1) && - tscProjectionQueryOnTable(pQueryInfo1)) || - (pRes1->numOfRows == 0)) { - hasData = false; - break; - } - } - } - - return hasData; -} - -static UNUSED_FUNC void **tscBuildResFromSubqueries(SSqlObj *pSql) { - SSqlRes *pRes = &pSql->res; - - while (1) { - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); - size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo); - - if (pRes->tsrow == NULL) { - pRes->tsrow = calloc(numOfExprs, POINTER_BYTES); - } - - bool success = false; - - int32_t numOfTableHasRes = 0; - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { - if (pSql->pSubs[i] != 0) { - numOfTableHasRes++; - } - } - - if (numOfTableHasRes >= 2) { // do merge result - success = (doSetResultRowData(pSql->pSubs[0]) != NULL) && (doSetResultRowData(pSql->pSubs[1]) != NULL); - } else { // only one subquery - SSqlObj *pSub = pSql->pSubs[0]; - if (pSub == NULL) { - pSub = pSql->pSubs[1]; - } - - success = (doSetResultRowData(pSub) != NULL); - } - - if (success) { // current row of final output has been built, return to app - for (int32_t i = 0; i < numOfExprs; ++i) { - int32_t tableIndex = pRes->pColumnIndex[i].tableIndex; - int32_t columnIndex = pRes->pColumnIndex[i].columnIndex; - - SSqlRes *pRes1 = &pSql->pSubs[tableIndex]->res; - pRes->tsrow[i] = pRes1->tsrow[columnIndex]; - } - - pRes->numOfTotalInCurrentClause++; - - break; - } else { // continue retrieve data from vnode - if (!tscHashRemainDataInSubqueryResultSet(pSql)) { - tscTrace("%p at least one subquery exhausted, free all other %d subqueries", pSql, pSql->numOfSubs - 1); - SSubqueryState *pState = NULL; - - // free all sub sqlobj - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { - SSqlObj *pChildObj = pSql->pSubs[i]; - if (pChildObj == NULL) { - continue; - } - - SJoinSubquerySupporter *pSupporter = (SJoinSubquerySupporter *)pChildObj->param; - pState = pSupporter->pState; - - tscDestroyJoinSupporter(pChildObj->param); - taos_free_result(pChildObj); - } - - free(pState); - return NULL; - } - - tscFetchDatablockFromSubquery(pSql); - if (pRes->code != TSDB_CODE_SUCCESS) { - return NULL; - } - } - } - - return pRes->tsrow; -} - static void waitForRetrieveRsp(void *param, TAOS_RES *tres, int numOfRows) { SSqlObj* pSql = (SSqlObj*) tres; @@ -705,15 +478,19 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { } // current data are exhausted, fetch more data - if (pRes->data == NULL || (pRes->data != NULL && pRes->row >= pRes->numOfRows && pRes->completed != true && - (pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_METRIC || - pCmd->command == TSDB_SQL_FETCH || pCmd->command == TSDB_SQL_DESCRIBE_TABLE))) { + if (pRes->row >= pRes->numOfRows && pRes->completed != true && + (pCmd->command == TSDB_SQL_RETRIEVE || + pCmd->command == TSDB_SQL_RETRIEVE_METRIC || + pCmd->command == TSDB_SQL_METRIC_JOIN_RETRIEVE || + pCmd->command == TSDB_SQL_FETCH || + pCmd->command == TSDB_SQL_SHOW || + pCmd->command == TSDB_SQL_SELECT || + pCmd->command == TSDB_SQL_DESCRIBE_TABLE)) { taos_fetch_rows_a(res, waitForRetrieveRsp, pSql->pTscObj); - sem_wait(&pSql->rspSem); } - return doSetResultRowData(pSql); + return doSetResultRowData(pSql, true); } int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 856c28eaff..729c0df3c1 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -24,7 +24,8 @@ typedef struct SInsertSupporter { SSqlObj* pSql; } SInsertSupporter; -static void freeSubqueryObj(SSqlObj* pSql); +static void freeJoinSubqueryObj(SSqlObj* pSql); +static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql); static bool doCompare(int32_t order, int64_t left, int64_t right) { if (order == TSDB_ORDER_ASC) { @@ -172,7 +173,6 @@ SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pS STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, index); pSupporter->uid = pTableMetaInfo->pTableMeta->uid; - assert (pSupporter->uid != 0); getTmpfilePath("join-", pSupporter->path); @@ -190,14 +190,20 @@ void tscDestroyJoinSupporter(SJoinSubquerySupporter* pSupporter) { return; } - tscSqlExprInfoDestroy(pSupporter->exprsInfo); - tscColumnListDestroy(pSupporter->colList); + if (pSupporter->exprList != NULL) { + tscSqlExprInfoDestroy(pSupporter->exprList); + } + + if (pSupporter->colList != NULL) { + tscColumnListDestroy(pSupporter->colList); + } tscFieldInfoClear(&pSupporter->fieldsInfo); if (pSupporter->f != NULL) { fclose(pSupporter->f); unlink(pSupporter->path); + pSupporter->f = NULL; } tscTagCondRelease(&pSupporter->tagCond); @@ -238,7 +244,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { for (int32_t i = 0; i < pSql->numOfSubs; ++i) { pSupporter = pSql->pSubs[i]->param; - if (taosArrayGetSize(pSupporter->exprsInfo) > 0) { + if (taosArrayGetSize(pSupporter->exprList) > 0) { ++numOfSub; } } @@ -249,9 +255,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { tscTrace("%p start to launch secondary subqueries, total:%d, only:%d needs to query, others are not retrieve in " "select clause", pSql, pSql->numOfSubs, numOfSub); - /* - * the subqueries that do not actually launch the secondary query to virtual node is set as completed. - */ + //the subqueries that do not actually launch the secondary query to virtual node is set as completed. pState = pSupporter->pState; pState->numOfTotal = pSql->numOfSubs; pState->numOfCompleted = (pSql->numOfSubs - numOfSub); @@ -264,7 +268,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { pSupporter = pPrevSub->param; - if (taosArrayGetSize(pSupporter->exprsInfo) == 0) { + if (taosArrayGetSize(pSupporter->exprList) == 0) { tscTrace("%p subIndex: %d, not need to launch query, ignore it", pSql, i); tscDestroyJoinSupporter(pSupporter); @@ -300,23 +304,25 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { pQueryInfo->intervalTime = pSupporter->interval; pQueryInfo->groupbyExpr = pSupporter->groupbyExpr; - - tscColumnListCopy(pQueryInfo->colList, pSupporter->colList, 0); + tscTagCondCopy(&pQueryInfo->tagCond, &pSupporter->tagCond); - pQueryInfo->exprsInfo = tscSqlExprCopy(pSupporter->exprsInfo, pSupporter->uid, false); - tscFieldInfoCopy(&pQueryInfo->fieldsInfo, &pSupporter->fieldsInfo); + pQueryInfo->colList = pSupporter->colList; + pQueryInfo->exprList = pSupporter->exprList; + pQueryInfo->fieldsInfo = pSupporter->fieldsInfo; - pSupporter->fieldsInfo.numOfOutput = 0; + pSupporter->exprList = NULL; + pSupporter->colList = NULL; + memset(&pSupporter->fieldsInfo, 0, sizeof(SFieldInfo)); /* * if the first column of the secondary query is not ts function, add this function. * Because this column is required to filter with timestamp after intersecting. */ - SSqlExpr* pExpr = taosArrayGet(pSupporter->exprsInfo, 0); - if (pExpr->functionId != TSDB_FUNC_TS) { - tscAddTimestampColumn(pQueryInfo, TSDB_FUNC_TS, 0); - } +// SSqlExpr* pExpr = taosArrayGet(pQueryInfo->exprList, 0); +// if (pExpr->functionId != TSDB_FUNC_TS) { +// tscAddTimestampColumn(pQueryInfo, TSDB_FUNC_TS, 0); +// } SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); assert(pNew->numOfSubs == 0 && pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1); @@ -334,7 +340,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { // fetch the join tag column if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { - SSqlExpr *pExpr = tscSqlExprGet(pNewQueryInfo, 0); + SSqlExpr* pExpr = tscSqlExprGet(pNewQueryInfo, 0); assert(pQueryInfo->tagCond.joinInfo.hasJoin); int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->uid); @@ -347,7 +353,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList); tscTrace("%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, - taosArrayGetSize(pNewQueryInfo->exprsInfo), numOfCols, + taosArrayGetSize(pNewQueryInfo->exprList), numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, pNewQueryInfo->pTableMetaInfo[0]->name); } @@ -356,7 +362,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY; tscError("%p failed to prepare subqueries objs for secondary phase query, numOfSub:%d, code:%d", pSql, pSql->numOfSubs, pSql->res.code); - freeSubqueryObj(pSql); + freeJoinSubqueryObj(pSql); return pSql->res.code; } @@ -373,19 +379,22 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { return TSDB_CODE_SUCCESS; } -void freeSubqueryObj(SSqlObj* pSql) { +void freeJoinSubqueryObj(SSqlObj* pSql) { SSubqueryState* pState = NULL; for (int32_t i = 0; i < pSql->numOfSubs; ++i) { - if (pSql->pSubs[i] != NULL) { - SJoinSubquerySupporter* p = pSql->pSubs[i]->param; - pState = p->pState; + SSqlObj* pSub = pSql->pSubs[i]; + if (pSub == NULL) { + continue; + } + + SJoinSubquerySupporter* p = pSub->param; + pState = p->pState; - tscDestroyJoinSupporter(p); + tscDestroyJoinSupporter(p); - if (pSql->pSubs[i]->res.code == TSDB_CODE_SUCCESS) { - taos_free_result(pSql->pSubs[i]); - } + if (pSub->res.code == TSDB_CODE_SUCCESS) { + taos_free_result(pSub); } } @@ -393,15 +402,6 @@ void freeSubqueryObj(SSqlObj* pSql) { pSql->numOfSubs = 0; } -static void doQuitSubquery(SSqlObj* pParentSql) { - freeSubqueryObj(pParentSql); - -// tsem_wait(&pParentSql->emptyRspSem); -// tsem_wait(&pParentSql->emptyRspSem); - -// tsem_post(&pParentSql->rspSem); -} - static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSubquerySupporter* pSupporter) { int32_t numOfTotal = pSupporter->pState->numOfTotal; int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1); @@ -410,7 +410,7 @@ static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSubquerySupporter* pSupporter pSqlObj->res.code = abs(pSupporter->pState->code); tscError("%p all subquery return and query failed, global code:%d", pSqlObj, pSqlObj->res.code); - doQuitSubquery(pSqlObj); + freeJoinSubqueryObj(pSqlObj); } } @@ -422,17 +422,74 @@ static void updateQueryTimeRange(SQueryInfo* pQueryInfo, int64_t st, int64_t et) pQueryInfo->window.ekey = et; } +static void tSIntersectionAndLaunchSecQuery(SJoinSubquerySupporter* pSupporter, SSqlObj* pSql) { + SSqlObj* pParentSql = pSupporter->pObj; + SSqlCmd* pCmd = &pSql->cmd; + SSqlRes* pRes = &pSql->res; + + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo* pParentQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex); + + if (tscNonOrderedProjectionQueryOnSTable(pParentQueryInfo, 0)) { + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + assert(pQueryInfo->numOfTables == 1); + + // for projection query, need to try next vnode +// int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes; + int32_t totalVnode = 0; + if ((++pTableMetaInfo->vgroupIndex) < totalVnode) { + tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql, + pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVnode, pRes->numOfTotal); + + pSql->cmd.command = TSDB_SQL_SELECT; + pSql->fp = tscJoinQueryCallback; + tscProcessSql(pSql); + + return; + } + } + + int32_t numOfTotal = pSupporter->pState->numOfTotal; + int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1); + + if (finished >= numOfTotal) { + assert(finished == numOfTotal); + + if (pSupporter->pState->code != TSDB_CODE_SUCCESS) { + tscTrace("%p sub:%p, numOfSub:%d, quit from further procedure due to other queries failure", pParentSql, pSql, + pSupporter->subqueryIndex); + freeJoinSubqueryObj(pParentSql); + return; + } + + tscTrace("%p all subqueries retrieve ts complete, do ts block intersect", pParentSql); + + SJoinSubquerySupporter* p1 = pParentSql->pSubs[0]->param; + SJoinSubquerySupporter* p2 = pParentSql->pSubs[1]->param; + + TSKEY st, et; + + int64_t num = doTSBlockIntersect(pParentSql, p1, p2, &st, &et); + if (num <= 0) { // no result during ts intersect + tscTrace("%p free all sub SqlObj and quit", pParentSql); + freeJoinSubqueryObj(pParentSql); + } else { + updateQueryTimeRange(pParentQueryInfo, st, et); + tscLaunchSecondPhaseSubqueries(pParentSql); + } + } +} + static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { SJoinSubquerySupporter* pSupporter = (SJoinSubquerySupporter*)param; - SSqlObj* pParentSql = pSupporter->pObj; - + + SSqlObj* pParentSql = pSupporter->pObj; SSqlObj* pSql = (SSqlObj*)tres; SSqlCmd* pCmd = &pSql->cmd; - SSqlRes* pRes = &pSql->res; SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) == 0) { + if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) != TSDB_QUERY_TYPE_JOIN_SEC_STAGE) { if (pSupporter->pState->code != TSDB_CODE_SUCCESS) { tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, numOfRows, pSupporter->pState->code); @@ -440,101 +497,55 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { quitAllSubquery(pParentSql, pSupporter); return; } + + if (numOfRows < 0) { + tscError("%p sub query failed, code:%d, index:%d", pSql, numOfRows, pSupporter->subqueryIndex); + pSupporter->pState->code = numOfRows; + quitAllSubquery(pParentSql, pSupporter); + return; + } else if (numOfRows == 0) { + tSIntersectionAndLaunchSecQuery(pSupporter, pSql); + return; + } - if (numOfRows > 0) { // write the data into disk - fwrite(pSql->res.data, pSql->res.numOfRows, 1, pSupporter->f); - fclose(pSupporter->f); - - STSBuf* pBuf = tsBufCreateFromFile(pSupporter->path, true); - if (pBuf == NULL) { - tscError("%p invalid ts comp file from vnode, abort sub query, file size:%d", pSql, numOfRows); - - pSupporter->pState->code = TSDB_CODE_APP_ERROR; // todo set the informative code - quitAllSubquery(pParentSql, pSupporter); - return; - } + // write the compressed timestamp to disk file + fwrite(pSql->res.data, pSql->res.numOfRows, 1, pSupporter->f); + fclose(pSupporter->f); + pSupporter->f = NULL; + + STSBuf* pBuf = tsBufCreateFromFile(pSupporter->path, true); + if (pBuf == NULL) { + tscError("%p invalid ts comp file from vnode, abort subquery, file size:%d", pSql, numOfRows); - if (pSupporter->pTSBuf == NULL) { - tscTrace("%p create tmp file for ts block:%s", pSql, pBuf->path); - pSupporter->pTSBuf = pBuf; - } else { - assert(pQueryInfo->numOfTables == 1); // for subquery, only one metermetaInfo - STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + pSupporter->pState->code = TSDB_CODE_APP_ERROR; // todo set the informative code + quitAllSubquery(pParentSql, pSupporter); + return; + } - tsBufMerge(pSupporter->pTSBuf, pBuf, pTableMetaInfo->vgroupIndex); - tsBufDestory(pBuf); - } + if (pSupporter->pTSBuf == NULL) { + tscTrace("%p create tmp file for ts block:%s, size:%d bytes", pSql, pBuf->path, numOfRows); + pSupporter->pTSBuf = pBuf; + } else { + assert(pQueryInfo->numOfTables == 1); // for subquery, only one + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - // open new file to save the result + tsBufMerge(pSupporter->pTSBuf, pBuf, pTableMetaInfo->vgroupIndex); + tsBufDestory(pBuf); + } + + if (pSql->res.completed) { + tSIntersectionAndLaunchSecQuery(pSupporter, pSql); + } else { // open a new file to save the incoming result getTmpfilePath("ts-join", pSupporter->path); pSupporter->f = fopen(pSupporter->path, "w"); pSql->res.row = pSql->res.numOfRows; taos_fetch_rows_a(tres, joinRetrieveCallback, param); - } else if (numOfRows == 0) { // no data from this vnode anymore - SQueryInfo* pParentQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex); - - //todo refactor - if (tscNonOrderedProjectionQueryOnSTable(pParentQueryInfo, 0)) { - STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - assert(pQueryInfo->numOfTables == 1); - - // for projection query, need to try next vnode -// int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes; - int32_t totalVnode = 0; - if ((++pTableMetaInfo->vgroupIndex) < totalVnode) { - tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql, - pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVnode, pRes->numOfTotal); - - pSql->cmd.command = TSDB_SQL_SELECT; - pSql->fp = tscJoinQueryCallback; - tscProcessSql(pSql); - - return; - } - } - - int32_t numOfTotal = pSupporter->pState->numOfTotal; - int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1); - - if (finished >= numOfTotal) { - assert(finished == numOfTotal); - - if (pSupporter->pState->code != TSDB_CODE_SUCCESS) { - tscTrace("%p sub:%p, numOfSub:%d, quit from further procedure due to other queries failure", pParentSql, tres, - pSupporter->subqueryIndex); - doQuitSubquery(pParentSql); - return; - } - - tscTrace("%p all subqueries retrieve ts complete, do ts block intersect", pParentSql); - - SJoinSubquerySupporter* p1 = pParentSql->pSubs[0]->param; - SJoinSubquerySupporter* p2 = pParentSql->pSubs[1]->param; - - TSKEY st, et; - - int64_t num = doTSBlockIntersect(pParentSql, p1, p2, &st, &et); - if (num <= 0) { // no result during ts intersect - tscTrace("%p free all sub SqlObj and quit", pParentSql); - doQuitSubquery(pParentSql); - } else { - updateQueryTimeRange(pParentQueryInfo, st, et); - tscLaunchSecondPhaseSubqueries(pParentSql); - } - } - } else { // failure of sub query - tscError("%p sub query failed, code:%d, index:%d", pSql, numOfRows, pSupporter->subqueryIndex); - pSupporter->pState->code = numOfRows; - - quitAllSubquery(pParentSql, pSupporter); - return; } - } else { // secondary stage retrieve, driven by taos_fetch_row or other functions if (numOfRows < 0) { pSupporter->pState->code = numOfRows; - tscError("%p retrieve failed, code:%d, index:%d", pSql, numOfRows, pSupporter->subqueryIndex); + tscError("%p retrieve failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex); } if (numOfRows >= 0) { @@ -542,20 +553,20 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { } if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && numOfRows == 0) { -// STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); assert(pQueryInfo->numOfTables == 1); - - // for projection query, need to try next vnode if current vnode is exhausted -// if ((++pTableMetaInfo->vgroupIndex) < pTableMetaInfo->pMetricMeta->numOfVnodes) { -// pSupporter->pState->numOfCompleted = 0; -// pSupporter->pState->numOfTotal = 1; -// -// pSql->cmd.command = TSDB_SQL_SELECT; -// pSql->fp = tscJoinQueryCallback; -// tscProcessSql(pSql); -// -// return; -// } + + // for projection query, need to try next vnode if current vnode is exhausted + if ((++pTableMetaInfo->vgroupIndex) < pTableMetaInfo->vgroupList->numOfVgroups) { + pSupporter->pState->numOfCompleted = 0; + pSupporter->pState->numOfTotal = 1; + + pSql->cmd.command = TSDB_SQL_SELECT; + pSql->fp = tscJoinQueryCallback; + tscProcessSql(pSql); + + return; + } } int32_t numOfTotal = pSupporter->pState->numOfTotal; @@ -567,11 +578,23 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { pParentSql->res.code); if (pSupporter->pState->code != TSDB_CODE_SUCCESS) { - pParentSql->res.code = abs(pSupporter->pState->code); - freeSubqueryObj(pParentSql); + pParentSql->res.code = pSupporter->pState->code; + freeJoinSubqueryObj(pParentSql); + pParentSql->res.completed = true; } - -// tsem_post(&pParentSql->rspSem); + + // update the records for each subquery in parent sql object. + for(int32_t i = 0; i < pParentSql->numOfSubs; ++i) { + if (pParentSql->pSubs[i] == NULL) { + continue; + } + + SSqlRes* pRes1 = &pParentSql->pSubs[i]->res; + pRes1->numOfTotalInCurrentClause += pRes1->numOfRows; + } + + // data has retrieved to client, build the join results + tscBuildResFromSubqueries(pParentSql); } else { tscTrace("%p sub:%p completed, completed:%d, total:%d", pParentSql, tres, finished, numOfTotal); } @@ -599,38 +622,61 @@ static SJoinSubquerySupporter* tscUpdateSubqueryStatus(SSqlObj* pSql, int32_t nu } void tscFetchDatablockFromSubquery(SSqlObj* pSql) { - int32_t numOfFetch = 0; assert(pSql->numOfSubs >= 1); + int32_t numOfFetch = 0; + bool hasData = true; for (int32_t i = 0; i < pSql->numOfSubs; ++i) { - if (pSql->pSubs[i] == NULL) { // this subquery does not need to involve in secondary query + // if the subquery is NULL, it does not involved in the final result generation + SSqlObj* pSub = pSql->pSubs[i]; + if (pSub == NULL) { continue; } - SSqlRes *pRes = &pSql->pSubs[i]->res; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->pSubs[i]->cmd, 0); - -// STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + SSqlRes *pRes = &pSub->res; + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSub->cmd, 0); + STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { -// if (pRes->row >= pRes->numOfRows && pTableMetaInfo->vgroupIndex < pTableMetaInfo->pMetricMeta->numOfVnodes && -// (!tscHasReachLimitation(pQueryInfo, pRes))) { -// numOfFetch++; -// } - } else { - if (pRes->row >= pRes->numOfRows && (!tscHasReachLimitation(pQueryInfo, pRes))) { + if (pRes->row >= pRes->numOfRows && pTableMetaInfo->vgroupIndex < pTableMetaInfo->vgroupList->numOfVgroups && + (!tscHasReachLimitation(pQueryInfo, pRes)) && !pRes->completed) { numOfFetch++; } + } else { + if (!tscHasReachLimitation(pQueryInfo, pRes)) { + if (pRes->row >= pRes->numOfRows) { + hasData = false; + + if (!pRes->completed) { + numOfFetch++; + } + } + } else { // has reach the limitation, no data anymore + hasData = false; + } + } } - if (numOfFetch <= 0) { + // has data remains in client side, and continue to return data to app + if (hasData) { + tscBuildResFromSubqueries(pSql); + return; + } else if (numOfFetch <= 0) { + pSql->res.completed = true; + freeJoinSubqueryObj(pSql); + + if (pSql->res.code == TSDB_CODE_SUCCESS) { + (*pSql->fp)(pSql->param, pSql, 0); + } else { + tscQueueAsyncRes(pSql); + } + return; } // TODO multi-vnode retrieve for projection query with limitation has bugs, since the global limiation is not handled tscTrace("%p retrieve data from %d subqueries", pSql, numOfFetch); - SJoinSubquerySupporter* pSupporter = tscUpdateSubqueryStatus(pSql, numOfFetch); for (int32_t i = 0; i < pSql->numOfSubs; ++i) { @@ -664,19 +710,6 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { tscProcessSql(pSql1); } } - - // wait for all subquery completed -// tsem_wait(&pSql->rspSem); - - // update the records for each subquery - for(int32_t i = 0; i < pSql->numOfSubs; ++i) { - if (pSql->pSubs[i] == NULL) { - continue; - } - - SSqlRes* pRes1 = &pSql->pSubs[i]->res; - pRes1->numOfTotalInCurrentClause += pRes1->numOfRows; - } } // all subqueries return, set the result output index @@ -710,7 +743,7 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) { SSqlCmd* pSubCmd = &pSql->pSubs[tableIndexOfSub]->cmd; SQueryInfo* pSubQueryInfo = tscGetQueryInfoDetail(pSubCmd, 0); - size_t numOfExprs = taosArrayGetSize(pSubQueryInfo->exprsInfo); + size_t numOfExprs = taosArrayGetSize(pSubQueryInfo->exprList); for (int32_t k = 0; k < numOfExprs; ++k) { SSqlExpr* pSubExpr = tscSqlExprGet(pSubQueryInfo, k); if (pExpr->functionId == pSubExpr->functionId && pExpr->colInfo.colId == pSubExpr->colInfo.colId) { @@ -723,35 +756,17 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) { void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { SSqlObj* pSql = (SSqlObj*)tres; - // STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); - - // int32_t idx = pSql->cmd.vnodeIdx; - + SJoinSubquerySupporter* pSupporter = (SJoinSubquerySupporter*)param; - - // if (atomic_add_fetch_32(pSupporter->numOfComplete, 1) >= - // pSupporter->numOfTotal) { - // SSqlObj *pParentObj = pSupporter->pObj; - // - // if ((pSql->cmd.type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) != 1) { - // int32_t num = 0; - // tscFetchDatablockFromSubquery(pParentObj); - // TSKEY* ts = tscGetQualifiedTSList(pParentObj, &num); - // - // if (num <= 0) { - // // no qualified result - // } - // - // tscLaunchSecondPhaseSubqueries(pSql, ts, num); - // } else { - - // } - // } else { + + // There is only one subquery and table for each subquery. SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + assert(pQueryInfo->numOfTables == 1 && pSql->cmd.numOfClause == 1); + if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) != TSDB_QUERY_TYPE_JOIN_SEC_STAGE) { if (code != TSDB_CODE_SUCCESS) { // direct call joinRetrieveCallback and set the error code joinRetrieveCallback(param, pSql, code); - } else { // first stage query, continue to retrieve data + } else { // first stage query, continue to retrieve compressed time stamp data pSql->fp = joinRetrieveCallback; pSql->cmd.command = TSDB_SQL_FETCH; tscProcessSql(pSql); @@ -789,24 +804,17 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { * data instead of returning to its invoker */ if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { -// assert(pTableMetaInfo->vgroupIndex < pTableMetaInfo->pMetricMeta->numOfVnodes); pSupporter->pState->numOfCompleted = 0; // reset the record value pSql->fp = joinRetrieveCallback; // continue retrieve data pSql->cmd.command = TSDB_SQL_FETCH; tscProcessSql(pSql); } else { // first retrieve from vnode during the secondary stage sub-query - if (pParentSql->fp == NULL) { -// tsem_post(&pParentSql->rspSem); + // set the command flag must be after the semaphore been correctly set. + if (pParentSql->res.code == TSDB_CODE_SUCCESS) { + (*pParentSql->fp)(pParentSql->param, pParentSql, 0); } else { - // set the command flag must be after the semaphore been correctly set. - // pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC; - // if (pPObj->res.code == TSDB_CODE_SUCCESS) { - // (*pPObj->fp)(pPObj->param, pPObj, 0); - // } else { - // tscQueueAsyncRes(pPObj); - // } - assert(0); + tscQueueAsyncRes(pParentSql); } } } @@ -856,12 +864,19 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu pCol->colIndex.tableIndex = 0; } - tscColumnListCopy(pSupporter->colList, pNewQueryInfo->colList, 0); + pSupporter->colList = pNewQueryInfo->colList; + pNewQueryInfo->colList = NULL; + + pSupporter->exprList = pNewQueryInfo->exprList; + pNewQueryInfo->exprList = NULL; + + pSupporter->fieldsInfo = pNewQueryInfo->fieldsInfo; - pSupporter->exprsInfo = tscSqlExprCopy(pNewQueryInfo->exprsInfo, pSupporter->uid, false); - tscFieldInfoCopy(&pSupporter->fieldsInfo, &pNewQueryInfo->fieldsInfo); + // this data needs to be transfer to support struct + memset(&pNewQueryInfo->fieldsInfo, 0, sizeof(SFieldInfo)); - tscTagCondCopy(&pSupporter->tagCond, &pNewQueryInfo->tagCond); + pSupporter->tagCond = pNewQueryInfo->tagCond; + memset(&pNewQueryInfo->tagCond, 0, sizeof(STagCond)); pNew->cmd.numOfCols = 0; pNewQueryInfo->intervalTime = 0; @@ -871,15 +886,10 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu pSupporter->groupbyExpr = pNewQueryInfo->groupbyExpr; memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SSqlGroupbyExpr)); - // this data needs to be transfer to support struct - pNewQueryInfo->fieldsInfo.numOfOutput = 0; - - // set the ts,tags that involved in join, as the output column of intermediate result - tscClearSubqueryInfo(&pNew->cmd); - - SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1}; + SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1}; SColumnIndex index = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX}; + tscInitQueryInfo(pNewQueryInfo); tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL); // set the tags value for ts_comp function @@ -892,14 +902,16 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu pExpr->numOfParams = 1; // add the filter tag column - size_t s = taosArrayGetSize(pSupporter->colList); + if (pSupporter->colList != NULL) { + size_t s = taosArrayGetSize(pSupporter->colList); + + for (int32_t i = 0; i < s; ++i) { + SColumn *pCol = taosArrayGetP(pSupporter->colList, i); - for (int32_t i = 0; i < s; ++i) { - SColumn *pCol = taosArrayGetP(pSupporter->colList, i); - - if (pCol->numOfFilters > 0) { // copy to the pNew->cmd.colList if it is filtered. - SColumn* p = tscColumnClone(pCol); - taosArrayPush(pNewQueryInfo->colList, &p); + if (pCol->numOfFilters > 0) { // copy to the pNew->cmd.colList if it is filtered. + SColumn* p = tscColumnClone(pCol); + taosArrayPush(pNewQueryInfo->colList, &p); + } } } @@ -910,25 +922,19 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscSqlExprNumOfExprs(pNewQueryInfo), numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, pNewQueryInfo->pTableMetaInfo[0]->name); - tscPrintSelectClause(pNew, 0); - } else { + assert(0); SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY; } -#ifdef _DEBUG_VIEW tscPrintSelectClause(pNew, 0); -#endif - return tscProcessSql(pNew); } -// todo support async join query int32_t tscHandleMasterJoinQuery(SSqlObj* pSql) { SSqlCmd* pCmd = &pSql->cmd; SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - assert((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0); SSubqueryState *pState = calloc(1, sizeof(SSubqueryState)); @@ -955,13 +961,7 @@ int32_t tscHandleMasterJoinQuery(SSqlObj* pSql) { } } -// tsem_wait(&pSql->rspSem); - - if (pSql->numOfSubs <= 0) { - pSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; - } else { - pSql->cmd.command = TSDB_SQL_METRIC_JOIN_RETRIEVE; - } + pSql->cmd.command = (pSql->numOfSubs <= 0)? TSDB_SQL_RETRIEVE_EMPTY_RESULT:TSDB_SQL_METRIC_JOIN_RETRIEVE; return TSDB_CODE_SUCCESS; } @@ -1051,7 +1051,9 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { trs->pParentSqlObj = pSql; trs->pFinalColModel = pModel; - pthread_mutexattr_t mutexattr = {0}; + pthread_mutexattr_t mutexattr; + memset(&mutexattr, 0, sizeof(pthread_mutexattr_t)); + pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE_NP); pthread_mutex_init(&trs->queryMutex, &mutexattr); pthread_mutexattr_destroy(&mutexattr); @@ -1179,8 +1181,8 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO trsupport->localBuffer->numOfElems = 0; pthread_mutex_unlock(&trsupport->queryMutex); - tscTrace("%p sub:%p retrieve failed, code:%d, orderOfSub:%d, retry:%d", trsupport->pParentSqlObj, pSql, numOfRows, - subqueryIndex, trsupport->numOfRetry); + tscTrace("%p sub:%p retrieve failed, code:%s, orderOfSub:%d, retry:%d", trsupport->pParentSqlObj, pSql, + tstrerror(numOfRows), subqueryIndex, trsupport->numOfRetry); SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSqlObj, trsupport, pSql); if (pNew == NULL) { @@ -1576,3 +1578,230 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { return TSDB_CODE_SUCCESS; } + +void tscBuildResFromSubqueries(SSqlObj *pSql) { + SSqlRes *pRes = &pSql->res; + + if (pRes->code != TSDB_CODE_SUCCESS) { + tscQueueAsyncRes(pSql); + return; + } + + while (1) { + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); + size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo); + + if (pRes->tsrow == NULL) { + pRes->tsrow = calloc(numOfExprs, POINTER_BYTES); + } + + bool success = false; + + int32_t numOfTableHasRes = 0; + for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + if (pSql->pSubs[i] != 0) { + numOfTableHasRes++; + } + } + + if (numOfTableHasRes >= 2) { // do merge result + success = (doSetResultRowData(pSql->pSubs[0], false) != NULL) && (doSetResultRowData(pSql->pSubs[1], false) != NULL); + } else { // only one subquery + SSqlObj *pSub = pSql->pSubs[0]; + if (pSub == NULL) { + pSub = pSql->pSubs[1]; + } + + success = (doSetResultRowData(pSub, false) != NULL); + } + + if (success) { // current row of final output has been built, return to app + for (int32_t i = 0; i < numOfExprs; ++i) { + SColumnIndex* pIndex = &pRes->pColumnIndex[i]; + SSqlRes *pRes1 = &pSql->pSubs[pIndex->tableIndex]->res; + pRes->tsrow[i] = pRes1->tsrow[pIndex->columnIndex]; + } + + pRes->numOfTotalInCurrentClause++; + break; + } else { // continue retrieve data from vnode + if (!tscHashRemainDataInSubqueryResultSet(pSql)) { + tscTrace("%p at least one subquery exhausted, free all other %d subqueries", pSql, pSql->numOfSubs - 1); + SSubqueryState *pState = NULL; + + // free all sub sqlobj + for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + SSqlObj *pChildObj = pSql->pSubs[i]; + if (pChildObj == NULL) { + continue; + } + + SJoinSubquerySupporter *pSupporter = (SJoinSubquerySupporter *)pChildObj->param; + pState = pSupporter->pState; + + tscDestroyJoinSupporter(pChildObj->param); + taos_free_result(pChildObj); + } + + free(pState); + return; + } + + tscFetchDatablockFromSubquery(pSql); + if (pRes->code != TSDB_CODE_SUCCESS) { + return; + } + } + } + + if (pSql->res.code == TSDB_CODE_SUCCESS) { + (*pSql->fp)(pSql->param, pSql, 0); + } else { + tscQueueAsyncRes(pSql); + } +} + +static void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pField) { + SSqlRes *pRes = &pSql->res; + + if (isNull(pRes->tsrow[columnIndex], pField->type)) { + pRes->tsrow[columnIndex] = NULL; + } else if (pField->type == TSDB_DATA_TYPE_NCHAR) { + // convert unicode to native code in a temporary buffer extra one byte for terminated symbol + if (pRes->buffer[columnIndex] == NULL) { + pRes->buffer[columnIndex] = malloc(pField->bytes + TSDB_NCHAR_SIZE); + } + + /* string terminated char for binary data*/ + memset(pRes->buffer[columnIndex], 0, pField->bytes + TSDB_NCHAR_SIZE); + + if (taosUcs4ToMbs(pRes->tsrow[columnIndex], pField->bytes, pRes->buffer[columnIndex])) { + pRes->tsrow[columnIndex] = pRes->buffer[columnIndex]; + } else { + tscError("%p charset:%s to %s. val:%ls convert failed.", pSql, DEFAULT_UNICODE_ENCODEC, tsCharset, pRes->tsrow); + pRes->tsrow[columnIndex] = NULL; + } + } +} + +void **doSetResultRowData(SSqlObj *pSql, bool finalResult) { + SSqlCmd *pCmd = &pSql->cmd; + SSqlRes *pRes = &pSql->res; + + assert(pRes->row >= 0 && pRes->row <= pRes->numOfRows); + + if(pCmd->command == TSDB_SQL_METRIC_JOIN_RETRIEVE) { + if (pRes->completed) { + tfree(pRes->tsrow); + } + + return pRes->tsrow; + } + + if (pRes->row >= pRes->numOfRows) { // all the results has returned to invoker + tfree(pRes->tsrow); + return pRes->tsrow; + } + + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + + for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) { + SFieldSupInfo* pSup = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, i); + if (pSup->pSqlExpr != NULL) { + pRes->tsrow[i] = tscGetResultColumnChr(pRes, pQueryInfo, i) + pSup->pSqlExpr->resBytes * pRes->row; + } + + // primary key column cannot be null in interval query, no need to check + if (i == 0 && pQueryInfo->intervalTime > 0) { + continue; + } + + TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); + transferNcharData(pSql, i, pField); + + // calculate the result from several other columns + if (pSup->pArithExprInfo != NULL) { +// SArithmeticSupport *sas = (SArithmeticSupport *)calloc(1, sizeof(SArithmeticSupport)); +// sas->offset = 0; +// sas-> = pQueryInfo->fieldsInfo.pExpr[i]; +// +// sas->numOfCols = sas->pExpr->binExprInfo.numOfCols; +// +// if (pRes->buffer[i] == NULL) { +// pRes->buffer[i] = malloc(tscFieldInfoGetField(pQueryInfo, i)->bytes); +// } +// +// for(int32_t k = 0; k < sas->numOfCols; ++k) { +// int32_t columnIndex = sas->pExpr->binExprInfo.pReqColumns[k].colIdxInBuf; +// SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, columnIndex); +// +// sas->elemSize[k] = pExpr->resBytes; +// sas->data[k] = (pRes->data + pRes->numOfRows* pExpr->offset) + pRes->row*pExpr->resBytes; +// } +// +// tSQLBinaryExprCalcTraverse(sas->pExpr->binExprInfo.pBinExpr, 1, pRes->buffer[i], sas, TSQL_SO_ASC, getArithemicInputSrc); +// pRes->tsrow[i] = pRes->buffer[i]; +// +// free(sas); //todo optimization + } + } + + pRes->row++; // index increase one-step + return pRes->tsrow; +} + +static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) { + bool hasData = true; + SSqlCmd *pCmd = &pSql->cmd; + + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { + bool allSubqueryExhausted = true; + + for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + if (pSql->pSubs[i] == NULL) { + continue; + } + + SSqlRes *pRes1 = &pSql->pSubs[i]->res; + SSqlCmd *pCmd1 = &pSql->pSubs[i]->cmd; + + SQueryInfo *pQueryInfo1 = tscGetQueryInfoDetail(pCmd1, pCmd1->clauseIndex); + assert(pQueryInfo1->numOfTables == 1); + + STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo1, 0); + + /* + * if the global limitation is not reached, and current result has not exhausted, or next more vnodes are + * available, goes on + */ + if (pTableMetaInfo->vgroupIndex < pTableMetaInfo->vgroupList->numOfVgroups && pRes1->row < pRes1->numOfRows && + (!tscHasReachLimitation(pQueryInfo1, pRes1))) { + allSubqueryExhausted = false; + break; + } + } + + hasData = !allSubqueryExhausted; + } else { // otherwise, in case inner join, if any subquery exhausted, query completed. + for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + if (pSql->pSubs[i] == 0) { + continue; + } + + SSqlRes * pRes1 = &pSql->pSubs[i]->res; + SQueryInfo *pQueryInfo1 = tscGetQueryInfoDetail(&pSql->pSubs[i]->cmd, 0); + + if ((pRes1->row >= pRes1->numOfRows && tscHasReachLimitation(pQueryInfo1, pRes1) && + tscProjectionQueryOnTable(pQueryInfo1)) || + (pRes1->numOfRows == 0)) { + hasData = false; + break; + } + } + } + + return hasData; +} + + diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 6b8b2b38b4..477c432130 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -844,12 +844,12 @@ SFieldSupInfo* tscFieldInfoInsert(SFieldInfo* pFieldInfo, int32_t index, TAOS_FI void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo) { size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo); - SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprsInfo, 0); + SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0); pExpr->offset = 0; for (int32_t i = 1; i < numOfExprs; ++i) { - SSqlExpr* prev = taosArrayGetP(pQueryInfo->exprsInfo, i - 1); - SSqlExpr* p = taosArrayGetP(pQueryInfo->exprsInfo, i); + SSqlExpr* prev = taosArrayGetP(pQueryInfo->exprList, i - 1); + SSqlExpr* p = taosArrayGetP(pQueryInfo->exprList, i); p->offset = prev->offset + prev->resBytes; } @@ -860,13 +860,13 @@ void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo) { return; } - SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprsInfo, 0); + SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0); pExpr->offset = 0; size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo); for (int32_t i = 1; i < numOfExprs; ++i) { - SSqlExpr* prev = taosArrayGetP(pQueryInfo->exprsInfo, i - 1); - SSqlExpr* p = taosArrayGetP(pQueryInfo->exprsInfo, i); + SSqlExpr* prev = taosArrayGetP(pQueryInfo->exprList, i - 1); + SSqlExpr* p = taosArrayGetP(pQueryInfo->exprList, i); p->offset = prev->offset + prev->resBytes; } @@ -875,8 +875,17 @@ void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo) { void tscFieldInfoCopy(SFieldInfo* dst, const SFieldInfo* src) { dst->numOfOutput = src->numOfOutput; - taosArrayCopy(dst->pFields, src->pFields); - taosArrayCopy(dst->pSupportInfo, src->pSupportInfo); + if (dst->pFields == NULL) { + dst->pFields = taosArrayClone(src->pFields); + } else { + taosArrayCopy(dst->pFields, src->pFields); + } + + if (dst->pSupportInfo == NULL) { + dst->pSupportInfo = taosArrayClone(src->pSupportInfo); + } else { + taosArrayCopy(dst->pSupportInfo, src->pSupportInfo); + } } TAOS_FIELD* tscFieldInfoGetField(SFieldInfo* pFieldInfo, int32_t index) { @@ -983,20 +992,20 @@ static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SCol SSqlExpr* tscSqlExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type, int16_t size, int16_t interSize, bool isTagCol) { - int32_t num = taosArrayGetSize(pQueryInfo->exprsInfo); + int32_t num = taosArrayGetSize(pQueryInfo->exprList); if (index == num) { return tscSqlExprAppend(pQueryInfo, functionId, pColIndex, type, size, interSize, isTagCol); } SSqlExpr* pExpr = doBuildSqlExpr(pQueryInfo, functionId, pColIndex, type, size, interSize, isTagCol); - taosArrayInsert(pQueryInfo->exprsInfo, index, &pExpr); + taosArrayInsert(pQueryInfo->exprList, index, &pExpr); return pExpr; } SSqlExpr* tscSqlExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type, int16_t size, int16_t interSize, bool isTagCol) { SSqlExpr* pExpr = doBuildSqlExpr(pQueryInfo, functionId, pColIndex, type, size, interSize, isTagCol); - taosArrayPush(pQueryInfo->exprsInfo, &pExpr); + taosArrayPush(pQueryInfo->exprList, &pExpr); return pExpr; } @@ -1020,7 +1029,7 @@ SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functi } int32_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo) { - return taosArrayGetSize(pQueryInfo->exprsInfo); + return taosArrayGetSize(pQueryInfo->exprList); } void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes, int16_t tableIndex) { @@ -1037,7 +1046,7 @@ void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes, } SSqlExpr* tscSqlExprGet(SQueryInfo* pQueryInfo, int32_t index) { - return taosArrayGetP(pQueryInfo->exprsInfo, index); + return taosArrayGetP(pQueryInfo->exprList, index); } void* sqlExprDestroy(SSqlExpr* pExpr) { @@ -1068,14 +1077,10 @@ void tscSqlExprInfoDestroy(SArray* pExprInfo) { taosArrayDestroy(pExprInfo); } -SArray* tscSqlExprCopy(const SArray* src, uint64_t uid, bool deepcopy) { - if (src == NULL || taosArrayGetSize(src) == 0) { - return taosArrayInit(1, POINTER_BYTES); - } +void tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy) { + assert(src != NULL && dst != NULL); size_t size = taosArrayGetSize(src); - SArray* dst = taosArrayInit(size, POINTER_BYTES); - for (int32_t i = 0; i < size; ++i) { SSqlExpr* pExpr = taosArrayGetP(src, i); @@ -1095,8 +1100,6 @@ SArray* tscSqlExprCopy(const SArray* src, uint64_t uid, bool deepcopy) { } } } - - return dst; } SColumn* tscColumnListInsert(SArray* pColumnList, SColumnIndex* pColIndex) { @@ -1194,9 +1197,7 @@ static void tscColumnDestroy(SColumn* pCol) { } void tscColumnListCopy(SArray* dst, const SArray* src, int16_t tableIndex) { - if (src == NULL) { - return; - } + assert(src != NULL && dst != NULL); size_t num = taosArrayGetSize(src); for (int32_t i = 0; i < num; ++i) { @@ -1577,9 +1578,22 @@ STableMetaInfo* tscGetTableMetaInfoByUid(SQueryInfo* pQueryInfo, uint64_t uid, i return tscGetMetaInfo(pQueryInfo, k); } +void tscInitQueryInfo(SQueryInfo* pQueryInfo) { + assert(pQueryInfo->fieldsInfo.pFields == NULL); + pQueryInfo->fieldsInfo.pFields = taosArrayInit(4, sizeof(TAOS_FIELD)); + + assert(pQueryInfo->fieldsInfo.pSupportInfo == NULL); + pQueryInfo->fieldsInfo.pSupportInfo = taosArrayInit(4, sizeof(SFieldSupInfo)); + + assert(pQueryInfo->exprList == NULL); + pQueryInfo->exprList = taosArrayInit(4, POINTER_BYTES); + pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES); +} + int32_t tscAddSubqueryInfo(SSqlCmd* pCmd) { assert(pCmd != NULL); + // todo refactor: remove this structure size_t s = pCmd->numOfClause + 1; char* tmp = realloc(pCmd->pQueryInfo, s * POINTER_BYTES); if (tmp == NULL) { @@ -1589,12 +1603,7 @@ int32_t tscAddSubqueryInfo(SSqlCmd* pCmd) { pCmd->pQueryInfo = (SQueryInfo**)tmp; SQueryInfo* pQueryInfo = calloc(1, sizeof(SQueryInfo)); - - // todo refactor to extract functions. - pQueryInfo->fieldsInfo.pFields = taosArrayInit(4, sizeof(TAOS_FIELD)); - pQueryInfo->fieldsInfo.pSupportInfo = taosArrayInit(4, sizeof(SFieldSupInfo)); - - pQueryInfo->exprsInfo = taosArrayInit(4, POINTER_BYTES); + tscInitQueryInfo(pQueryInfo); pQueryInfo->msg = pCmd->payload; // pointer to the parent error message buffer @@ -1606,11 +1615,11 @@ static void freeQueryInfoImpl(SQueryInfo* pQueryInfo) { tscTagCondRelease(&pQueryInfo->tagCond); tscFieldInfoClear(&pQueryInfo->fieldsInfo); - tscSqlExprInfoDestroy(pQueryInfo->exprsInfo); - memset(&pQueryInfo->exprsInfo, 0, sizeof(pQueryInfo->exprsInfo)); + tscSqlExprInfoDestroy(pQueryInfo->exprList); + pQueryInfo->exprList = NULL; tscColumnListDestroy(pQueryInfo->colList); - memset(&pQueryInfo->colList, 0, sizeof(pQueryInfo->colList)); + pQueryInfo->colList = NULL; if (pQueryInfo->groupbyExpr.columnInfo != NULL) { taosArrayDestroy(pQueryInfo->groupbyExpr.columnInfo); @@ -1783,20 +1792,21 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(pnCmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - memcpy(pNewQueryInfo, pQueryInfo, sizeof(SQueryInfo)); - - memset(&pNewQueryInfo->fieldsInfo, 0, sizeof(SFieldInfo)); - + pNewQueryInfo->command = pQueryInfo->command; + pNewQueryInfo->type = pQueryInfo->type; + pNewQueryInfo->slidingTimeUnit = pQueryInfo->slidingTimeUnit; + pNewQueryInfo->window = pQueryInfo->window; + pNewQueryInfo->intervalTime = pQueryInfo->intervalTime; + pNewQueryInfo->slidingTime = pQueryInfo->slidingTime; + pNewQueryInfo->limit = pQueryInfo->limit; + pNewQueryInfo->slimit = pQueryInfo->slimit; + pNewQueryInfo->order = pQueryInfo->order; + pNewQueryInfo->clauseLimit = pQueryInfo->clauseLimit; pNewQueryInfo->pTableMetaInfo = NULL; pNewQueryInfo->defaultVal = NULL; pNewQueryInfo->numOfTables = 0; pNewQueryInfo->tsBuf = NULL; - pNewQueryInfo->colList = taosArrayInit(4, POINTER_BYTES); - pNewQueryInfo->fieldsInfo.pFields = taosArrayInit(4, sizeof(TAOS_FIELD)); - pNewQueryInfo->fieldsInfo.pSupportInfo = taosArrayInit(4, sizeof(SFieldSupInfo)); - pNewQueryInfo->exprsInfo = taosArrayInit(4, POINTER_BYTES); - tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond); if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) { @@ -1821,7 +1831,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void } uint64_t uid = pTableMetaInfo->pTableMeta->uid; - pNewQueryInfo->exprsInfo = tscSqlExprCopy(pQueryInfo->exprsInfo, uid, true); + tscSqlExprCopy(pNewQueryInfo->exprList, pQueryInfo->exprList, uid, true); int32_t numOfOutput = tscSqlExprNumOfExprs(pNewQueryInfo); @@ -1841,20 +1851,21 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void } } - // make sure the the sqlExpr for each fields is correct + // make sure the the sqlExpr for each fields is correct // todo handle the agg arithmetic expression for(int32_t f = 0; f < pNewQueryInfo->fieldsInfo.numOfOutput; ++f) { TAOS_FIELD* field = tscFieldInfoGetField(&pNewQueryInfo->fieldsInfo, f); for(int32_t k1 = 0; k1 < numOfExprs; ++k1) { SSqlExpr* pExpr1 = tscSqlExprGet(pNewQueryInfo, k1); - if (strcmp(field->name, pExpr1->aliasName) == 0) { + + if (strcmp(field->name, pExpr1->aliasName) == 0) { // eatablish link according to the result field name SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pNewQueryInfo->fieldsInfo, f); pInfo->pSqlExpr = pExpr1; } } } - - tscFieldInfoUpdateOffsetForInterResult(pNewQueryInfo); + + tscFieldInfoUpdateOffset(pNewQueryInfo); } pNew->fp = fp; diff --git a/src/common/src/tname.c b/src/common/src/tname.c index 822d612243..29236ed0ff 100644 --- a/src/common/src/tname.c +++ b/src/common/src/tname.c @@ -25,13 +25,10 @@ __attribute__((unused)) static FORCE_INLINE size_t copy(char* dst, const char* s } void extractTableName(const char* tableId, char* name) { - size_t offset = strcspn(tableId, &TS_PATH_DELIMITER[0]); - offset = strcspn(&tableId[offset], &TS_PATH_DELIMITER[0]); + size_t s1 = strcspn(tableId, &TS_PATH_DELIMITER[0]); + size_t s2 = strcspn(&tableId[s1 + 1], &TS_PATH_DELIMITER[0]); - strncpy(name, &tableId[offset], TSDB_TABLE_NAME_LEN); - -// char* r = skipSegments(tableId, TS_PATH_DELIMITER[0], 2); -// return copy(name, r, TS_PATH_DELIMITER[0]); + strncpy(name, &tableId[s1 + s2 + 2], TSDB_TABLE_NAME_LEN); } char* extractDBName(const char* tableId, char* name) { diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 043e58cb35..89a3cbb3a9 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -199,7 +199,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); #define TSDB_SHELL_VNODE_BITS 24 #define TSDB_SHELL_SID_MASK 0xFF #define TSDB_HTTP_TOKEN_LEN 20 -#define TSDB_SHOW_SQL_LEN 32 +#define TSDB_SHOW_SQL_LEN 512 #define TSDB_METER_STATE_OFFLINE 0 #define TSDB_METER_STATE_ONLLINE 1 diff --git a/src/query/inc/queryLog.h b/src/query/inc/queryLog.h index 929e079d1e..2c77ee6032 100644 --- a/src/query/inc/queryLog.h +++ b/src/query/inc/queryLog.h @@ -31,11 +31,11 @@ extern int32_t qdebugFlag; #define qError(...) \ if (qdebugFlag & DEBUG_ERROR) { \ - taosPrintLog("ERROR RPC ", qdebugFlag, __VA_ARGS__); \ + taosPrintLog("ERROR QRY ", qdebugFlag, __VA_ARGS__); \ } #define qWarn(...) \ if (qdebugFlag & DEBUG_WARN) { \ - taosPrintLog("WARN RPC ", qdebugFlag, __VA_ARGS__); \ + taosPrintLog("WARN QRY ", qdebugFlag, __VA_ARGS__); \ } #ifdef __cplusplus diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index 2143742e24..04188b45cc 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -5112,7 +5112,6 @@ static void tableQueryImpl(SQInfo *pQInfo) { pQInfo->pointsInterpo += numOfInterpo; qTrace("QInfo:%p current:%d returned, total:%d", pQInfo, pQuery->rec.rows, pQuery->rec.total); - sem_post(&pQInfo->dataReady); return; } @@ -5133,7 +5132,6 @@ static void tableQueryImpl(SQInfo *pQInfo) { if (pQuery->rec.rows > 0) { qTrace("QInfo:%p %d rows returned from group results, total:%d", pQInfo, pQuery->rec.rows, pQuery->rec.total); - sem_post(&pQInfo->dataReady); return; } } @@ -5141,7 +5139,6 @@ static void tableQueryImpl(SQInfo *pQInfo) { qTrace("QInfo:%p query over, %d rows are returned", pQInfo, pQuery->rec.total); // vnodePrintQueryStatistics(pSupporter); - sem_post(&pQInfo->dataReady); return; } @@ -5166,14 +5163,12 @@ static void tableQueryImpl(SQInfo *pQInfo) { /* check if query is killed or not */ if (isQueryKilled(pQInfo)) { qTrace("QInfo:%p query is killed", pQInfo); - } else { - // STableId* pTableId = taosArrayGet(pQInfo->groupInfo, 0); - // qTrace("QInfo:%p uid:%" PRIu64 " tid:%d, query completed, %" PRId64 " rows returned, numOfTotal:%" PRId64 " - // rows", - // pQInfo, pTableId->uid, pTableId->tid, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows); + } else {// todo set the table uid and tid in log +// SArray* p = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0); +// SPair* pair = taosArrayGet(p, 0); + qTrace("QInfo:%p query paused, %" PRId64 " rows returned, numOfTotal:%" PRId64 " rows", + pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows); } - - sem_post(&pQInfo->dataReady); } static void stableQueryImpl(SQInfo *pQInfo) { @@ -5201,8 +5196,6 @@ static void stableQueryImpl(SQInfo *pQInfo) { pQuery->rec.total); // vnodePrintQueryStatistics(pSupporter); } - - sem_post(&pQInfo->dataReady); } static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pExprMsg, SColumnInfo* pTagCols) { @@ -6017,9 +6010,15 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { unlink(pQuery->sdata[0]->data); } else { + // todo return the error code to client qError("QInfo:%p failed to open tmp file to send ts-comp data to client, path:%s, reason:%s", pQInfo, pQuery->sdata[0]->data, strerror(errno)); } + + // all data returned, set query over + if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { + setQueryStatus(pQuery, QUERY_OVER); + } } else { doCopyQueryResultToMsg(pQInfo, pQuery->rec.rows, data); } @@ -6143,7 +6142,8 @@ void qTableQuery(qinfo_t qinfo) { } else { tableQueryImpl(pQInfo); } - + + sem_post(&pQInfo->dataReady); // vnodeDecRefCount(pQInfo); } diff --git a/src/util/inc/tarray.h b/src/util/inc/tarray.h index dbb74edf07..7edd032034 100644 --- a/src/util/inc/tarray.h +++ b/src/util/inc/tarray.h @@ -98,13 +98,13 @@ void taosArrayRemove(SArray* pArray, size_t index); * @param pDst * @param pSrc */ -void taosArrayCopy(SArray* pDst, SArray* pSrc); +void taosArrayCopy(SArray* pDst, const SArray* pSrc); /** * clone a new array * @param pSrc */ -SArray* taosArrayClone(SArray* pSrc); +SArray* taosArrayClone(const SArray* pSrc); /** * destroy array list diff --git a/src/util/src/tarray.c b/src/util/src/tarray.c index 51684e767c..3b12864ec8 100755 --- a/src/util/src/tarray.c +++ b/src/util/src/tarray.c @@ -143,7 +143,7 @@ void taosArrayRemove(SArray* pArray, size_t index) { pArray->size -= 1; } -void taosArrayCopy(SArray* pDst, SArray* pSrc) { +void taosArrayCopy(SArray* pDst, const SArray* pSrc) { assert(pSrc != NULL && pDst != NULL); if (pDst->capacity < pSrc->size) { @@ -162,7 +162,7 @@ void taosArrayCopy(SArray* pDst, SArray* pSrc) { pDst->size = pSrc->size; } -SArray* taosArrayClone(SArray* pSrc) { +SArray* taosArrayClone(const SArray* pSrc) { assert(pSrc != NULL); if (pSrc->size == 0) { // empty array list diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 201214ded4..d8f534e3b7 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -97,7 +97,7 @@ static int32_t vnodeProcessRetrieveMsg(SVnodeObj *pVnode, void *pCont, int32_t c if (qHasMoreResultsToRetrieve(pQInfo)) { pRet->qhandle = pQInfo; code = TSDB_CODE_ACTION_NEED_REPROCESSED; - } else { + } else { // no further execution invoked, release the ref to vnode qDestroyQueryInfo(pQInfo); vnodeRelease(pVnode); -- GitLab