From 4246517c9f98788643828af5ee123e14fc982c68 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Sat, 7 Dec 2019 15:24:52 +0800 Subject: [PATCH] [tbase-1282] --- src/client/inc/tscUtil.h | 5 +- src/client/src/tscJoinProcess.c | 123 +++++++++++----- src/client/src/tscSQLParser.c | 243 ++++++++++++++++++++------------ src/client/src/tscServer.c | 8 +- src/client/src/tscSql.c | 77 ++++++---- src/client/src/tscUtil.c | 3 +- 6 files changed, 303 insertions(+), 156 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 473fdbb942..9ea6ba7c3f 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -52,7 +52,6 @@ typedef struct SParsedDataColInfo { typedef struct SJoinSubquerySupporter { SSubqueryState* pState; SSqlObj* pObj; // parent SqlObj - bool hasMore; // has data from vnode to fetch int32_t subqueryIndex; // index of sub query int64_t interval; // interval time SLimitVal limit; // limit info @@ -166,7 +165,6 @@ void tsSetMetricQueryCond(STagCond* pTagCond, uint64_t uid, const char* str); void tscTagCondCopy(STagCond* dest, const STagCond* src); void tscTagCondRelease(STagCond* pCond); -void tscTagCondSetQueryCondType(STagCond* pCond, int16_t type); void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SSqlCmd* pCmd); @@ -222,6 +220,9 @@ TAOS* taos_connect_a(char* ip, char* user, char* pass, char* db, uint16_t port, void* param, void** taos); void sortRemoveDuplicates(STableDataBlocks* dataBuf); + +void tscPrintSelectClause(SSqlCmd* pCmd); + #ifdef __cplusplus } #endif diff --git a/src/client/src/tscJoinProcess.c b/src/client/src/tscJoinProcess.c index b470d84440..3126c3a867 100644 --- a/src/client/src/tscJoinProcess.c +++ b/src/client/src/tscJoinProcess.c @@ -164,8 +164,6 @@ SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pS } pSupporter->pObj = pSql; - pSupporter->hasMore = true; - pSupporter->pState = pState; pSupporter->subqueryIndex = index; @@ -226,12 +224,6 @@ bool needSecondaryQuery(SSqlObj* pSql) { * launch secondary stage query to fetch the result that contains timestamp in set */ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { - // TODO not launch secondary stage query - // if (!needSecondaryQuery(pSql)) { - // return; - // } - - // sub query may not be necessary int32_t numOfSub = 0; SJoinSubquerySupporter* pSupporter = NULL; @@ -286,7 +278,6 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { pNew->cmd.type |= TSDB_QUERY_TYPE_JOIN_SEC_STAGE; pNew->cmd.nAggTimeInterval = pSupporter->interval; - pNew->cmd.limit = pSupporter->limit; pNew->cmd.groupbyExpr = pSupporter->groupbyExpr; tscColumnBaseInfoCopy(&pNew->cmd.colList, &pSupporter->colList, 0); @@ -305,7 +296,14 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { tscFieldInfoCalOffset(&pNew->cmd); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pNew->cmd, 0); - + + /* + * When handling the projection query, the offset value will be modified for table-table join, which is changed + * during the timestamp intersection. + */ + pSupporter->limit = pSql->cmd.limit; + pNew->cmd.limit = pSupporter->limit; + // fetch the join tag column if (UTIL_METER_IS_METRIC(pMeterMetaInfo)) { SSqlExpr* pExpr = tscSqlExprGet(&pNew->cmd, 0); @@ -314,10 +312,12 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pNew->cmd.tagCond, pMeterMetaInfo->pMeterMeta->uid); pExpr->param[0].i64Key = tagColIndex; pExpr->numOfParams = 1; - - addRequiredTagColumn(&pNew->cmd, tagColIndex, 0); } +#ifdef _DEBUG_VIEW + tscPrintSelectClause(&pNew->cmd); +#endif + tscProcessSql(pNew); } @@ -471,9 +471,31 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { pSupporter->pState->code = numOfRows; tscError("%p retrieve failed, code:%d, index:%d", pSql, numOfRows, pSupporter->subqueryIndex); } - + + if (tscProjectionQueryOnMetric(&pSql->cmd) && numOfRows == 0) { + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0); + assert(pSql->cmd.numOfTables == 1); + + // for projection query, need to try next vnode if current vnode is exhausted + if ((++pMeterMetaInfo->vnodeIndex) < pMeterMetaInfo->pMetricMeta->numOfVnodes) { + + pSupporter->pState->numOfCompleted = 0; + pSupporter->pState->numOfTotal = 1; + + pSql->cmd.command = TSDB_SQL_SELECT; + pSql->fp = tscJoinQueryCallback; + tscProcessSql(pSql); + + return; + } + } + if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) { - tscTrace("%p secondary retrieve completed, global code:%d", tres, pParentSql->res.code); + assert(pSupporter->pState->numOfCompleted == pSupporter->pState->numOfTotal); + + tscTrace("%p all %d secondary retrieves are completed, global code:%d", tres, pSupporter->pState->numOfTotal, + pParentSql->res.code); + if (pSupporter->pState->code != TSDB_CODE_SUCCESS) { pParentSql->res.code = abs(pSupporter->pState->code); freeSubqueryObj(pParentSql); @@ -490,11 +512,17 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { assert(pSql->numOfSubs >= 1); for (int32_t i = 0; i < pSql->numOfSubs; ++i) { - SJoinSubquerySupporter* pSupporter = (SJoinSubquerySupporter*)pSql->pSubs[i]->param; - SSqlRes* pRes = &pSql->pSubs[i]->res; - if (pRes->row >= pRes->numOfRows && pSupporter->hasMore) { - numOfFetch++; + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->pSubs[i]->cmd, 0); + + if (UTIL_METER_IS_METRIC(pMeterMetaInfo)) { + if (pRes->row >= pRes->numOfRows && pMeterMetaInfo->vnodeIndex < pMeterMetaInfo->pMetricMeta->numOfVnodes) { + numOfFetch++; + } + } else { + if (pRes->row >= pRes->numOfRows) { + numOfFetch++; + } } } @@ -515,8 +543,13 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { // wait for all subqueries completed pSupporter->pState->numOfTotal = numOfFetch; - if (pRes1->row >= pRes1->numOfRows && pSupporter->hasMore) { - tscTrace("%p subquery:%p retrieve data from vnode, index:%d", pSql, pSql1, pSupporter->subqueryIndex); + + assert(pRes1->numOfRows >= 0 && pCmd1->numOfTables == 1); + + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd1, 0); + if (pRes1->row >= pRes1->numOfRows) { + tscTrace("%p subquery:%p retrieve data from vnode, subquery:%d, vnodeIndex:%d", pSql, pSql1, + pSupporter->subqueryIndex, pMeterMetaInfo->vnodeIndex); tscResetForNextRetrieve(pRes1); @@ -541,7 +574,11 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) { SSqlRes* pRes = &pSql->res; tscTrace("%p all subquery response, retrieve data", pSql); - + + if (pRes->pColumnIndex != NULL) { + return; // the column transfer support struct has been built + } + pRes->pColumnIndex = calloc(1, sizeof(SColumnIndex) * pCmd->fieldsInfo.numOfOutputCols); for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) { @@ -631,20 +668,34 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) { tscSetupOutputColumnIndex(pParentSql); - if (pParentSql->fp == NULL) { - tsem_wait(&pParentSql->emptyRspSem); - tsem_wait(&pParentSql->emptyRspSem); - - tsem_post(&pParentSql->rspSem); - } else { - // set the command flag must be after the semaphore been correctly set. - // pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC; - // if (pPObj->res.code == TSDB_CODE_SUCCESS) { - // (*pPObj->fp)(pPObj->param, pPObj, 0); - // } else { - // tscQueueAsyncRes(pPObj); - // } - assert(0); + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0); + + /** + * if the query is a continue query (vnodeIndex > 0 for projection query) for next vnode, do the retrieval of data instead of returning to its invoker + */ + if (pMeterMetaInfo->vnodeIndex > 0 && tscProjectionQueryOnMetric(&pSql->cmd)) { + assert(pMeterMetaInfo->vnodeIndex < pMeterMetaInfo->pMetricMeta->numOfVnodes); + pSupporter->pState->numOfCompleted = 0; // reset the record value + + pSql->fp = joinRetrieveCallback; // continue retrieve data + pSql->cmd.command = TSDB_SQL_FETCH; + tscProcessSql(pSql); + } else { // first retrieve from vnode during the secondary stage sub-query + if (pParentSql->fp == NULL) { + tsem_wait(&pParentSql->emptyRspSem); + tsem_wait(&pParentSql->emptyRspSem); + + tsem_post(&pParentSql->rspSem); + } else { + // set the command flag must be after the semaphore been correctly set. + // pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC; + // if (pPObj->res.code == TSDB_CODE_SUCCESS) { + // (*pPObj->fp)(pPObj->param, pPObj, 0); + // } else { + // tscQueueAsyncRes(pPObj); + // } + assert(0); + } } } } @@ -1440,7 +1491,7 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) { assert(pDestBuf->fileSize == oldSize + size); - tscTrace("tsBuf merge success, %p, path:%s, fd:%d, file size:%d, vnode:%d, autoDelete:%d", pDestBuf, pDestBuf->path, + tscTrace("tsBuf merge success, %p, path:%s, fd:%d, file size:%d, numOfVnode:%d, autoDelete:%d", pDestBuf, pDestBuf->path, fileno(pDestBuf->f), pDestBuf->fileSize, pDestBuf->numOfVnodes, pDestBuf->autoDelete); return 0; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index d0aa290d31..128cc7f3a1 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1020,7 +1020,10 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { } setColumnOffsetValueInResultset(pCmd); - updateTagColumnIndex(pCmd, 0); + + for(int32_t i = 0; i < pCmd->numOfTables; ++i) { + updateTagColumnIndex(pCmd, i); + } break; } @@ -1796,12 +1799,11 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, tSQLExprItem* pItem) { } if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { - SColumnIndex index1 = {0, TSDB_TBNAME_COLUMN_INDEX}; SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = TSDB_METER_NAME_LEN}; strcpy(colSchema.name, TSQL_TBNAME_L); pCmd->type = TSDB_QUERY_TYPE_STABLE_QUERY; - tscAddSpecialColumnForSelect(pCmd, startPos, TSDB_FUNC_TAGPRJ, &index1, &colSchema, true); + tscAddSpecialColumnForSelect(pCmd, startPos, TSDB_FUNC_TAGPRJ, &index, &colSchema, true); } else { SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, index.tableIndex); SMeterMeta* pMeterMeta = pMeterMetaInfo->pMeterMeta; @@ -2739,15 +2741,20 @@ static bool functionCompatibleCheck(SSqlCmd* pCmd) { void updateTagColumnIndex(SSqlCmd* pCmd, int32_t tableIndex) { SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, tableIndex); - // update tags column index for group by tags - for (int32_t i = 0; i < pCmd->groupbyExpr.numOfGroupCols; ++i) { - int32_t index = pCmd->groupbyExpr.columnInfo[i].colIdx; - - for (int32_t j = 0; j < pMeterMetaInfo->numOfTags; ++j) { - int32_t tagColIndex = pMeterMetaInfo->tagColumnIndex[j]; - if (tagColIndex == index) { - pCmd->groupbyExpr.columnInfo[i].colIdx = j; - break; + /* + * update tags column index for group by tags + * group by columns belong to this table + */ + if (pCmd->groupbyExpr.numOfGroupCols > 0 && pCmd->groupbyExpr.tableIndex == tableIndex) { + for (int32_t i = 0; i < pCmd->groupbyExpr.numOfGroupCols; ++i) { + int32_t index = pCmd->groupbyExpr.columnInfo[i].colIdx; + + for (int32_t j = 0; j < pMeterMetaInfo->numOfTags; ++j) { + int32_t tagColIndex = pMeterMetaInfo->tagColumnIndex[j]; + if (tagColIndex == index) { + pCmd->groupbyExpr.columnInfo[i].colIdx = j; + break; + } } } } @@ -2755,9 +2762,15 @@ void updateTagColumnIndex(SSqlCmd* pCmd, int32_t tableIndex) { // update tags column index for expression for (int32_t i = 0; i < pCmd->exprsInfo.numOfExprs; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pCmd, i); + if (!TSDB_COL_IS_TAG(pExpr->colInfo.flag)) { // not tags, continue continue; } + + // not belongs to this table + if (pExpr->uid != pMeterMetaInfo->pMeterMeta->uid) { + continue; + } for (int32_t j = 0; j < pMeterMetaInfo->numOfTags; ++j) { if (pExpr->colInfo.colIdx == pMeterMetaInfo->tagColumnIndex[j]) { @@ -2766,6 +2779,32 @@ void updateTagColumnIndex(SSqlCmd* pCmd, int32_t tableIndex) { } } } + + // update join condition tag column index + SJoinInfo* pJoinInfo = &pCmd->tagCond.joinInfo; + if (!pJoinInfo->hasJoin) { // not join query + return; + } + + assert(pJoinInfo->left.uid != pJoinInfo->right.uid); + + // the join condition expression node belongs to this table(super table) + if (pMeterMetaInfo->pMeterMeta->uid == pJoinInfo->left.uid) { + for(int32_t i = 0; i < pMeterMetaInfo->numOfTags; ++i) { + if (pJoinInfo->left.tagCol == pMeterMetaInfo->tagColumnIndex[i]) { + pJoinInfo->left.tagCol = i; + } + } + } + + if (pMeterMetaInfo->pMeterMeta->uid == pJoinInfo->right.uid) { + for(int32_t i = 0; i < pMeterMetaInfo->numOfTags; ++i) { + if (pJoinInfo->right.tagCol == pMeterMetaInfo->tagColumnIndex[i]) { + pJoinInfo->right.tagCol = i; + } + } + } + } int32_t parseGroupbyClause(SSqlCmd* pCmd, tVariantList* pList) { @@ -2987,8 +3026,6 @@ typedef struct SCondExpr { static int32_t getTimeRange(int64_t* stime, int64_t* etime, tSQLExpr* pRight, int32_t optr, int16_t timePrecision); -static int32_t doParseWhereClause(SSqlObj* pSql, tSQLExpr** pExpr, SCondExpr* condExpr); - static int32_t tSQLExprNodeToString(tSQLExpr* pExpr, char** str) { if (pExpr->nSQLOptr == TK_ID) { // column name strncpy(*str, pExpr->colInfo.z, pExpr->colInfo.n); @@ -4018,129 +4055,128 @@ static void cleanQueryExpr(SCondExpr* pCondExpr) { } } -int32_t parseWhereClause(SSqlObj* pSql, tSQLExpr** pExpr) { - SSqlCmd* pCmd = &pSql->cmd; - - if (pExpr == NULL) { - return TSDB_CODE_SUCCESS; - } - - pCmd->stime = 0; - pCmd->etime = INT64_MAX; - - int32_t ret = TSDB_CODE_SUCCESS; - - const char* msg1 = "invalid expression"; - SCondExpr condExpr = {0}; - - if ((*pExpr)->pLeft == NULL || (*pExpr)->pRight == NULL) { - return invalidSqlErrMsg(pCmd, msg1); - } - - ret = doParseWhereClause(pSql, pExpr, &condExpr); - if (ret != TSDB_CODE_SUCCESS) { - return ret; - } +static void doAddJoinTagsColumnsIntoTagList(SSqlCmd* pCmd, SCondExpr* pCondExpr) { SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); if (QUERY_IS_JOIN_QUERY(pCmd->type) && UTIL_METER_IS_METRIC(pMeterMetaInfo)) { SColumnIndex index = {0}; - - getColumnIndexByNameEx(&condExpr.pJoinExpr->pLeft->colInfo, pCmd, &index); + + getColumnIndexByNameEx(&pCondExpr->pJoinExpr->pLeft->colInfo, pCmd, &index); pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, index.tableIndex); - + int32_t columnInfo = index.columnIndex - pMeterMetaInfo->pMeterMeta->numOfColumns; addRequiredTagColumn(pCmd, columnInfo, index.tableIndex); - - getColumnIndexByNameEx(&condExpr.pJoinExpr->pRight->colInfo, pCmd, &index); + + getColumnIndexByNameEx(&pCondExpr->pJoinExpr->pRight->colInfo, pCmd, &index); pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, index.tableIndex); - + columnInfo = index.columnIndex - pMeterMetaInfo->pMeterMeta->numOfColumns; addRequiredTagColumn(pCmd, columnInfo, index.tableIndex); } +} - cleanQueryExpr(&condExpr); +static int32_t getTagQueryCondExpr(SSqlCmd* pCmd, SCondExpr* pCondExpr, tSQLExpr** pExpr) { + int32_t ret = TSDB_CODE_SUCCESS; + + if (pCondExpr->pTagCond != NULL) { + for (int32_t i = 0; i < pCmd->numOfTables; ++i) { + tSQLExpr* p1 = extractExprForSTable(pExpr, pCmd, i); + + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, i); + + char c[TSDB_MAX_TAGS_LEN] = {0}; + char* str = c; + + if ((ret = getTagCondString(pCmd, p1, &str)) != TSDB_CODE_SUCCESS) { + return ret; + } + + tsSetMetricQueryCond(&pCmd->tagCond, pMeterMetaInfo->pMeterMeta->uid, c); + + doCompactQueryExpr(pExpr); + tSQLExprDestroy(p1); + } + + pCondExpr->pTagCond = NULL; + } + return ret; } - -int32_t doParseWhereClause(SSqlObj* pSql, tSQLExpr** pExpr, SCondExpr* condExpr) { +int32_t parseWhereClause(SSqlObj* pSql, tSQLExpr** pExpr) { + if (pExpr == NULL) { + return TSDB_CODE_SUCCESS; + } + const char* msg = "invalid filter expression"; - - int32_t type = 0; + const char* msg1 = "invalid expression"; + + int32_t ret = TSDB_CODE_SUCCESS; + SSqlCmd* pCmd = &pSql->cmd; + pCmd->stime = 0; + pCmd->etime = INT64_MAX; - /* - * tags query condition may be larger than 512bytes, therefore, we need to prepare enough large space - */ + //tags query condition may be larger than 512bytes, therefore, we need to prepare enough large space SStringBuilder sb = {0}; + SCondExpr condExpr = {0}; - int32_t ret = TSDB_CODE_SUCCESS; - if ((ret = getQueryCondExpr(pCmd, pExpr, condExpr, &type, (*pExpr)->nSQLOptr)) != TSDB_CODE_SUCCESS) { - return ret; + if ((*pExpr)->pLeft == NULL || (*pExpr)->pRight == NULL) { + return invalidSqlErrMsg(pCmd, msg1); } + int32_t type = 0; + if ((ret = getQueryCondExpr(pCmd, pExpr, &condExpr, &type, (*pExpr)->nSQLOptr)) != TSDB_CODE_SUCCESS) { + return ret; + } + doCompactQueryExpr(pExpr); - + // after expression compact, the expression tree is only include tag query condition - condExpr->pTagCond = (*pExpr); - + condExpr.pTagCond = (*pExpr); + // 1. check if it is a join query - if ((ret = validateJoinExpr(pCmd, condExpr)) != TSDB_CODE_SUCCESS) { + if ((ret = validateJoinExpr(pCmd, &condExpr)) != TSDB_CODE_SUCCESS) { return ret; } - + // 2. get the query time range - if ((ret = getTimeRangeFromExpr(pCmd, condExpr->pTimewindow)) != TSDB_CODE_SUCCESS) { + if ((ret = getTimeRangeFromExpr(pCmd, condExpr.pTimewindow)) != TSDB_CODE_SUCCESS) { return ret; } - + // 3. get the tag query condition - if (condExpr->pTagCond != NULL) { - for (int32_t i = 0; i < pCmd->numOfTables; ++i) { - tSQLExpr* p1 = extractExprForSTable(pExpr, pCmd, i); - - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, i); - - char c[TSDB_MAX_TAGS_LEN] = {0}; - char* str = c; - if ((ret = getTagCondString(pCmd, p1, &str)) != TSDB_CODE_SUCCESS) { - return ret; - } - - tsSetMetricQueryCond(&pCmd->tagCond, pMeterMetaInfo->pMeterMeta->uid, c); - - doCompactQueryExpr(pExpr); - tSQLExprDestroy(p1); - } - - condExpr->pTagCond = NULL; + if ((ret = getTagQueryCondExpr(pCmd, &condExpr, pExpr)) != TSDB_CODE_SUCCESS) { + return ret; } - + // 4. get the table name query condition - if ((ret = getTablenameCond(pCmd, condExpr->pTableCond, &sb)) != TSDB_CODE_SUCCESS) { + if ((ret = getTablenameCond(pCmd, condExpr.pTableCond, &sb)) != TSDB_CODE_SUCCESS) { return ret; } - + // 5. other column query condition - if ((ret = getColumnQueryCondInfo(pCmd, condExpr->pColumnCond, TK_AND)) != TSDB_CODE_SUCCESS) { + if ((ret = getColumnQueryCondInfo(pCmd, condExpr.pColumnCond, TK_AND)) != TSDB_CODE_SUCCESS) { return ret; } - + // 6. join condition - if ((ret = getJoinCondInfo(pSql, condExpr->pJoinExpr)) != TSDB_CODE_SUCCESS) { + if ((ret = getJoinCondInfo(pSql, condExpr.pJoinExpr)) != TSDB_CODE_SUCCESS) { return ret; } - + // 7. query condition for table name - pCmd->tagCond.relType = (condExpr->relType == TK_AND) ? TSDB_RELATION_AND : TSDB_RELATION_OR; + pCmd->tagCond.relType = (condExpr.relType == TK_AND) ? TSDB_RELATION_AND : TSDB_RELATION_OR; - ret = setTableCondForMetricQuery(pSql, condExpr->pTableCond, condExpr->tableCondIndex, &sb); + ret = setTableCondForMetricQuery(pSql, condExpr.pTableCond, condExpr.tableCondIndex, &sb); taosStringBuilderDestroy(&sb); if (!validateFilterExpr(pCmd)) { return invalidSqlErrMsg(pCmd, msg); } - + + doAddJoinTagsColumnsIntoTagList(pCmd, &condExpr); + + cleanQueryExpr(&condExpr); return ret; } @@ -5684,3 +5720,30 @@ int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCreateDbMsg *pCreate) { return TSDB_CODE_SUCCESS; } + +// for debug purpose +void tscPrintSelectClause(SSqlCmd* pCmd) { + if (pCmd == NULL || pCmd->exprsInfo.numOfExprs == 0) { + return; + } + + char* str = calloc(1, 10240); + int32_t offset = 0; + + offset += sprintf(str, "%d [", pCmd->exprsInfo.numOfExprs); + for(int32_t i = 0; i < pCmd->exprsInfo.numOfExprs; ++i) { + SSqlExpr* pExpr = tscSqlExprGet(pCmd, i); + + int32_t size = sprintf(str + offset, "%s(%d)", aAggs[pExpr->functionId].aName, pExpr->colInfo.colId); + offset += size; + + if (i < pCmd->exprsInfo.numOfExprs - 1) { + str[offset++] = ','; + } + } + + str[offset] = ']'; + printf("%s\n", str); + + free(str); +} diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 81ef5d13e6..1805eac38d 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -695,8 +695,6 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu pExpr->param->i64Key = tagColIndex; pExpr->numOfParams = 1; - addRequiredTagColumn(pCmd, tagColIndex, 0); - // add the filter tag column for (int32_t i = 0; i < pSupporter->colList.numOfCols; ++i) { SColumnBase *pColBase = &pSupporter->colList.pColList[i]; @@ -708,7 +706,11 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu } else { pNew->cmd.type |= TSDB_QUERY_TYPE_SUBQUERY; } - + +#ifdef _DEBUG_VIEW + tscPrintSelectClause(&pNew->cmd); +#endif + return tscProcessSql(pNew); } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index fe097b15d9..f3d3407582 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -458,14 +458,48 @@ static void **tscJoinResultsetFromBuf(SSqlObj *pSql) { while (1) { bool hasData = true; + if (tscProjectionQueryOnMetric(pCmd)) { + bool allSubqueryExhausted = true; + + for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->pSubs[i]->cmd, 0); + if (pMeterMetaInfo->vnodeIndex < pMeterMetaInfo->pMetricMeta->numOfVnodes) { + allSubqueryExhausted = false; + break; + } + } + + hasData = !allSubqueryExhausted; + } else { //otherwise, in case inner join, if any subquery exhausted, query completed. + for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + SSqlRes *pRes1 = &pSql->pSubs[i]->res; + if (pRes1->numOfRows == 0) { + hasData = false; + break; + } + } + } + for (int32_t i = 0; i < pSql->numOfSubs; ++i) { SSqlRes *pRes1 = &pSql->pSubs[i]->res; - - // in case inner join, if any subquery exhausted, query completed - if (pRes1->numOfRows == 0) { - hasData = false; - break; + SMeterMetaInfo* pMeterMeta = tscGetMeterMetaInfo(&pSql->pSubs[i]->cmd, 0); + + if (tscProjectionQueryOnMetric(pCmd)) { + //For multi-vnode projection query, the results may locate in following vnode, so we needs to go on + if (pMeterMeta->vnodeIndex < pMeterMeta->pMetricMeta->numOfVnodes) { + break; + } + } else { //otherwise, in case inner join, if any subquery exhausted, query completed. + if (pRes1->numOfRows == 0) { + hasData = false; + break; + } } +// if (pRes1->numOfRows == 0 && !tscProjectionQueryOnMetric(pCmd) || +// (pMeterMeta->vnodeIndex >= pMeterMeta->pMetricMeta->numOfVnodes && )) { +// hasData = false; +// break; +// } } if (!hasData) { // free all sub sqlobj @@ -487,34 +521,26 @@ static void **tscJoinResultsetFromBuf(SSqlObj *pSql) { } if (pRes->tsrow == NULL) { - pRes->tsrow = malloc(sizeof(void *) * pCmd->exprsInfo.numOfExprs); + pRes->tsrow = malloc(POINTER_BYTES * pCmd->exprsInfo.numOfExprs); } bool success = false; - if (pSql->numOfSubs >= 2) { - // do merge result + if (pSql->numOfSubs >= 2) { // do merge result SSqlRes *pRes1 = &pSql->pSubs[0]->res; SSqlRes *pRes2 = &pSql->pSubs[1]->res; - while (pRes1->row < pRes1->numOfRows && pRes2->row < pRes2->numOfRows) { + if (pRes1->row < pRes1->numOfRows && pRes2->row < pRes2->numOfRows) { doSetResultRowData(pSql->pSubs[0]); doSetResultRowData(pSql->pSubs[1]); - - TSKEY key1 = *(TSKEY *)pRes1->tsrow[0]; - TSKEY key2 = *(TSKEY *)pRes2->tsrow[0]; - - if (key1 == key2) { - success = true; - pRes1->row++; - pRes2->row++; - break; - } else if (key1 < key2) { - pRes1->row++; - } else if (key1 > key2) { - pRes2->row++; - } +// TSKEY key1 = *(TSKEY *)pRes1->tsrow[0]; +// TSKEY key2 = *(TSKEY *)pRes2->tsrow[0]; +// printf("first:%lld, second:%lld\n", key1, key2); + success = true; + pRes1->row++; + pRes2->row++; } - } else { + + } else { // only one subquery SSqlRes *pRes1 = &pSql->pSubs[0]->res; doSetResultRowData(pSql->pSubs[0]); @@ -553,9 +579,12 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) { if (pCmd->command == TSDB_SQL_METRIC_JOIN_RETRIEVE) { tscFetchDatablockFromSubquery(pSql); + if (pRes->code == TSDB_CODE_SUCCESS) { + tscTrace("%p data from all subqueries have been retrieved to client", pSql); return tscJoinResultsetFromBuf(pSql); } else { + tscTrace("%p retrieve data from subquery failed, code:%d", pSql, pRes->code); return NULL; } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 5ca55a486f..4521bcb156 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1538,7 +1538,7 @@ SMeterMetaInfo* tscAddMeterMetaInfo(SSqlCmd* pCmd, const char* name, SMeterMeta* pMeterMetaInfo->numOfTags = numOfTags; if (tags != NULL) { - memcpy(pMeterMetaInfo->tagColumnIndex, tags, sizeof(int16_t) * numOfTags); + memcpy(pMeterMetaInfo->tagColumnIndex, tags, sizeof(pMeterMetaInfo->tagColumnIndex[0]) * numOfTags); } pCmd->numOfTables += 1; @@ -1673,6 +1673,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void char key[TSDB_MAX_TAGS_LEN + 1] = {0}; tscGetMetricMetaCacheKey(pCmd, key, pMetermetaInfo->pMeterMeta->uid); + printf("-----%s\n", key); char* name = pMeterMetaInfo->name; SMeterMetaInfo* pFinalInfo = NULL; -- GitLab