From c2ee6972670b2aab824582d1fea3a5f81d1817c1 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Wed, 6 May 2020 18:03:00 +0800 Subject: [PATCH] [td-198] super table interval query join without ts matched. --- src/client/inc/tscJoinProcess.h | 2 + src/client/inc/tsclient.h | 2 +- src/client/src/tscJoinProcess.c | 91 ++++++++++++++++++++++++++ src/client/src/tscParseInsert.c | 8 ++- src/client/src/tscSQLParser.c | 19 ++++-- src/client/src/tscServer.c | 49 ++++++++------ src/client/src/tscSql.c | 33 +++++++++- src/inc/tsdb.h | 2 + src/inc/tsqlfunction.h | 2 +- src/system/detail/src/vnodeQueryImpl.c | 1 - src/util/src/ttypes.c | 2 +- 11 files changed, 179 insertions(+), 32 deletions(-) diff --git a/src/client/inc/tscJoinProcess.h b/src/client/inc/tscJoinProcess.h index 4e97dfad4b..afb63744aa 100644 --- a/src/client/inc/tscJoinProcess.h +++ b/src/client/inc/tscJoinProcess.h @@ -27,6 +27,8 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql); void tscGetQualifiedTSList(SSqlObj* pSql, SJoinSubquerySupporter* p1, SJoinSubquerySupporter* p2, int32_t* num); void tscSetupOutputColumnIndex(SSqlObj* pSql); + +int32_t tscLaunchSecondPhaseDirectly(SSqlObj* pSql, SSubqueryState* pState); int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql); void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index f931bd4729..42878cf520 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -196,7 +196,7 @@ typedef struct SDataBlockList { typedef struct SQueryInfo { int16_t command; // the command may be different for each subclause, so keep it seperately. - uint16_t type; // query/insert/import type + uint32_t type; // query/insert/import type char slidingTimeUnit; int64_t etime, stime; diff --git a/src/client/src/tscJoinProcess.c b/src/client/src/tscJoinProcess.c index 23afe29248..2a6fc56ba5 100644 --- a/src/client/src/tscJoinProcess.c +++ b/src/client/src/tscJoinProcess.c @@ -219,6 +219,97 @@ bool needSecondaryQuery(SQueryInfo* pQueryInfo) { return false; } +int32_t tscLaunchSecondPhaseDirectly(SSqlObj* pSql, SSubqueryState* pState) { + /* + * If the columns are not involved in the final select clause, the secondary query will not be launched + * for the subquery. + */ + pSql->res.qhandle = 0x1; + pSql->res.numOfRows = 0; + + tscTrace("%p start to launch secondary subqueries", pSql); + bool success = true; + for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + + SJoinSubquerySupporter *pSupporter = tscCreateJoinSupporter(pSql, pState, i); + assert(pSupporter != NULL); + + SSqlObj *pNew = createSubqueryObj(pSql, (int16_t) i, tscJoinQueryCallback, pSupporter, NULL); + if (pNew == NULL) { + tscDestroyJoinSupporter(pSupporter); + success = false; + break; + } + + pSql->pSubs[i] = pNew; + + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); + pQueryInfo->tsBuf = NULL; // transfer the ownership of timestamp comp-z data to the new created object + + // set the second stage sub query for join process + pQueryInfo->type |= TSDB_QUERY_TYPE_JOIN_SEC_STAGE; + + /* + * if the first column of the secondary query is not ts function, add this function. + * Because this column is required to filter with timestamp after intersecting. + */ + assert(pNew->numOfSubs == 0 && pNew->cmd.numOfClause == 1 && pQueryInfo->numOfTables == 1); + /* + * if the first column of the secondary query is not ts function, add this function. + * Because this column is required to filter with timestamp after intersecting. + */ + if (tscSqlExprGet(pQueryInfo, 0)->functionId != TSDB_FUNC_TS) { + tscAddTimestampColumn(pQueryInfo, TSDB_FUNC_TS, 0); + } + + tscFieldInfoCalOffset(pQueryInfo); + + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); + + /* + * When handling the projection query, the offset value will be modified for table-table join, which is changed + * during the timestamp intersection. + */ + // fetch the join tag column + if (UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) { + SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, 0); + assert(pQueryInfo->tagCond.joinInfo.hasJoin); + + int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pQueryInfo->tagCond, pMeterMetaInfo->pMeterMeta->uid); + pExpr->param[0].i64Key = tagColIndex; + pExpr->numOfParams = 1; + } + + tscPrintSelectClause(pNew, 0); + + tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", + pSql, pNew, 0, pMeterMetaInfo->vnodeIndex, pQueryInfo->type, + pQueryInfo->exprsInfo.numOfExprs, pQueryInfo->colList.numOfCols, + pQueryInfo->fieldsInfo.numOfOutputCols, pQueryInfo->pMeterInfo[0]->name); + } + + //prepare the subqueries object failed, abort + if (!success) { + pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY; + tscError("%p failed to prepare subqueries objs for secondary phase query, numOfSub:%d, code:%d", pSql, + pSql->numOfSubs, pSql->res.code); + freeSubqueryObj(pSql); + + return pSql->res.code; + } + + for(int32_t i = 0; i < pSql->numOfSubs; ++i) { + SSqlObj* pSub = pSql->pSubs[i]; + if (pSub == NULL) { + continue; + } + + tscProcessSql(pSub); + } + + return TSDB_CODE_SUCCESS; +} + /* * launch secondary stage query to fetch the result that contains timestamp in set */ diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 4b6cdde34f..cb2ef50fb4 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -949,7 +949,11 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) { } else { *sqlstr = sql; } - + + if (*sqlstr == NULL) { + code = TSDB_CODE_INVALID_SQL; + } + return code; } @@ -1290,7 +1294,7 @@ int tsParseInsertSql(SSqlObj *pSql) { SQueryInfo *pQueryInfo = NULL; tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo); - uint16_t type = (sToken.type == TK_INSERT)? TSDB_QUERY_TYPE_INSERT:TSDB_QUERY_TYPE_IMPORT; + uint32_t type = (sToken.type == TK_INSERT)? TSDB_QUERY_TYPE_INSERT:TSDB_QUERY_TYPE_IMPORT; TSDB_QUERY_SET_TYPE(pQueryInfo->type, type); sToken = tStrGetToken(pSql->sqlstr, &index, false, 0, NULL); diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 45a0bbee0f..6baae638c7 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1135,6 +1135,10 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel const char* msg5 = "invalid function name"; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex); + + if (isSTable) { + TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_QUERY); + } for (int32_t i = 0; i < pSelection->nExpr; ++i) { int32_t outputIndex = pQueryInfo->exprsInfo.numOfExprs; @@ -3653,7 +3657,9 @@ static int32_t validateJoinExpr(SQueryInfo* pQueryInfo, SCondExpr* pCondExpr) { } if (!pCondExpr->tsJoin) { - return invalidSqlErrMsg(pQueryInfo->msg, msg2); + TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TS_NO_MATCH_JOIN_QUERY); + } else { + TSDB_QUERY_UNSET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TS_NO_MATCH_JOIN_QUERY); } return TSDB_CODE_SUCCESS; @@ -5625,9 +5631,14 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { pQuerySql->pSortOrder == NULL); return doLocalQueryProcess(pQueryInfo, pQuerySql); } - - if (pQuerySql->from->nExpr > TSDB_MAX_JOIN_TABLE_NUM) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg7); + + if (pQuerySql->from->nExpr > 1) { + if (pQuerySql->from->nExpr > 2) { // not support more than 2 tables join query + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg7); + } + + // set the timestamp not matched join query + TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TS_NO_MATCH_JOIN_QUERY); } pQueryInfo->command = TSDB_SQL_SELECT; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index ed937bb68a..8bff57c59b 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -635,7 +635,7 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu pSql->pSubs[pSql->numOfSubs++] = pNew; assert(pSql->numOfSubs <= pSupporter->pState->numOfTotal); - + if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) { addGroupInfoForSubquery(pSql, pNew, 0, tableIndex); @@ -749,7 +749,7 @@ int tscProcessSql(SSqlObj *pSql) { SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SMeterMetaInfo *pMeterMetaInfo = NULL; - int16_t type = 0; + uint32_t type = 0; if (pQueryInfo != NULL) { pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); @@ -794,26 +794,37 @@ int tscProcessSql(SSqlObj *pSql) { if (QUERY_IS_JOIN_QUERY(type)) { if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0) { SSubqueryState *pState = calloc(1, sizeof(SSubqueryState)); - pState->numOfTotal = pQueryInfo->numOfTables; - for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { - SJoinSubquerySupporter *pSupporter = tscCreateJoinSupporter(pSql, pState, i); - - if (pSupporter == NULL) { // failed to create support struct, abort current query - tscError("%p tableIndex:%d, failed to allocate join support object, abort further query", pSql, i); - pState->numOfCompleted = pQueryInfo->numOfTables - i - 1; - pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY; - - return pSql->res.code; + if ((pQueryInfo->type & TSDB_QUERY_TYPE_TS_NO_MATCH_JOIN_QUERY) != 0) { + pSql->numOfSubs = pQueryInfo->numOfTables; + if (pSql->pSubs == NULL) { + pSql->pSubs = calloc(pSql->numOfSubs, POINTER_BYTES); + if (pSql->pSubs == NULL) { + return TSDB_CODE_CLI_OUT_OF_MEMORY; + } } - - int32_t code = tscLaunchJoinSubquery(pSql, i, pSupporter); - if (code != TSDB_CODE_SUCCESS) { // failed to create subquery object, quit query - tscDestroyJoinSupporter(pSupporter); - pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY; - - break; + + tscLaunchSecondPhaseDirectly(pSql, pState); + } else { + for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { + SJoinSubquerySupporter *pSupporter = tscCreateJoinSupporter(pSql, pState, i); + + if (pSupporter == NULL) { // failed to create support struct, abort current query + tscError("%p tableIndex:%d, failed to allocate join support object, abort further query", pSql, i); + pState->numOfCompleted = pQueryInfo->numOfTables - i - 1; + pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY; + + return pSql->res.code; + } + + int32_t code = tscLaunchJoinSubquery(pSql, i, pSupporter); + if (code != TSDB_CODE_SUCCESS) { // failed to create subquery object, quit query + tscDestroyJoinSupporter(pSupporter); + pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY; + + break; + } } } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index a5990abf32..1421fe43ea 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -551,9 +551,36 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) { } if (numOfTableHasRes >= 2) { // do merge result - success = (doSetResultRowData(pSql->pSubs[0], false) != NULL) && - (doSetResultRowData(pSql->pSubs[1], false) != NULL); - // printf("first:%" PRId64 ", second:%" PRId64 "\n", key1, key2); + bool s1 = doSetResultRowData(pSql->pSubs[0], false); + bool s2 = doSetResultRowData(pSql->pSubs[1], false); + + success = s1 && s2; + if (success) { + TSKEY key1 = *(TSKEY*) pSql->pSubs[0]->res.tsrow[0]; + TSKEY key2 = *(TSKEY*) pSql->pSubs[1]->res.tsrow[0]; + + while(1) { + if (key1 < key2) { + s1 = doSetResultRowData(pSql->pSubs[0], false); + if (!s1) { // retrieve next block + break; + } + } else if (key1 > key2) { + s2 = doSetResultRowData(pSql->pSubs[1], false); + if (!s2) { + break; + } + } else { + break; + } + + key1 = *(TSKEY *)pSql->pSubs[0]->res.tsrow[0]; + key2 = *(TSKEY *)pSql->pSubs[1]->res.tsrow[0]; + } + + success = s1 && s2; +// printf("first:%" PRId64 ", second:%" PRId64 "\n", key1, key2); + } } else { // only one subquery SSqlObj *pSub = pSql->pSubs[0]; if (pSub == NULL) { diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 9d6c2ef16b..e783368ec4 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -230,10 +230,12 @@ extern "C" { #define TSDB_QUERY_TYPE_INSERT 0x100U // insert type #define TSDB_QUERY_TYPE_IMPORT 0x200U // import data +#define TSDB_QUERY_TYPE_TS_NO_MATCH_JOIN_QUERY 0x400u // join query without ts match #define TSDB_QUERY_HAS_TYPE(x, _type) (((x) & (_type)) != 0) #define TSDB_QUERY_SET_TYPE(x, _type) ((x) |= (_type)) #define TSDB_QUERY_RESET_TYPE(x) ((x) = TSDB_QUERY_TYPE_NON_TYPE) +#define TSDB_QUERY_UNSET_TYPE(x, _type) ((x) &= ~(_type)) #define TSQL_SO_ASC 1 #define TSQL_SO_DESC 0 diff --git a/src/inc/tsqlfunction.h b/src/inc/tsqlfunction.h index b2e53a931b..e5a2734725 100644 --- a/src/inc/tsqlfunction.h +++ b/src/inc/tsqlfunction.h @@ -115,7 +115,7 @@ enum { }; #define QUERY_IS_STABLE_QUERY(type) (((type)&TSDB_QUERY_TYPE_STABLE_QUERY) != 0) -#define QUERY_IS_JOIN_QUERY(type) (((type)&TSDB_QUERY_TYPE_JOIN_QUERY) != 0) +#define QUERY_IS_JOIN_QUERY(type) (((type)&(TSDB_QUERY_TYPE_JOIN_QUERY|TSDB_QUERY_TYPE_TS_NO_MATCH_JOIN_QUERY)) != 0) #define QUERY_IS_PROJECTION_QUERY(type) (((type)&TSDB_QUERY_TYPE_PROJECTION_QUERY) != 0) #define QUERY_IS_FREE_RESOURCE(type) (((type)&TSDB_QUERY_TYPE_FREE_RESOURCE) != 0) diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index 334e9af008..8931e4d660 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -2665,7 +2665,6 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * int32_t j = 0; TSKEY lastKey = -1; int32_t lastIndex = -1; - //bool firstAccessedPoint = true; for (j = 0; j < (*forwardStep); ++j) { int32_t offset = GET_COL_DATA_POS(pQuery, j, step); diff --git a/src/util/src/ttypes.c b/src/util/src/ttypes.c index 78f8595e77..754793180f 100644 --- a/src/util/src/ttypes.c +++ b/src/util/src/ttypes.c @@ -217,7 +217,7 @@ int32_t tVariantCompare(const tVariant *pDst, const tVariant *pSrc) { case TSDB_DATA_TYPE_NCHAR: return strncasecmp(pSrc->pz,pDst->pz,pSrc->nLen); default: - return 1; + return 0; } } -- GitLab