diff --git a/src/client/inc/tscSubquery.h b/src/client/inc/tscSubquery.h index bc01de110345e4c90cf5c15d3d7f6b010cb7308d..3406dcd8589e149f2a62040d1271fbfd40caaac7 100644 --- a/src/client/inc/tscSubquery.h +++ b/src/client/inc/tscSubquery.h @@ -23,7 +23,7 @@ extern "C" { #include "tscUtil.h" #include "tsclient.h" -void tscFetchDatablockFromSubquery(SSqlObj* pSql); +void tscFetchDatablockForSubquery(SSqlObj* pSql); void tscSetupOutputColumnIndex(SSqlObj* pSql); void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code); diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 39299d3308df2d2c80c238c06ebd096eb73d2f0a..0d57dd18c35ed79bb3fe86b38cd270d81a6fbd41 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -228,6 +228,7 @@ void tscClearSubqueryInfo(SSqlCmd* pCmd); void tscFreeVgroupTableInfo(SArray* pVgroupTables); SArray* tscVgroupTableInfoClone(SArray* pVgroupTables); void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index); +void tscVgroupTableCopy(SVgroupTableInfo* info, SVgroupTableInfo* pInfo); int tscGetSTableVgroupInfo(SSqlObj* pSql, int32_t clauseIndex); int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index d21179df3d78934ae40c9acb3c3af65c77ea60d3..bcc9f8354de32c935c7255a0da4bee6b4b2b4dbb 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -339,9 +339,9 @@ typedef struct STscObj { } STscObj; typedef struct SSubqueryState { - int32_t numOfRemain; // the number of remain unfinished subquery - int32_t numOfSub; // the number of total sub-queries - uint64_t numOfRetrievedRows; // total number of points in this query + int32_t numOfRemain; // the number of remain unfinished subquery + int32_t numOfSub; // the number of total sub-queries + uint64_t numOfRetrievedRows; // total number of points in this query } SSubqueryState; typedef struct SSqlObj { @@ -515,7 +515,6 @@ extern SRpcCorEpSet tscMgmtEpSet; extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo); -int32_t tscCompareTidTags(const void* p1, const void* p2); void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables); #ifdef __cplusplus diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index a85a0ea570f6a3bb7f57c916ff43ac290254fed4..e70cd11fbe03b498d2caad93b086cfc33adf5669 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -176,7 +176,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo } if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) { - tscFetchDatablockFromSubquery(pSql); + tscFetchDatablockForSubquery(pSql); } else { tscProcessSql(pSql); } @@ -226,7 +226,7 @@ void taos_fetch_rows_a(TAOS_RES *taosa, __async_cb_func_t fp, void *param) { // handle the sub queries of join query if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) { - tscFetchDatablockFromSubquery(pSql); + tscFetchDatablockForSubquery(pSql); } else if (pRes->completed) { if(pCmd->command == TSDB_SQL_FETCH || (pCmd->command >= TSDB_SQL_SERV_STATUS && pCmd->command <= TSDB_SQL_CURRENT_USER)) { if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes. diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 215b38394b0744eb036016df48bc55e6b6c245ed..4a9ee92d836c7a49e5625b714039f7492472e275 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1632,7 +1632,7 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t } static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSchema* pSchema, SConvertFunc cvtFunc, - tSQLExprItem* item, int32_t resColIdx, SColumnIndex* pColIndex, bool finalResult) { + const char* name, int32_t resColIdx, SColumnIndex* pColIndex, bool finalResult) { const char* msg1 = "not support column types"; int16_t type = 0; @@ -1640,7 +1640,7 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS int32_t functionID = cvtFunc.execFuncId; if (functionID == TSDB_FUNC_SPREAD) { - int32_t t1 = pSchema[pColIndex->columnIndex].type; + int32_t t1 = pSchema->type; if (t1 == TSDB_DATA_TYPE_BINARY || t1 == TSDB_DATA_TYPE_NCHAR || t1 == TSDB_DATA_TYPE_BOOL) { invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); return -1; @@ -1649,17 +1649,12 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS bytes = tDataTypeDesc[type].nSize; } } else { - type = pSchema[pColIndex->columnIndex].type; - bytes = pSchema[pColIndex->columnIndex].bytes; + type = pSchema->type; + bytes = pSchema->bytes; } SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, functionID, pColIndex, type, bytes, bytes, false); - if (item->aliasName != NULL) { - tstrncpy(pExpr->aliasName, item->aliasName, tListLen(pExpr->aliasName)); - } else { - int32_t len = MIN(tListLen(pExpr->aliasName), item->pNode->token.n + 1); - tstrncpy(pExpr->aliasName, item->pNode->token.z, len); - } + tstrncpy(pExpr->aliasName, name, tListLen(pExpr->aliasName)); if (cvtFunc.originFuncId == TSDB_FUNC_LAST_ROW && cvtFunc.originFuncId != functionID) { pExpr->colInfo.flag |= TSDB_COL_NULL; @@ -1687,6 +1682,18 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS return TSDB_CODE_SUCCESS; } +void setResultColName(char* name, tSQLExprItem* pItem, int32_t functionId, SStrToken* pToken) { + if (pItem->aliasName != NULL) { + tstrncpy(name, pItem->aliasName, TSDB_COL_NAME_LEN); + } else { + char uname[TSDB_COL_NAME_LEN] = {0}; + int32_t len = MIN(pToken->n + 1, TSDB_COL_NAME_LEN); + tstrncpy(uname, pToken->z, len); + + snprintf(name, TSDB_COL_NAME_LEN - 1, "%s(%s)", aAggs[functionId].aName, uname); + } +} + int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExprItem* pItem, bool finalResult) { STableMetaInfo* pTableMetaInfo = NULL; int32_t optr = pItem->pNode->nSQLOptr; @@ -1954,9 +1961,13 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); SSchema* pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta); + char name[TSDB_COL_NAME_LEN] = {0}; for (int32_t j = 0; j < tscGetNumOfColumns(pTableMetaInfo->pTableMeta); ++j) { index.columnIndex = j; - if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem, colIndex++, &index, finalResult) != 0) { + SStrToken t = {.z = pSchema[j].name, .n = strnlen(pSchema[j].name, TSDB_COL_NAME_LEN)}; + setResultColName(name, pItem, cvtFunc.originFuncId, &t); + + if (setExprInfoForFunctions(pCmd, pQueryInfo, &pSchema[j], cvtFunc, name, colIndex++, &index, finalResult) != 0) { return TSDB_CODE_TSC_INVALID_SQL; } } @@ -1967,14 +1978,18 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col } pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); - SSchema* pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta); // functions can not be applied to tags if ((index.columnIndex >= tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) || (index.columnIndex < 0)) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6); } - if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem, colIndex + i, &index, finalResult) != 0) { + char name[TSDB_COL_NAME_LEN] = {0}; + + SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex); + setResultColName(name, pItem, cvtFunc.originFuncId, &pParamElem->pNode->colInfo); + + if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, name, colIndex + i, &index, finalResult) != 0) { return TSDB_CODE_TSC_INVALID_SQL; } @@ -2011,7 +2026,12 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col for (int32_t i = 0; i < tscGetNumOfColumns(pTableMetaInfo->pTableMeta); ++i) { SColumnIndex index = {.tableIndex = j, .columnIndex = i}; - if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem, colIndex, &index, finalResult) != 0) { + + char name[TSDB_COL_NAME_LEN] = {0}; + SStrToken t = {.z = pSchema->name, .n = strnlen(pSchema->name, TSDB_COL_NAME_LEN)}; + setResultColName(name, pItem, cvtFunc.originFuncId, &t); + + if (setExprInfoForFunctions(pCmd, pQueryInfo, &pSchema[index.columnIndex], cvtFunc, name, colIndex, &index, finalResult) != 0) { return TSDB_CODE_TSC_INVALID_SQL; } @@ -3458,7 +3478,7 @@ static int32_t validateArithmeticSQLExpr(SSqlCmd* pCmd, tSQLExpr* pExpr, SQueryI } // the expression not from the same table, return error - if (uidLeft != uidRight) { + if (uidLeft != uidRight && uidLeft != 0 && uidRight != 0) { return TSDB_CODE_TSC_INVALID_SQL; } } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 779cee2163ccddca9adda9278301d4d7bff9913f..7f16476d8992914bc956e7e1cb8cad5c0e279d6e 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -880,8 +880,9 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->tsOffset = htonl((int32_t)(pMsg - pCmd->payload)); if (pQueryInfo->tsBuf != NULL) { - int32_t vnodeId = htonl(pQueryMsg->head.vgId); - int32_t code = dumpFileBlockByVnodeId(pQueryInfo->tsBuf, vnodeId, pMsg, &pQueryMsg->tsLen, &pQueryMsg->tsNumOfBlocks); + // note: here used the index instead of actual vnode id. + int32_t vnodeIndex = pTableMetaInfo->vgroupIndex; + int32_t code = dumpFileBlockByVnodeId(pQueryInfo->tsBuf, vnodeIndex, pMsg, &pQueryMsg->tsLen, &pQueryMsg->tsNumOfBlocks); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 38b0388d356f31f387876cb3cac06728f029f3aa..faf016be2343f3c4d0e5dec676829ef84ad14819 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -44,6 +44,17 @@ static int32_t tsCompare(int32_t order, int64_t left, int64_t right) { } } +static void skipRemainValue(STSBuf* pTSBuf, tVariant* tag1) { + while (tsBufNextPos(pTSBuf)) { + STSElem el1 = tsBufGetElem(pTSBuf); + + int32_t res = tVariantCompare(el1.tag, tag1); + if (res != 0) { // it is a record with new tag + return; + } + } +} + static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJoinSupporter* pSupporter2, STimeWindow * win) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); @@ -92,54 +103,48 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ int64_t numOfInput1 = 1; int64_t numOfInput2 = 1; - int32_t numOfVnodes = 0; - int32_t* idList = NULL; - tsBufGetVnodeIdList(pSupporter2->pTSBuf, &numOfVnodes, &idList); - - bool completed = false; while(1) { STSElem elem = tsBufGetElem(pSupporter1->pTSBuf); // no data in pSupporter1 anymore, jump out of loop - if (elem.vnode < 0 || completed) { + if (!tsBufIsValidElem(&elem)) { break; } - bool f = false; - for (int32_t i = 0; i < numOfVnodes; ++i) { - STSElem el = tsBufGetElemStartPos(pSupporter2->pTSBuf, idList[i], elem.tag); - if (el.vnode == idList[i]) { - f = true; - break; - } - } + // find the data in supporter2 with the same tag value + STSElem e2 = tsBufFindElemStartPosByTag(pSupporter2->pTSBuf, elem.tag); /** * there are elements in pSupporter2 with the same tag, continue */ - if (f) { + tVariant tag1 = {0}; + tVariantAssign(&tag1, elem.tag); + + if (tsBufIsValidElem(&e2)) { while (1) { STSElem elem1 = tsBufGetElem(pSupporter1->pTSBuf); STSElem elem2 = tsBufGetElem(pSupporter2->pTSBuf); + // data with current are exhausted + if (!tsBufIsValidElem(&elem1) || tVariantCompare(elem1.tag, &tag1) != 0) { + break; + } + + if (!tsBufIsValidElem(&elem2) || tVariantCompare(elem2.tag, &tag1) != 0) { // ignore all records with the same tag + skipRemainValue(pSupporter1->pTSBuf, &tag1); + break; + } + /* * in case of stable query, limit/offset is not applied here. the limit/offset is applied to the - * final results which is acquired after the secondry merge of in the client. + * final results which is acquired after the secondary merge of in the client. */ int32_t re = tsCompare(order, elem1.ts, elem2.ts); if (re < 0) { - if (!tsBufNextPos(pSupporter1->pTSBuf)) { - completed = true; - break; - } - + tsBufNextPos(pSupporter1->pTSBuf); numOfInput1++; } else if (re > 0) { - if (!tsBufNextPos(pSupporter2->pTSBuf)) { - completed = true; - break; - } - + tsBufNextPos(pSupporter2->pTSBuf); numOfInput2++; } else { if (pLimit->offset == 0 || pQueryInfo->interval.interval > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) { @@ -154,43 +159,18 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ tsBufAppend(output1, elem1.vnode, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts)); tsBufAppend(output2, elem2.vnode, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts)); } else { - pLimit->offset -= 1; - } - - if (!tsBufNextPos(pSupporter1->pTSBuf)) { - completed = true; - break; + pLimit->offset -= 1;//offset apply to projection? } + tsBufNextPos(pSupporter1->pTSBuf); numOfInput1++; - if (!tsBufNextPos(pSupporter2->pTSBuf)) { - completed = true; - break; - } - + tsBufNextPos(pSupporter2->pTSBuf); numOfInput2++; } } } else { // no data in pSupporter2, ignore current data in pSupporter2 - tVariant tag = {0}; - tVariantAssign(&tag, elem.tag); - - // ignore all records with the same tag - while (tsBufNextPos(pSupporter1->pTSBuf)) { - STSElem el1 = tsBufGetElem(pSupporter1->pTSBuf); - int32_t res = tVariantCompare(el1.tag, &tag); - - // it is a record with new tag - if (res != 0) { - break; - } - } - - STSElem el1 = tsBufGetElem(pSupporter1->pTSBuf); - if (el1.vnode < 0) { // no data exists, abort - break; - } + skipRemainValue(pSupporter1->pTSBuf, &tag1); } } @@ -299,6 +279,68 @@ static UNUSED_FUNC bool needSecondaryQuery(SQueryInfo* pQueryInfo) { return false; } +static void filterVgroupTables(SQueryInfo* pQueryInfo, SArray* pVgroupTables) { + int32_t num = 0; + int32_t* list = NULL; + tsBufGetVnodeIdList(pQueryInfo->tsBuf, &num, &list); + + // The virtual node, of which all tables are disqualified after the timestamp intersection, + // is removed to avoid next stage query. + // TODO: If tables from some vnodes are not qualified for next stage query, discard them. + for (int32_t k = 0; k < taosArrayGetSize(pVgroupTables);) { + SVgroupTableInfo* p = taosArrayGet(pVgroupTables, k); + + bool found = false; + for (int32_t f = 0; f < num; ++f) { + if (p->vgInfo.vgId == list[f]) { + found = true; + break; + } + } + + if (!found) { + tscRemoveVgroupTableGroup(pVgroupTables, k); + } else { + k++; + } + } + + assert(taosArrayGetSize(pVgroupTables) > 0); + TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY); + + taosTFree(list); +} + +static SArray* buildVgroupTableByResult(SQueryInfo* pQueryInfo, SArray* pVgroupTables) { + int32_t num = 0; + int32_t* list = NULL; + tsBufGetVnodeIdList(pQueryInfo->tsBuf, &num, &list); + + int32_t numOfGroups = taosArrayGetSize(pVgroupTables); + + SArray* pNew = taosArrayInit(num, sizeof(SVgroupTableInfo)); + + SVgroupTableInfo info; + for (int32_t i = 0; i < num; ++i) { + int32_t vnodeId = list[i]; + + for (int32_t j = 0; j < numOfGroups; ++j) { + SVgroupTableInfo* p1 = taosArrayGet(pVgroupTables, j); + if (p1->vgInfo.vgId == vnodeId) { + tscVgroupTableCopy(&info, p1); + break; + } + } + + taosArrayPush(pNew, &info); + } + + taosTFree(list); + TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY); + + return pNew; +} + /* * launch secondary stage query to fetch the result that contains timestamp in set */ @@ -373,12 +415,11 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { pQueryInfo->fieldsInfo = pSupporter->fieldsInfo; pQueryInfo->groupbyExpr = pSupporter->groupInfo; - SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); - assert(pNew->subState.numOfSub == 0 && pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1); + assert(pNew->subState.numOfSub == 0 && pNew->cmd.numOfClause == 1 && pQueryInfo->numOfTables == 1); - tscFieldInfoUpdateOffset(pNewQueryInfo); + tscFieldInfoUpdateOffset(pQueryInfo); - STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0); + STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); pTableMetaInfo->pVgroupTables = pSupporter->pVgroupTables; pSupporter->exprList = NULL; @@ -392,7 +433,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { * during the timestamp intersection. */ pSupporter->limit = pQueryInfo->limit; - pNewQueryInfo->limit = pSupporter->limit; + pQueryInfo->limit = pSupporter->limit; SColumnIndex index = {.tableIndex = 0, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; SSchema* s = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, 0); @@ -407,7 +448,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { tscAddSpecialColumnForSelect(pQueryInfo, 0, functionId, &index, s, TSDB_COL_NORMAL); tscPrintSelectClause(pNew, 0); - tscFieldInfoUpdateOffset(pNewQueryInfo); + tscFieldInfoUpdateOffset(pQueryInfo); pExpr = tscSqlExprGet(pQueryInfo, 0); } @@ -422,39 +463,21 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { pExpr->numOfParams = 1; } - int32_t num = 0; - int32_t *list = NULL; - tsBufGetVnodeIdList(pNewQueryInfo->tsBuf, &num, &list); - - if (pTableMetaInfo->pVgroupTables != NULL) { - for(int32_t k = 0; k < taosArrayGetSize(pTableMetaInfo->pVgroupTables);) { - SVgroupTableInfo* p = taosArrayGet(pTableMetaInfo->pVgroupTables, k); - - bool found = false; - for(int32_t f = 0; f < num; ++f) { - if (p->vgInfo.vgId == list[f]) { - found = true; - break; - } - } - - if (!found) { - tscRemoveVgroupTableGroup(pTableMetaInfo->pVgroupTables, k); - } else { - k++; - } + if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { + assert(pTableMetaInfo->pVgroupTables != NULL); + if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { + SArray* p = buildVgroupTableByResult(pQueryInfo, pTableMetaInfo->pVgroupTables); + tscFreeVgroupTableInfo(pTableMetaInfo->pVgroupTables); + pTableMetaInfo->pVgroupTables = p; + } else { + filterVgroupTables(pQueryInfo, pTableMetaInfo->pVgroupTables); } - - assert(taosArrayGetSize(pTableMetaInfo->pVgroupTables) > 0); - TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY); } - taosTFree(list); - - size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList); + size_t numOfCols = taosArrayGetSize(pQueryInfo->colList); tscDebug("%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, name:%s", - pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, taosArrayGetSize(pNewQueryInfo->exprList), - numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, pTableMetaInfo->name); + pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pQueryInfo->type, taosArrayGetSize(pQueryInfo->exprList), + numOfCols, pQueryInfo->fieldsInfo.numOfOutput, pTableMetaInfo->name); } //prepare the subqueries object failed, abort @@ -1034,6 +1057,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR } } + assert(pState->numOfRemain > 0); if (atomic_sub_fetch_32(&pState->numOfRemain, 1) > 0) { tscDebug("%p sub:%p completed, remain:%d, total:%d", pParentSql, tres, pState->numOfRemain, pState->numOfSub); return; @@ -1047,7 +1071,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR } // update the records for each subquery in parent sql object. - bool isSTableSub = tscIsTwoStageSTableQuery(pQueryInfo, 0); + bool stableQuery = tscIsTwoStageSTableQuery(pQueryInfo, 0); for (int32_t i = 0; i < pState->numOfSub; ++i) { if (pParentSql->pSubs[i] == NULL) { tscDebug("%p %p sub:%d not retrieve data", pParentSql, NULL, i); @@ -1061,7 +1085,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR pRes1->numOfRows, pRes1->numOfTotal); assert(pRes1->row < pRes1->numOfRows); } else { - if (!isSTableSub) { + if (!stableQuery) { pRes1->numOfClauseTotal += pRes1->numOfRows; } @@ -1074,7 +1098,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR tscBuildResFromSubqueries(pParentSql); } -void tscFetchDatablockFromSubquery(SSqlObj* pSql) { +void tscFetchDatablockForSubquery(SSqlObj* pSql) { assert(pSql->subState.numOfSub >= 1); int32_t numOfFetch = 0; @@ -1136,11 +1160,22 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { if (numOfFetch <= 0) { bool tryNextVnode = false; - SSqlObj* pp = pSql->pSubs[0]; - SQueryInfo* pi = tscGetQueryInfoDetail(&pp->cmd, 0); + bool orderedPrjQuery = false; + for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { + SSqlObj* pSub = pSql->pSubs[i]; + if (pSub == NULL) { + continue; + } + + SQueryInfo* p = tscGetQueryInfoDetail(&pSub->cmd, 0); + orderedPrjQuery = tscNonOrderedProjectionQueryOnSTable(p, 0); + if (orderedPrjQuery) { + break; + } + } // get the number of subquery that need to retrieve the next vnode. - if (tscNonOrderedProjectionQueryOnSTable(pi, 0)) { + if (orderedPrjQuery) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSub = pSql->pSubs[i]; if (pSub != NULL && pSub->res.row >= pSub->res.numOfRows && pSub->res.completed) { @@ -1244,7 +1279,6 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) { SSqlCmd* pCmd = &pSql->cmd; SSqlRes* pRes = &pSql->res; - tscDebug("%p all subquery response, retrieve data for subclause:%d", pSql, pCmd->clauseIndex); // the column transfer support struct has been built if (pRes->pColumnIndex != NULL) { @@ -1340,21 +1374,23 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { return; } - // wait for the other subqueries response from vnode - if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) { - return; + + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + + // In case of consequence query from other vnode, do not wait for other query response here. + if (!(pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0))) { + if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) { + return; + } } tscSetupOutputColumnIndex(pParentSql); - STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); /** * if the query is a continue query (vgroupIndex > 0 for projection query) for next vnode, do the retrieval of * data instead of returning to its invoker */ if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { -// pParentSql->subState.numOfRemain = pParentSql->subState.numOfSub; // reset the record value - pSql->fp = joinRetrieveFinalResCallback; // continue retrieve data pSql->cmd.command = TSDB_SQL_FETCH; tscProcessSql(pSql); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index dca23e0dcb5926b250b124aa0f4099056fe3ff4a..effb2db6ac4dcf28e52abb3ff4ac0c2059749770 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1696,6 +1696,17 @@ void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index) { taosArrayRemove(pVgroupTable, index); } +void tscVgroupTableCopy(SVgroupTableInfo* info, SVgroupTableInfo* pInfo) { + memset(info, 0, sizeof(SVgroupTableInfo)); + + info->vgInfo = pInfo->vgInfo; + for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) { + info->vgInfo.epAddr[j].fqdn = strdup(pInfo->vgInfo.epAddr[j].fqdn); + } + + info->itemList = taosArrayClone(pInfo->itemList); +} + SArray* tscVgroupTableInfoClone(SArray* pVgroupTables) { if (pVgroupTables == NULL) { return NULL; @@ -1707,14 +1718,8 @@ SArray* tscVgroupTableInfoClone(SArray* pVgroupTables) { SVgroupTableInfo info; for (size_t i = 0; i < num; i++) { SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTables, i); - memset(&info, 0, sizeof(SVgroupTableInfo)); - - info.vgInfo = pInfo->vgInfo; - for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) { - info.vgInfo.epAddr[j].fqdn = strdup(pInfo->vgInfo.epAddr[j].fqdn); - } + tscVgroupTableCopy(&info, pInfo); - info.itemList = taosArrayClone(pInfo->itemList); taosArrayPush(pa, &info); }