diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 6e8559538d93f3356ca8d0fff9a936680c0fc564..57cf821eb1e1d640d8593ce37501859b4e93c86b 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -200,8 +200,9 @@ int32_t tscGetQueryInfoDetailSafely(SSqlCmd *pCmd, int32_t subClauseIndex, SQuer STableMetaInfo* tscGetMeterMetaInfoByUid(SQueryInfo* pQueryInfo, uint64_t uid, int32_t* index); void tscClearMeterMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache); -STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, STableMeta* pTableMeta, SArray* vgroupList, - int16_t numOfTags, int16_t* tags); +STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, STableMeta* pTableMeta, + SVgroupsInfo* vgroupList, int16_t numOfTags, int16_t* tags); + STableMetaInfo* tscAddEmptyMetaInfo(SQueryInfo *pQueryInfo); int32_t tscAddSubqueryInfo(SSqlCmd *pCmd); void tscFreeSubqueryInfo(SSqlCmd* pCmd); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 0d684b72dce860433cbf0bc9ae74a1be120f76af..bcd6f54799db4c778fe571ddfe30fabb232a5eef 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -40,6 +40,8 @@ extern "C" { // forward declaration struct SSqlInfo; +typedef SCMSTableVgroupRspMsg SVgroupsInfo; + typedef struct SSqlGroupbyExpr { int16_t tableIndex; int16_t numOfGroupCols; @@ -70,14 +72,12 @@ typedef struct STableMeta { typedef struct STableMetaInfo { STableMeta * pTableMeta; // table meta, cached in client side and acquried by name -// SSuperTableMeta *pMetricMeta; // metricmeta - SArray* vgroupIdList; - + SVgroupsInfo* vgroupList; /* * 1. keep the vnode index during the multi-vnode super table projection query * 2. keep the vnode index for multi-vnode insertion */ - int32_t vnodeIndex; + int32_t dnodeIndex; char name[TSDB_TABLE_ID_LEN]; // (super) table name int16_t numOfTags; // total required tags in query, including groupby tags int16_t tagColumnIndex[TSDB_MAX_TAGS]; // clause + tag projection @@ -210,7 +210,6 @@ typedef struct STableDataBlocks { } STableDataBlocks; typedef struct SDataBlockList { - int32_t idx; uint32_t nSize; uint32_t nAlloc; STableDataBlocks **pData; @@ -257,7 +256,6 @@ typedef struct { union { bool existsCheck; // check if the table exists or not - bool inStream; // denote if current sql is executed in stream or not bool autoCreated; // if the table is missing, on-the-fly create it. during getmeterMeta int8_t dataSourceType; // load data from file or not }; diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 25f58568773513a1b6841f95373d8bca8eec8f0c..610027151f73044ba9c30eafe5a44aaa316d3823 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -443,12 +443,12 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { if ((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_SUBQUERY) == TSDB_QUERY_TYPE_STABLE_SUBQUERY) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0) && pTableMetaInfo->vnodeIndex >= 0 && pSql->param != NULL); + assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0) && pTableMetaInfo->dnodeIndex >= 0 && pSql->param != NULL); SRetrieveSupport *trs = (SRetrieveSupport *)pSql->param; SSqlObj * pParObj = trs->pParentSqlObj; - assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->vnodeIndex && + assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->dnodeIndex && tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0); tscTrace("%p get metricMeta during super table query successfully", pSql); diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index caa14d082bc144be7614fa3290908ca3108fa58f..ab5c3f9f729d36dd4dcf102ace01565bd96426d9 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -408,7 +408,7 @@ static int insertStmtReset(STscStmt* pStmt) { pCmd->batchSize = 0; STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); - pTableMetaInfo->vnodeIndex = 0; + pTableMetaInfo->dnodeIndex = 0; return TSDB_CODE_SUCCESS; } @@ -438,7 +438,7 @@ static int insertStmtExecute(STscStmt* stmt) { } // set the next sent data vnode index in data block arraylist - pTableMetaInfo->vnodeIndex = 1; + pTableMetaInfo->dnodeIndex = 1; } else { pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 5657b6b47d54608bd2e4f9deb8b5871de8bc32e7..c541a35d120d12b17fb0ad1003ddfd9140118d0f 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -2470,7 +2470,7 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd* const char* msg8 = "not allowed column type for group by"; const char* msg9 = "tags not allowed for table query"; - // todo : handle two meter situation + // todo : handle two tables situation STableMetaInfo* pTableMetaInfo = NULL; if (pList == NULL) { @@ -2493,7 +2493,6 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd* SSQLToken token = {pVar->nLen, pVar->nType, pVar->pz}; SColumnIndex index = COLUMN_INDEX_INITIALIZER; - if (getColumnIndexByName(&token, pQueryInfo, &index) != TSDB_CODE_SUCCESS) { return invalidSqlErrMsg(pQueryInfo->msg, msg2); } @@ -2523,13 +2522,13 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd* return invalidSqlErrMsg(pQueryInfo->msg, msg9); } - int32_t relIndex = index.columnIndex; - if (index.columnIndex != TSDB_TBNAME_COLUMN_INDEX) { - relIndex -= tscGetNumOfColumns(pTableMeta); - } +// int32_t relIndex = index.columnIndex; +// if (index.columnIndex != TSDB_TBNAME_COLUMN_INDEX) { +// relIndex -= tscGetNumOfColumns(pTableMeta); +// } pQueryInfo->groupbyExpr.columnInfo[i] = - (SColIndex){.colIndex = relIndex, .flag = TSDB_COL_TAG, .colId = pSchema->colId}; // relIndex; + (SColIndex){.colIndex = index.columnIndex, .flag = TSDB_COL_TAG, .colId = pSchema->colId}; // relIndex; addRequiredTagColumn(pQueryInfo, pQueryInfo->groupbyExpr.columnInfo[i].colIndex, index.tableIndex); } else { // check if the column type is valid, here only support the bool/tinyint/smallint/bigint group by @@ -5095,9 +5094,8 @@ static int32_t doAddGroupbyColumnsOnDemand(SQueryInfo* pQueryInfo) { bytes = TSDB_TABLE_NAME_LEN; name = TSQL_TBNAME_L; } else { - colIndex = (TSDB_COL_IS_TAG(pColIndex->flag)) ? tscGetNumOfColumns(pTableMetaInfo->pTableMeta) + pColIndex->colIndex - : pColIndex->colIndex; - +// colIndex = (TSDB_COL_IS_TAG(pColIndex->flag)) ? tscGetNumOfColumns(pTableMetaInfo->pTableMeta) + pColIndex->colIndex +// : pColIndex->colIndex; type = pSchema[colIndex].type; bytes = pSchema[colIndex].bytes; name = pSchema[colIndex].name; @@ -5108,11 +5106,14 @@ static int32_t doAddGroupbyColumnsOnDemand(SQueryInfo* pQueryInfo) { SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, pQueryInfo->exprsInfo.numOfExprs, TSDB_FUNC_TAG, &index, type, bytes, bytes); - + + memset(pExpr->aliasName, 0, tListLen(pExpr->aliasName)); + strncpy(pExpr->aliasName, name, TSDB_COL_NAME_LEN); + pExpr->colInfo.flag = TSDB_COL_TAG; // NOTE: tag column does not add to source column list - SColumnList ids = {0}; + SColumnList ids = getColumnList(1, 0, pColIndex->colIndex); insertResultField(pQueryInfo, pQueryInfo->exprsInfo.numOfExprs-1, &ids, bytes, type, name, pExpr); } else { // if this query is "group by" normal column, interval is not allowed @@ -5693,7 +5694,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { assert(pQueryInfo->numOfTables == pQuerySql->from->nExpr); if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { - int32_t code = tscGetSTableVgroupInfo(pSql, index); + code = tscGetSTableVgroupInfo(pSql, index); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/src/client/src/tscSecondaryMerge.c b/src/client/src/tscSecondaryMerge.c index d1ec5088d3d3d9e04e1d2b961b8bbd5a5b60290f..65259205ab20cefbe72159d77cafeda9d2286e24 100644 --- a/src/client/src/tscSecondaryMerge.c +++ b/src/client/src/tscSecondaryMerge.c @@ -636,7 +636,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr pModel = createColumnModel(pSchema, pQueryInfo->exprsInfo.numOfExprs, capacity); - size_t numOfSubs = taosArrayGetSize(pTableMetaInfo->vgroupIdList); + size_t numOfSubs = pTableMetaInfo->vgroupList->numOfDnodes; for (int32_t i = 0; i < numOfSubs; ++i) { (*pMemBuffer)[i] = createExtMemBuffer(nBufferSizes, rlen, pModel); (*pMemBuffer)[i]->flushModel = MULTIPLE_APPEND_MODEL; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index ebac676b77a292affc631dd2e0214de010127c4d..d379b54439c2c195c2521d54bff31b709a142ea1 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -413,7 +413,7 @@ int tscProcessSql(SSqlObj *pSql) { type = pQueryInfo->type; - // for hearbeat, numOfTables == 0; + // for heartbeat, numOfTables == 0; assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0); } @@ -424,19 +424,6 @@ int tscProcessSql(SSqlObj *pSql) { pSql->res.code = TSDB_CODE_OTHERS; return pSql->res.code; } - - // temp -// pSql->ipList = tscMgmtIpList; -// if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { -// pSql->index = pTableMetaInfo->pTableMeta->index; -// } else { // it must be the parent SSqlObj for super table query -// if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) != 0) { -// int32_t idx = pTableMetaInfo->vnodeIndex; -// -// SVnodeSidList *pSidList = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx); -// pSql->index = pSidList->index; -// } -// } } else if (pSql->cmd.command < TSDB_SQL_LOCAL) { pSql->ipList = tscMgmtIpList; } else { // local handler @@ -522,8 +509,17 @@ int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pRetrieveMsg->free = htons(pQueryInfo->type); pMsg += sizeof(pQueryInfo->type); - STableMeta* pTableMeta = pQueryInfo->pTableMetaInfo[0]->pTableMeta; - pRetrieveMsg->header.vgId = htonl(pTableMeta->vgId); + // todo valid the vgroupId at the client side + if (UTIL_TABLE_IS_SUPERTABLE(pQueryInfo->pTableMetaInfo[0])) { + SVgroupsInfo* pVgroupInfo = pQueryInfo->pTableMetaInfo[0]->vgroupList; + assert(pVgroupInfo->dnodeVgroups->numOfVgroups == 1); // todo fix me + + pRetrieveMsg->header.vgId = htonl(pVgroupInfo->dnodeVgroups[0].vgId[0]); + } else { + STableMeta* pTableMeta = pQueryInfo->pTableMetaInfo[0]->pTableMeta; + pRetrieveMsg->header.vgId = htonl(pTableMeta->vgId); + } + pMsg += sizeof(SRetrieveTableMsg); pRetrieveMsg->header.contLen = htonl(pSql->cmd.payloadLen); @@ -584,7 +580,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) { #if 0 SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta; - SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vnodeIndex); + SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->dnodeIndex); int32_t meterInfoSize = (pMetricMeta->tagLen + sizeof(STableIdInfo)) * pVnodeSidList->numOfSids; int32_t outputColumnSize = pQueryInfo->exprsInfo.numOfExprs * sizeof(SSqlFuncExprMsg); @@ -655,21 +651,22 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->head.vgId = htonl(pTableMeta->vgId); tscTrace("%p queried tables:%d, table id: %s", pSql, 1, pTableMetaInfo->name); } else { // query super table - if (pTableMetaInfo->vnodeIndex < 0) { - tscError("%p error vnodeIdx:%d", pSql, pTableMetaInfo->vnodeIndex); + + if (pTableMetaInfo->dnodeIndex < 0) { + tscError("%p error vnodeIdx:%d", pSql, pTableMetaInfo->dnodeIndex); return -1; } - pSql->ipList.numOfIps = taosArrayGetSize(pTableMetaInfo->vgroupIdList); + pSql->ipList.numOfIps = 1; // todo fix me pSql->ipList.port = tsDnodeShellPort; pSql->ipList.inUse = 0; - - for(int32_t i = 0; i < pSql->ipList.numOfIps; ++i) { - pSql->ipList.ip[i] = *(uint32_t*) taosArrayGet(pTableMetaInfo->vgroupIdList, i); - } + + // todo extract method + STableDnodeVgroupInfo* pVgroupInfo = &pTableMetaInfo->vgroupList->dnodeVgroups[pTableMetaInfo->dnodeIndex]; + pSql->ipList.ip[0] = pVgroupInfo->ipAddr.ip; #if 0 - SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vnodeIndex); + SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->dnodeIndex); uint32_t vnodeId = pVnodeSidList->vpeerDesc[pVnodeSidList->index].vnode; numOfTables = pVnodeSidList->numOfSids; @@ -679,9 +676,10 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } #endif - uint32_t vnodeId = 1; - tscTrace("%p query on vid:%d, number of tables:%d", pSql, vnodeId, numOfTables); - pQueryMsg->head.vgId = htonl(vnodeId); + tscTrace("%p query on super table, numOfVgroup:%d, dnodeIndex:%d", pSql, pVgroupInfo->numOfVgroups, + pTableMetaInfo->dnodeIndex); + + pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId[0]); numOfTables = 1; } @@ -859,7 +857,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t numOfBlocks = 0; if (pQueryInfo->tsBuf != NULL) { - STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, pTableMetaInfo->vnodeIndex); + STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, pTableMetaInfo->dnodeIndex); assert(QUERY_IS_JOIN_QUERY(pQueryInfo->type) && pBlockInfo != NULL); // this query should not be sent // todo refactor @@ -1851,7 +1849,6 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { } for (int i = 0; i < TSDB_VNODES_SUPPORT; ++i) { - pMetaMsg->vpeerDesc[i].vgId = htonl(pMetaMsg->vpeerDesc[i].vgId); pMetaMsg->vpeerDesc[i].ip = htonl(pMetaMsg->vpeerDesc[i].ip); pMetaMsg->vpeerDesc[i].dnodeId = htonl(pMetaMsg->vpeerDesc[i].dnodeId); } @@ -2116,21 +2113,30 @@ _error_clean: free(sizes); free(metricMetaList); #endif + SSqlRes* pRes = &pSql->res; - SCMSTableVgroupRspMsg *pStableVgroup = (SCMSTableVgroupRspMsg *)pSql->res.pRsp; + SCMSTableVgroupRspMsg *pStableVgroup = (SCMSTableVgroupRspMsg *)pRes->pRsp; pStableVgroup->numOfDnodes = htonl(pStableVgroup->numOfDnodes); - SSqlObj* pparent = pSql->param; - assert(pparent != NULL); + // master sqlObj locates in param + SSqlObj* parent = pSql->param; + assert(parent != NULL); - SSqlCmd* pCmd = &pparent->cmd; + SSqlCmd* pCmd = &parent->cmd; STableMetaInfo* pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); - pInfo->vgroupIdList = taosArrayInit(pStableVgroup->numOfDnodes, sizeof(int32_t)); - // todo opt performance - for(int32_t i = 0; i < pStableVgroup->numOfDnodes; ++i) { - int32_t ip = htonl(pStableVgroup->dnodeIps[i]); - taosArrayPush(pInfo->vgroupIdList, &ip); + pInfo->vgroupList = malloc(pRes->rspLen); + memcpy(pInfo->vgroupList, pStableVgroup, pRes->rspLen); + + for(int32_t i = 0; i < pInfo->vgroupList->numOfDnodes; ++i) { + STableDnodeVgroupInfo* pVgroups = &pInfo->vgroupList->dnodeVgroups[i]; + pVgroups->numOfVgroups = htonl(pVgroups->numOfVgroups); + pVgroups->ipAddr.ip = htonl(pVgroups->ipAddr.ip); + pVgroups->ipAddr.port = htons(pVgroups->ipAddr.port); + + for(int32_t j = 0; j < pVgroups->numOfVgroups; ++j) { + pVgroups->vgId[j] = htonl(pVgroups->vgId[j]); + } } return pSql->res.code; @@ -2492,7 +2498,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) { // bool required = false; SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex); - if (pQueryInfo->pTableMetaInfo[0]->vgroupIdList != NULL) { + if (pQueryInfo->pTableMetaInfo[0]->vgroupList != NULL) { return TSDB_CODE_SUCCESS; } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 41cfb77c5ddbe671270d3e5cb17012cada152e01..641d0b449c39835be358aa9fc02538b5b3ba2fce 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -852,10 +852,9 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) { } } else { // if no free resource msg is sent to vnode, we free this object immediately. - bool free = tscShouldFreeAsyncSqlObj(pSql); - if (free) { - assert(pRes->numOfRows == 0 || (pCmd->command > TSDB_SQL_LOCAL)); - + STscObj* pTscObj = pSql->pTscObj; + + if (pTscObj->pSql != pSql) { tscFreeSqlObj(pSql); tscTrace("%p sql result is freed by app", pSql); } else { diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index f586db3d08d5decfa3458b789d6dd4eb856e84ef..aabaa9330a09b01eb52e898c0eb2baed0eac6bbd 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -517,7 +517,6 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p return NULL; } - pSql->cmd.inStream = 1; // 1 means sql in stream, allowed the sliding clause. pRes->code = tscToSQLCmd(pSql, &SQLInfo); SQLInfoDestroy(&SQLInfo); diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index b7d01941d699067212a9037c7d9d7b4ee2ba667d..2f7f5ebb1ca1bf5f76dfa65057181298e67d26f2 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -382,7 +382,7 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { pSql->cmd.command = TSDB_SQL_SELECT; pQueryInfo->type = type; - tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->vnodeIndex = 0; + tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->dnodeIndex = 0; } tscDoQuery(pSql); diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 0589a6359bde43ac73bc62f27217bc7a63683d5c..9f975a4cbe18b4881d9d84d3439e54e6046e4171 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -341,8 +341,8 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { tscPrintSelectClause(pNew, 0); - tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", - pSql, pNew, 0, pTableMetaInfo->vnodeIndex, pNewQueryInfo->type, + tscTrace("%p subquery:%p tableIndex:%d, dnodeIndex:%d, type:%d, exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", + pSql, pNew, 0, pTableMetaInfo->dnodeIndex, pNewQueryInfo->type, pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols, pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pTableMetaInfo[0]->name); } @@ -457,7 +457,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { assert(pQueryInfo->numOfTables == 1); // for subquery, only one metermetaInfo STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - tsBufMerge(pSupporter->pTSBuf, pBuf, pTableMetaInfo->vnodeIndex); + tsBufMerge(pSupporter->pTSBuf, pBuf, pTableMetaInfo->dnodeIndex); tsBufDestory(pBuf); } @@ -478,9 +478,9 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { // for projection query, need to try next vnode // int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes; int32_t totalVnode = 0; - if ((++pTableMetaInfo->vnodeIndex) < totalVnode) { + if ((++pTableMetaInfo->dnodeIndex) < totalVnode) { tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql, - pTableMetaInfo->vnodeIndex - 1, pTableMetaInfo->vnodeIndex, totalVnode, pRes->numOfTotal); + pTableMetaInfo->dnodeIndex - 1, pTableMetaInfo->dnodeIndex, totalVnode, pRes->numOfTotal); pSql->cmd.command = TSDB_SQL_SELECT; pSql->fp = tscJoinQueryCallback; @@ -542,7 +542,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { assert(pQueryInfo->numOfTables == 1); // for projection query, need to try next vnode if current vnode is exhausted -// if ((++pTableMetaInfo->vnodeIndex) < pTableMetaInfo->pMetricMeta->numOfVnodes) { +// if ((++pTableMetaInfo->dnodeIndex) < pTableMetaInfo->pMetricMeta->numOfVnodes) { // pSupporter->pState->numOfCompleted = 0; // pSupporter->pState->numOfTotal = 1; // @@ -609,7 +609,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { // STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { -// if (pRes->row >= pRes->numOfRows && pTableMetaInfo->vnodeIndex < pTableMetaInfo->pMetricMeta->numOfVnodes && +// if (pRes->row >= pRes->numOfRows && pTableMetaInfo->dnodeIndex < pTableMetaInfo->pMetricMeta->numOfVnodes && // (!tscHasReachLimitation(pQueryInfo, pRes))) { // numOfFetch++; // } @@ -647,8 +647,8 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); if (pRes1->row >= pRes1->numOfRows) { - tscTrace("%p subquery:%p retrieve data from vnode, subquery:%d, vnodeIndex:%d", pSql, pSql1, - pSupporter->subqueryIndex, pTableMetaInfo->vnodeIndex); + tscTrace("%p subquery:%p retrieve data from vnode, subquery:%d, dnodeIndex:%d", pSql, pSql1, + pSupporter->subqueryIndex, pTableMetaInfo->dnodeIndex); tscResetForNextRetrieve(pRes1); pSql1->fp = joinRetrieveCallback; @@ -785,11 +785,11 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); /** - * if the query is a continue query (vnodeIndex > 0 for projection query) for next vnode, do the retrieval of + * if the query is a continue query (dnodeIndex > 0 for projection query) for next vnode, do the retrieval of * data instead of returning to its invoker */ - if (pTableMetaInfo->vnodeIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { -// assert(pTableMetaInfo->vnodeIndex < pTableMetaInfo->pMetricMeta->numOfVnodes); + if (pTableMetaInfo->dnodeIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { +// assert(pTableMetaInfo->dnodeIndex < pTableMetaInfo->pMetricMeta->numOfVnodes); pSupporter->pState->numOfCompleted = 0; // reset the record value pSql->fp = joinRetrieveCallback; // continue retrieve data @@ -897,14 +897,14 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, " "exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", - pSql, pNew, tableIndex, pTableMetaInfo->vnodeIndex, pNewQueryInfo->type, + pSql, pNew, tableIndex, pTableMetaInfo->dnodeIndex, pNewQueryInfo->type, pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols, pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pTableMetaInfo[0]->name); tscPrintSelectClause(pNew, 0); tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, " "exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", - pSql, pNew, tableIndex, pTableMetaInfo->vnodeIndex, pNewQueryInfo->type, + pSql, pNew, tableIndex, pTableMetaInfo->dnodeIndex, pNewQueryInfo->type, pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols, pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pTableMetaInfo[0]->name); tscPrintSelectClause(pNew, 0); @@ -1005,7 +1005,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - pSql->numOfSubs = taosArrayGetSize(pTableMetaInfo->vgroupIdList); + pSql->numOfSubs = pTableMetaInfo->vgroupList->numOfDnodes; assert(pSql->numOfSubs > 0); int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize); @@ -1111,6 +1111,7 @@ static void tscFreeSubSqlObj(SRetrieveSupport *trsupport, SSqlObj *pSql) { } static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows); +static void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows); static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES *tres, int32_t errCode) { // set no disk space error info @@ -1132,10 +1133,10 @@ static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES pthread_mutex_unlock(&trsupport->queryMutex); - tscRetrieveFromDnodeCallBack(trsupport, tres, trsupport->pState->code); + tscHandleSubqueryError(trsupport, tres, trsupport->pState->code); } -static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) { +void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) { SSqlObj *pPObj = trsupport->pParentSqlObj; int32_t subqueryIndex = trsupport->subqueryIndex; @@ -1144,9 +1145,9 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 && pPObj->numOfSubs == pState->numOfTotal); - /* retrieved in subquery failed. OR query cancelled in retrieve phase. */ + // retrieved in subquery failed. OR query cancelled in retrieve phase. if (pState->code == TSDB_CODE_SUCCESS && pPObj->res.code != TSDB_CODE_SUCCESS) { - pState->code = -(int)pPObj->res.code; + pState->code = pPObj->res.code; /* * kill current sub-query connection, which may retrieve data from vnodes; @@ -1179,7 +1180,7 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSqlObj, trsupport, pSql); if (pNew == NULL) { - tscError("%p sub:%p failed to create new subquery sqlobj due to out of memory, abort retry", + tscError("%p sub:%p failed to create new subquery sqlObj due to out of memory, abort retry", trsupport->pParentSqlObj, pSql); pState->code = TSDB_CODE_CLI_OUT_OF_MEMORY; @@ -1235,10 +1236,13 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p SSubqueryState* pState = trsupport->pState; SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0]; + // data in from current vnode is stored in cache and disk -// uint32_t numOfRowsFromSubquery = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->numOfElems; -// tscTrace("%p sub:%p all data retrieved from ip:%u,vid:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql, pSvd->ip, -// pSvd->vnode, numOfRowsFromSubquery, idx); + uint32_t numOfRowsFromSubquery = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->numOfElems; + tscTrace("%p sub:%p all data retrieved from ip:%u,vgId:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql, + pTableMetaInfo->vgroupList->dnodeVgroups[0].ipAddr.ip, pTableMetaInfo->vgroupList->dnodeVgroups[0].vgId[0], + numOfRowsFromSubquery, idx); tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity); @@ -1326,7 +1330,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR pthread_mutex_lock(&trsupport->queryMutex); if (numOfRows < 0 || pState->code < 0 || pPObj->res.code != TSDB_CODE_SUCCESS) { - return tscHandleSubRetrievalError(trsupport, pSql, numOfRows); + return tscHandleSubqueryError(trsupport, pSql, numOfRows); } SSqlRes * pRes = &pSql->res; @@ -1352,7 +1356,6 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR return; } - #ifdef _DEBUG_VIEW printf("received data from vnode: %d rows\n", pRes->numOfRows); SSrcColumnInfo colInfo[256] = {0}; @@ -1360,6 +1363,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR tscGetSrcColumnInfo(colInfo, pQueryInfo); tColModelDisplayEx(pDesc->pColumnModel, pRes->data, pRes->numOfRows, pRes->numOfRows, colInfo); #endif + if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) { tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pPObj, pSql, tsAvailTmpDirGB, tsMinimalTmpDirGB); @@ -1371,8 +1375,11 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR pRes->numOfRows, pQueryInfo->groupbyExpr.orderType); if (ret < 0) { // set no disk space error info, and abort retry tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE); + } else if (pRes->completed) { tscAllDataRetrievedFromDnode(trsupport, pSql); + return; + } else { // continue fetch data from dnode pthread_mutex_unlock(&trsupport->queryMutex); taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param); @@ -1380,6 +1387,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR } else { // all data has been retrieved to client tscAllDataRetrievedFromDnode(trsupport, pSql); } + pthread_mutex_unlock(&trsupport->queryMutex); } @@ -1393,9 +1401,9 @@ static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsu assert(pQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1); - // launch subquery for each vnode, so the subquery index equals to the vnodeIndex. + // launch subquery for each vnode, so the subquery index equals to the dnodeIndex. STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, table_index); - pTableMetaInfo->vnodeIndex = trsupport->subqueryIndex; + pTableMetaInfo->dnodeIndex = trsupport->subqueryIndex; pSql->pSubs[trsupport->subqueryIndex] = pNew; } @@ -1404,37 +1412,34 @@ static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsu } void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { - SRetrieveSupport *trsupport = (SRetrieveSupport *)param; + SRetrieveSupport *trsupport = (SRetrieveSupport *) param; SSqlObj* pParentSql = trsupport->pParentSqlObj; - SSqlObj* pSql = (SSqlObj *)tres; - -// STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); - assert(pSql->cmd.numOfClause == 1 && pSql->cmd.pQueryInfo[0]->numOfTables == 1); + SSqlObj* pSql = (SSqlObj *) tres; -// int32_t idx = pTableMetaInfo->vnodeIndex; + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + assert(pSql->cmd.numOfClause == 1 && pQueryInfo->numOfTables == 1); - SVnodeSidList *vnodeInfo = NULL; - SVnodeDesc * pSvd = NULL; -// if (pTableMetaInfo->pMetricMeta != NULL) { -// vnodeInfo = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx); -// pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index]; -// } + STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); + STableDnodeVgroupInfo* pVgroupInfo = &pTableMetaInfo->vgroupList->dnodeVgroups[0]; SSubqueryState* pState = trsupport->pState; assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 && pParentSql->numOfSubs == pState->numOfTotal); if (pParentSql->res.code != TSDB_CODE_SUCCESS || pState->code != TSDB_CODE_SUCCESS) { - // metric query is killed, Note: code must be less than 0 + + // stable query is killed, abort further retry trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; + if (pParentSql->res.code != TSDB_CODE_SUCCESS) { - code = -(int)(pParentSql->res.code); + code = pParentSql->res.code; } else { code = pState->code; } - tscTrace("%p query cancelled or failed, sub:%p, orderOfSub:%d abort, code:%d", pParentSql, pSql, - trsupport->subqueryIndex, code); + + tscTrace("%p query cancelled or failed, sub:%p, orderOfSub:%d abort, code:%s", pParentSql, pSql, + trsupport->subqueryIndex, tstrerror(code)); } /* @@ -1442,51 +1447,40 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { * than this one are actually not necessary, we simply call the tscRetrieveFromDnodeCallBack * function to abort current and remain retrieve process. * - * NOTE: threadsafe is required. + * NOTE: thread safe is required. */ if (code != TSDB_CODE_SUCCESS) { if (trsupport->numOfRetry++ >= MAX_NUM_OF_SUBQUERY_RETRY) { - tscTrace("%p sub:%p reach the max retry count,set global code:%d", pParentSql, pSql, code); + tscTrace("%p sub:%p reach the max retry times, set global code:%d", pParentSql, pSql, code); atomic_val_compare_exchange_32(&pState->code, 0, code); - } else { // does not reach the maximum retry count, go on + } else { // does not reach the maximum retry time, go on tscTrace("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(code), trsupport->numOfRetry); SSqlObj *pNew = tscCreateSqlObjForSubquery(pParentSql, trsupport, pSql); if (pNew == NULL) { - tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vid:%d, orderOfSub:%d", - trsupport->pParentSqlObj, pSql, pSvd != NULL ? pSvd->vgId : -1, trsupport->subqueryIndex); + tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vgId:%d, orderOfSub:%d", + trsupport->pParentSqlObj, pSql, pVgroupInfo->vgId[0], trsupport->subqueryIndex); - pState->code = -TSDB_CODE_CLI_OUT_OF_MEMORY; + pState->code = TSDB_CODE_CLI_OUT_OF_MEMORY; trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; } else { -// SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); -// assert(pNewQueryInfo->pTableMetaInfo[0]->pTableMeta != NULL && pNewQueryInfo->pTableMetaInfo[0]->pMetricMeta != NULL); + SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); + assert(pNewQueryInfo->pTableMetaInfo[0]->pTableMeta != NULL); + tscProcessSql(pNew); return; } } } - if (pState->code != TSDB_CODE_SUCCESS) { // failed, abort - if (vnodeInfo != NULL) { - tscTrace("%p sub:%p query failed,ip:%u,vid:%d,orderOfSub:%d,global code:%d", pParentSql, pSql, - vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vgId, - trsupport->subqueryIndex, pState->code); - } else { - tscTrace("%p sub:%p query failed,orderOfSub:%d,global code:%d", pParentSql, pSql, - trsupport->subqueryIndex, pState->code); - } - - tscRetrieveFromDnodeCallBack(param, tres, pState->code); + if (pState->code != TSDB_CODE_SUCCESS) { // at least one peer subquery failed, abort current query + tscTrace("%p sub:%p query failed,ip:%u,vgId:%d,orderOfSub:%d,global code:%d", pParentSql, pSql, + pVgroupInfo->ipAddr.ip, pVgroupInfo->vgId[0], trsupport->subqueryIndex, pState->code); + + tscHandleSubqueryError(param, tres, pState->code); } else { // success, proceed to retrieve data from dnode - if (vnodeInfo != NULL) { - tscTrace("%p sub:%p query complete,ip:%u,vid:%d,orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql, - vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vgId, - trsupport->subqueryIndex); - } else { - tscTrace("%p sub:%p query complete, orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql, - trsupport->subqueryIndex); - } + tscTrace("%p sub:%p query complete, ip:%u, vgId:%d, orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql, + pVgroupInfo->ipAddr.ip, pVgroupInfo->vgId[0], trsupport->subqueryIndex); taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param); } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 7ad7e65c51b406de04df6209115ab4bba58584fc..e63d8ccf14a0befb4826c026577d96acf007801e 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1178,8 +1178,9 @@ SSqlExpr* tscSqlExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functi pExprInfo->pExprs[index] = pExpr; pExpr->functionId = functionId; + int16_t numOfCols = tscGetNumOfColumns(pTableMetaInfo->pTableMeta); - + // set the correct column index if (pColIndex->columnIndex == TSDB_TBNAME_COLUMN_INDEX) { pExpr->colInfo.colId = TSDB_TBNAME_COLUMN_INDEX; @@ -1190,7 +1191,6 @@ SSqlExpr* tscSqlExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functi // tag columns require the column index revised. if (pColIndex->columnIndex >= numOfCols) { - pColIndex->columnIndex -= numOfCols; pExpr->colInfo.flag = TSDB_COL_TAG; } else { if (pColIndex->columnIndex != TSDB_TBNAME_COLUMN_INDEX) { @@ -1916,7 +1916,7 @@ void tscFreeSubqueryInfo(SSqlCmd* pCmd) { } STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, STableMeta* pTableMeta, - SArray* vgroupList, int16_t numOfTags, int16_t* tags) { + SVgroupsInfo* vgroupList, int16_t numOfTags, int16_t* tags) { void* pAlloc = realloc(pQueryInfo->pTableMetaInfo, (pQueryInfo->numOfTables + 1) * POINTER_BYTES); if (pAlloc == NULL) { return NULL; @@ -1937,7 +1937,12 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST pTableMetaInfo->numOfTags = numOfTags; if (vgroupList != NULL) { - pTableMetaInfo->vgroupIdList = taosArrayClone(vgroupList); + assert(vgroupList->numOfDnodes == 1); // todo fix me + size_t size = sizeof(SVgroupsInfo) + (sizeof(STableDnodeVgroupInfo) + + vgroupList->dnodeVgroups[0].numOfVgroups * sizeof(int32_t)) * vgroupList->numOfDnodes; + + pTableMetaInfo->vgroupList = malloc(size); + memcpy(pTableMetaInfo->vgroupList, vgroupList, size); } if (tags != NULL) { @@ -1952,7 +1957,7 @@ STableMetaInfo* tscAddEmptyMetaInfo(SQueryInfo* pQueryInfo) { return tscAddTableMetaInfo(pQueryInfo, NULL, NULL, NULL, 0, NULL); } -void doRemoveMeterMetaInfo(SQueryInfo* pQueryInfo, int32_t index, bool removeFromCache) { +void doRemoveTableMetaInfo(SQueryInfo* pQueryInfo, int32_t index, bool removeFromCache) { if (index < 0 || index >= pQueryInfo->numOfTables) { return; } @@ -1975,7 +1980,7 @@ void tscRemoveAllMeterMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool int32_t index = pQueryInfo->numOfTables; while (index >= 0) { - doRemoveMeterMetaInfo(pQueryInfo, --index, removeFromCache); + doRemoveTableMetaInfo(pQueryInfo, --index, removeFromCache); } tfree(pQueryInfo->pTableMetaInfo); @@ -1987,6 +1992,7 @@ void tscClearMeterMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache) } taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), removeFromCache); + tfree(pTableMetaInfo->vgroupList); // taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pMetricMeta), removeFromCache); } @@ -2014,7 +2020,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void pNew->sqlstr = strdup(pSql->sqlstr); if (pNew->sqlstr == NULL) { - tscError("%p new subquery failed, tableIndex:%d, vnodeIndex:%d", pSql, tableIndex, pTableMetaInfo->vnodeIndex); + tscError("%p new subquery failed, tableIndex:%d, dnodeIndex:%d", pSql, tableIndex, pTableMetaInfo->dnodeIndex); free(pNew); return NULL; @@ -2058,7 +2064,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void } if (tscAllocPayload(pnCmd, TSDB_DEFAULT_PAYLOAD_SIZE) != TSDB_CODE_SUCCESS) { - tscError("%p new subquery failed, tableIndex:%d, vnodeIndex:%d", pSql, tableIndex, pTableMetaInfo->vnodeIndex); + tscError("%p new subquery failed, tableIndex:%d, dnodeIndex:%d", pSql, tableIndex, pTableMetaInfo->dnodeIndex); tscFreeSqlObj(pNew); return NULL; } @@ -2128,7 +2134,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void // pMetricMeta = taosCacheAcquireByName(tscCacheHandle, key); // } - pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pTableMeta, pTableMetaInfo->vgroupIdList, pTableMetaInfo->numOfTags, + pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pTableMeta, pTableMetaInfo->vgroupList, pTableMetaInfo->numOfTags, pTableMetaInfo->tagColumnIndex); } else { // transfer the ownership of pTableMeta/pMetricMeta to the newly create sql object. // STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0); @@ -2149,13 +2155,13 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void tscTrace( "%p new subquery: %p, tableIndex:%d, vnodeIdx:%d, type:%d, exprInfo:%d, colList:%d," "fieldInfo:%d, name:%s, qrang:%" PRId64 " - %" PRId64 " order:%d, limit:%" PRId64, - pSql, pNew, tableIndex, pTableMetaInfo->vnodeIndex, pNewQueryInfo->type, pNewQueryInfo->exprsInfo.numOfExprs, + pSql, pNew, tableIndex, pTableMetaInfo->dnodeIndex, pNewQueryInfo->type, pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols, pNewQueryInfo->fieldsInfo.numOfOutputCols, pFinalInfo->name, pNewQueryInfo->stime, pNewQueryInfo->etime, pNewQueryInfo->order.order, pNewQueryInfo->limit.limit); tscPrintSelectClause(pNew, 0); } else { - tscTrace("%p new sub insertion: %p, vnodeIdx:%d", pSql, pNew, pTableMetaInfo->vnodeIndex); + tscTrace("%p new sub insertion: %p, vnodeIdx:%d", pSql, pNew, pTableMetaInfo->dnodeIndex); } return pNew; @@ -2252,7 +2258,7 @@ bool hasMoreVnodesToTry(SSqlObj* pSql) { // int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes; // return pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && -// (!tscHasReachLimitation(pQueryInfo, pRes)) && (pTableMetaInfo->vnodeIndex < totalVnode - 1); +// (!tscHasReachLimitation(pQueryInfo, pRes)) && (pTableMetaInfo->dnodeIndex < totalVnode - 1); } void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { @@ -2271,9 +2277,9 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { int32_t totalVnode = 0; // int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes; - while (++pTableMetaInfo->vnodeIndex < totalVnode) { + while (++pTableMetaInfo->dnodeIndex < totalVnode) { tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql, - pTableMetaInfo->vnodeIndex - 1, pTableMetaInfo->vnodeIndex, totalVnode, pRes->numOfTotalInCurrentClause); + pTableMetaInfo->dnodeIndex - 1, pTableMetaInfo->dnodeIndex, totalVnode, pRes->numOfTotalInCurrentClause); /* * update the limit and offset value for the query on the next vnode, @@ -2292,7 +2298,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { assert((pRes->offset >= 0 && pRes->numOfRows == 0) || (pRes->offset == 0 && pRes->numOfRows >= 0)); tscTrace("%p new query to next vnode, vnode index:%d, limit:%" PRId64 ", offset:%" PRId64 ", glimit:%" PRId64, pSql, - pTableMetaInfo->vnodeIndex, pQueryInfo->limit.limit, pQueryInfo->limit.offset, pQueryInfo->clauseLimit); + pTableMetaInfo->dnodeIndex, pQueryInfo->limit.limit, pQueryInfo->limit.offset, pQueryInfo->clauseLimit); /* * For project query with super table join, the numOfSub is equalled to the number of all subqueries. diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index c580bdecdc687957544bb5510a5fc37d4de577b9..99a1d0f0ef0ff351659deb06491b6e09c52f70fb 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -188,6 +188,11 @@ extern char *taosMsg[]; #pragma pack(push, 1) +typedef struct { + uint32_t ip; + uint16_t port; +} SIpAddr; + typedef struct { int32_t numOfVnodes; } SMsgDesc; @@ -469,7 +474,6 @@ typedef struct { int16_t numOfGroupCols; // num of group by columns int16_t orderByIdx; int16_t orderType; // used in group by xx order by xxx - uint64_t groupbyTagIds; int64_t limit; int64_t offset; uint16_t queryType; // denote another query process @@ -616,9 +620,15 @@ typedef struct SCMSTableVgroupMsg { char tableId[TSDB_TABLE_ID_LEN]; } SCMSTableVgroupMsg; +typedef struct { + SIpAddr ipAddr; + int32_t numOfVgroups; + int32_t vgId[]; +} STableDnodeVgroupInfo; + typedef struct { int32_t numOfDnodes; - uint32_t dnodeIps[]; + STableDnodeVgroupInfo dnodeVgroups[]; } SCMSTableVgroupRspMsg; typedef struct { diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index e6d597f5f5d3e41cd017728c0e55dae8056d903f..9f106ed6519475e642124e98a7a7f8bc57de9943 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -1103,12 +1103,26 @@ static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) { if (pRsp == NULL) { mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE); return; - } - - pRsp->numOfDnodes = htonl(1); - pRsp->dnodeIps[0] = htonl(inet_addr(tsPrivateIp)); + } + + int32_t numOfVgroups = 1; + int32_t numOfDnodes = 1; + + pRsp->numOfDnodes = htonl(numOfDnodes); + STableDnodeVgroupInfo* pVgroupInfo = pRsp->dnodeVgroups; + pVgroupInfo->ipAddr.ip = htonl(inet_addr(tsPrivateIp)); + + pVgroupInfo->ipAddr.port = htons(0); // todo fix it + pVgroupInfo->numOfVgroups = htonl(numOfVgroups); // todo fix it + int32_t* vgIdList = pVgroupInfo->vgId; + + for(int32_t i = 0; i < numOfVgroups; ++i) { + vgIdList[i] = htonl(2); // todo fix it + } + + assert(numOfDnodes == 1); // this size is valid only when numOfDnodes equals 1 + int32_t msgLen = sizeof(SCMSTableVgroupRspMsg) + sizeof(STableDnodeVgroupInfo) + numOfVgroups * sizeof(int32_t); - int32_t msgLen = sizeof(SSuperTableObj) + htonl(pRsp->numOfDnodes) * sizeof(int32_t); SRpcMsg rpcRsp = {0}; rpcRsp.handle = pMsg->thandle; rpcRsp.pCont = pRsp; @@ -1510,7 +1524,7 @@ static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) { } else { pMeta->vpeerDesc[i].ip = htonl(pVgroup->vnodeGid[i].privateIp); } - pMeta->vpeerDesc[i].vgId = htonl(pVgroup->vgId); +// pMeta->vpeerDesc[i].vgId = htonl(pVgroup->vgId); pMeta->vpeerDesc[i].dnodeId = htonl(pVgroup->vnodeGid[i].dnodeId); } pMeta->numOfVpeers = pVgroup->numOfVnodes;