From 6fc7c108b4b0b686820abf45ec2231c779f997a8 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Thu, 16 Apr 2020 16:24:38 +0800 Subject: [PATCH] [td-98] update vgroup info --- src/client/inc/tsclient.h | 8 ++- src/client/src/tscAsync.c | 31 ++---------- src/client/src/tscParseInsert.c | 2 +- src/client/src/tscPrepare.c | 4 +- src/client/src/tscSchemaUtil.c | 4 +- src/client/src/tscSecondaryMerge.c | 2 +- src/client/src/tscServer.c | 78 ++++++++++++++++-------------- src/client/src/tscSql.c | 2 +- src/client/src/tscSub.c | 2 +- src/client/src/tscSubquery.c | 44 ++++++++--------- src/client/src/tscUtil.c | 22 ++++----- src/inc/taosmsg.h | 2 +- src/tsdb/src/tsdbMain.c | 1 - src/util/src/ttimer.c | 2 +- tests/examples/c/demo.c | 1 + 15 files changed, 90 insertions(+), 115 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index bcd6f54799..fc88fc7cf7 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -28,12 +28,12 @@ extern "C" { #include "taos.h" #include "taosdef.h" #include "taosmsg.h" +#include "tarray.h" #include "tglobalcfg.h" #include "tlog.h" #include "trpc.h" #include "tsqlfunction.h" #include "tutil.h" -#include "tarray.h" #define TSC_GET_RESPTR_BASE(res, _queryinfo, col) (res->data + ((_queryinfo)->fieldsInfo.pSqlExpr[col]->offset) * res->numOfRows) @@ -61,10 +61,8 @@ typedef struct STableMeta { //super table if it is created according to super table, otherwise, tableInfo is used union { struct STableMeta* pSTable; STableComInfo tableInfo; }; uint8_t tableType; - int8_t numOfVpeers; int16_t sversion; - SVnodeDesc vpeerDesc[TSDB_VNODES_SUPPORT]; - int32_t vgId; // virtual group id, which current table belongs to + SCMVgroupInfo vgroupInfo; int32_t sid; // the index of one table in a virtual node uint64_t uid; // unique id of a table SSchema schema[]; // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info @@ -77,7 +75,7 @@ typedef struct STableMetaInfo { * 1. keep the vnode index during the multi-vnode super table projection query * 2. keep the vnode index for multi-vnode insertion */ - int32_t dnodeIndex; + int32_t vgroupIndex; char name[TSDB_TABLE_ID_LEN]; // (super) table name int16_t numOfTags; // total required tags in query, including groupby tags int16_t tagColumnIndex[TSDB_MAX_TAGS]; // clause + tag projection diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 01ab5e8ff4..9d965b6cd7 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -47,7 +47,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const pSql->signature = pSql; pSql->param = param; pSql->pTscObj = pObj; - pSql->maxRetry = TSDB_VNODES_SUPPORT; + pSql->maxRetry = TSDB_REPLICA_MAX_NUM; pSql->fp = fp; if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) { @@ -407,31 +407,6 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - if (pSql->fp == (void *)1) { - pSql->fp = NULL; - - if (code != 0) { - pRes->code = code; - tscTrace("%p failed to renew tableMeta", pSql); -// tsem_post(&pSql->rspSem); - } else { - tscTrace("%p renew tableMeta successfully, command:%d, code:%d, retry:%d", - pSql, pSql->cmd.command, pSql->res.code, pSql->retry); - - STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); - assert(pTableMetaInfo->pTableMeta == NULL); - - tscGetTableMeta(pSql, pTableMetaInfo); - code = tscSendMsgToServer(pSql); - if (code != 0) { - pRes->code = code; -// tsem_post(&pSql->rspSem); - } - } - - return; - } - if (code != TSDB_CODE_SUCCESS) { pRes->code = code; tscQueueAsyncRes(pSql); @@ -444,12 +419,12 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { if ((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_SUBQUERY) == TSDB_QUERY_TYPE_STABLE_SUBQUERY) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0) && pTableMetaInfo->dnodeIndex >= 0 && pSql->param != NULL); + assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0) && pTableMetaInfo->vgroupIndex >= 0 && pSql->param != NULL); SRetrieveSupport *trs = (SRetrieveSupport *)pSql->param; SSqlObj * pParObj = trs->pParentSqlObj; - assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->dnodeIndex && + assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->vgroupIndex && tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0); tscTrace("%p get metricMeta during super table query successfully", pSql); diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index c4d3c475c4..be9a8418e9 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -699,7 +699,7 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableList, char **st SSubmitBlk *pBlocks = (SSubmitBlk *)(dataBuf->pData); tsSetBlockInfo(pBlocks, pTableMeta, numOfRows); - dataBuf->vgId = pTableMeta->vgId; + dataBuf->vgId = pTableMeta->vgroupInfo.vgId; dataBuf->numOfTables = 1; /* diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index ab5c3f9f72..5c6127ae01 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -408,7 +408,7 @@ static int insertStmtReset(STscStmt* pStmt) { pCmd->batchSize = 0; STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); - pTableMetaInfo->dnodeIndex = 0; + pTableMetaInfo->vgroupIndex = 0; return TSDB_CODE_SUCCESS; } @@ -438,7 +438,7 @@ static int insertStmtExecute(STscStmt* stmt) { } // set the next sent data vnode index in data block arraylist - pTableMetaInfo->dnodeIndex = 1; + pTableMetaInfo->vgroupIndex = 1; } else { pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); } diff --git a/src/client/src/tscSchemaUtil.c b/src/client/src/tscSchemaUtil.c index 90e1bd4c1d..8b1ea1f328 100644 --- a/src/client/src/tscSchemaUtil.c +++ b/src/client/src/tscSchemaUtil.c @@ -168,10 +168,8 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size pTableMeta->sid = pTableMetaMsg->sid; pTableMeta->uid = pTableMetaMsg->uid; - pTableMeta->vgId = pTableMetaMsg->vgId; + pTableMeta->vgroupInfo = pTableMetaMsg->vgroup; - pTableMeta->numOfVpeers = pTableMetaMsg->numOfVpeers; - memcpy(pTableMeta->vpeerDesc, pTableMetaMsg->vpeerDesc, sizeof(SVnodeDesc) * pTableMeta->numOfVpeers); memcpy(pTableMeta->schema, pTableMetaMsg->schema, schemaSize); int32_t numOfTotalCols = pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; diff --git a/src/client/src/tscSecondaryMerge.c b/src/client/src/tscSecondaryMerge.c index 65259205ab..25d3470e61 100644 --- a/src/client/src/tscSecondaryMerge.c +++ b/src/client/src/tscSecondaryMerge.c @@ -636,7 +636,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr pModel = createColumnModel(pSchema, pQueryInfo->exprsInfo.numOfExprs, capacity); - size_t numOfSubs = pTableMetaInfo->vgroupList->numOfDnodes; + size_t numOfSubs = pTableMetaInfo->vgroupList->numOfVgroups; for (int32_t i = 0; i < numOfSubs; ++i) { (*pMemBuffer)[i] = createExtMemBuffer(nBufferSizes, rlen, pModel); (*pMemBuffer)[i]->flushModel = MULTIPLE_APPEND_MODEL; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 2b60355d6e..26987858bd 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -47,12 +47,12 @@ static int32_t minMsgSize() { return tsRpcHeadSize + 100; } static void tscSetDnodeIpList(SSqlObj* pSql, STableMeta* pTableMeta) { SRpcIpSet* pIpList = &pSql->ipList; - pIpList->numOfIps = pTableMeta->numOfVpeers; + pIpList->numOfIps = pTableMeta->vgroupInfo.numOfIps; pIpList->port = tsDnodeShellPort; pIpList->inUse = 0; - for(int32_t i = 0; i < pTableMeta->numOfVpeers; ++i) { - pIpList->ip[i] = pTableMeta->vpeerDesc[i].ip; + for(int32_t i = 0; i < pTableMeta->vgroupInfo.numOfIps; ++i) { + pIpList->ip[i] = pTableMeta->vgroupInfo.ipAddr[i].ip; } } @@ -511,12 +511,12 @@ int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) { // todo valid the vgroupId at the client side if (UTIL_TABLE_IS_SUPERTABLE(pQueryInfo->pTableMetaInfo[0])) { SVgroupsInfo* pVgroupInfo = pQueryInfo->pTableMetaInfo[0]->vgroupList; - assert(pVgroupInfo->dnodeVgroups->numOfVgroups == 1); // todo fix me + assert(pVgroupInfo->numOfVgroups == 1); // todo fix me - pRetrieveMsg->header.vgId = htonl(pVgroupInfo->dnodeVgroups[0].vgId[0]); + pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[0].vgId); } else { STableMeta* pTableMeta = pQueryInfo->pTableMetaInfo[0]->pTableMeta; - pRetrieveMsg->header.vgId = htonl(pTableMeta->vgId); + pRetrieveMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId); } pMsg += sizeof(SRetrieveTableMsg); @@ -542,8 +542,9 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pMsg += sizeof(SMsgDesc); SSubmitMsg *pShellMsg = (SSubmitMsg *)pMsg; + int32_t vgId = pTableMeta->vgroupInfo.vgId; - pShellMsg->header.vgId = htonl(pTableMeta->vgId); + pShellMsg->header.vgId = htonl(vgId); pShellMsg->header.contLen = htonl(size); pShellMsg->length = pShellMsg->header.contLen; @@ -553,7 +554,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; tscSetDnodeIpList(pSql, pTableMeta); - tscTrace("%p build submit msg, vgId:%d numOfVnodes:%d", pSql, pTableMeta->vgId, htonl(pMsgDesc->numOfVnodes)); + tscTrace("%p build submit msg, vgId:%d numOfVgroup:%d", pSql, vgId, htonl(pMsgDesc->numOfVnodes)); return TSDB_CODE_SUCCESS; } @@ -579,7 +580,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) { #if 0 SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta; - SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->dnodeIndex); + SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vgroupIndex); int32_t meterInfoSize = (pMetricMeta->tagLen + sizeof(STableIdInfo)) * pVnodeSidList->numOfSids; int32_t outputColumnSize = pQueryInfo->exprsInfo.numOfExprs * sizeof(SSqlFuncExprMsg); @@ -647,12 +648,12 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { numOfTables = 1; tscSetDnodeIpList(pSql, pTableMeta); - pQueryMsg->head.vgId = htonl(pTableMeta->vgId); + pQueryMsg->head.vgId = htonl(pTableMeta->vgroupInfo.vgId); tscTrace("%p queried tables:%d, table id: %s", pSql, 1, pTableMetaInfo->name); } else { // query super table - if (pTableMetaInfo->dnodeIndex < 0) { - tscError("%p error vnodeIdx:%d", pSql, pTableMetaInfo->dnodeIndex); + if (pTableMetaInfo->vgroupIndex < 0) { + tscError("%p error vnodeIdx:%d", pSql, pTableMetaInfo->vgroupIndex); return -1; } @@ -661,11 +662,11 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pSql->ipList.inUse = 0; // todo extract method - STableDnodeVgroupInfo* pVgroupInfo = &pTableMetaInfo->vgroupList->dnodeVgroups[pTableMetaInfo->dnodeIndex]; - pSql->ipList.ip[0] = pVgroupInfo->ipAddr.ip; + SCMVgroupInfo* pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[pTableMetaInfo->vgroupIndex]; + pSql->ipList.ip[0] = pVgroupInfo->ipAddr[0].ip; #if 0 - SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->dnodeIndex); + SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vgroupIndex); uint32_t vnodeId = pVnodeSidList->vpeerDesc[pVnodeSidList->index].vnode; numOfTables = pVnodeSidList->numOfSids; @@ -675,10 +676,10 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } #endif - tscTrace("%p query on super table, numOfVgroup:%d, dnodeIndex:%d", pSql, pVgroupInfo->numOfVgroups, - pTableMetaInfo->dnodeIndex); + tscTrace("%p query on super table, numOfVgroup:%d, vgroupIndex:%d", pSql, pTableMetaInfo->vgroupList->numOfVgroups, + pTableMetaInfo->vgroupIndex); - pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId[0]); + pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId); numOfTables = 1; } @@ -856,7 +857,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t numOfBlocks = 0; if (pQueryInfo->tsBuf != NULL) { - STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, pTableMetaInfo->dnodeIndex); + STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, pTableMetaInfo->vgroupIndex); assert(QUERY_IS_JOIN_QUERY(pQueryInfo->type) && pBlockInfo != NULL); // this query should not be sent // todo refactor @@ -1827,13 +1828,15 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { pMetaMsg->sid = htonl(pMetaMsg->sid); pMetaMsg->sversion = htons(pMetaMsg->sversion); - pMetaMsg->vgId = htonl(pMetaMsg->vgId); + + pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId); + pMetaMsg->uid = htobe64(pMetaMsg->uid); pMetaMsg->contLen = htons(pMetaMsg->contLen); pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns); - if (pMetaMsg->sid < 0 || pMetaMsg->vgId < 0) { - tscError("invalid meter vgId:%d, sid%d", pMetaMsg->vgId, pMetaMsg->sid); + if (pMetaMsg->sid < 0 || pMetaMsg->vgroup.numOfIps < 0) { + tscError("invalid meter vgId:%d, sid%d", pMetaMsg->vgroup.numOfIps, pMetaMsg->sid); return TSDB_CODE_INVALID_VALUE; } @@ -1847,9 +1850,11 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { return TSDB_CODE_INVALID_VALUE; } - for (int i = 0; i < TSDB_VNODES_SUPPORT; ++i) { - pMetaMsg->vpeerDesc[i].ip = htonl(pMetaMsg->vpeerDesc[i].ip); - pMetaMsg->vpeerDesc[i].dnodeId = htonl(pMetaMsg->vpeerDesc[i].dnodeId); + for (int i = 0; i < pMetaMsg->vgroup.numOfIps; ++i) { + pMetaMsg->vgroup.ipAddr[i].ip = htonl(pMetaMsg->vgroup.ipAddr[i].ip); + pMetaMsg->vgroup.ipAddr[i].port = htons(pMetaMsg->vgroup.ipAddr[i].port); + + assert(pMetaMsg->vgroup.ipAddr[i].ip != 0); } SSchema* pSchema = pMetaMsg->schema; @@ -1858,6 +1863,8 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { for (int i = 0; i < numOfTotalCols; ++i) { pSchema->bytes = htons(pSchema->bytes); pSchema->colId = htons(pSchema->colId); + + assert(pSchema->type >= TSDB_DATA_TYPE_BOOL && pSchema->type <= TSDB_DATA_TYPE_NCHAR); pSchema++; } @@ -1898,9 +1905,6 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { * |...... 1B 1B 4B **/ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) { -// uint8_t ieType; -// int32_t totalNum; -// int32_t i; #if 0 char *rsp = pSql->res.pRsp; @@ -1957,7 +1961,7 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) { // return TSDB_CODE_OTHERS; // } // - // for (int j = 0; j < TSDB_VNODES_SUPPORT; ++j) { + // for (int j = 0; j < TSDB_REPLICA_MAX_NUM; ++j) { // pMeta->vpeerDesc[j].vnode = htonl(pMeta->vpeerDesc[j].vnode); // } // @@ -2115,7 +2119,7 @@ _error_clean: SSqlRes* pRes = &pSql->res; SCMSTableVgroupRspMsg *pStableVgroup = (SCMSTableVgroupRspMsg *)pRes->pRsp; - pStableVgroup->numOfDnodes = htonl(pStableVgroup->numOfDnodes); + pStableVgroup->numOfVgroups = htonl(pStableVgroup->numOfVgroups); // master sqlObj locates in param SSqlObj* parent = pSql->param; @@ -2127,14 +2131,14 @@ _error_clean: pInfo->vgroupList = malloc(pRes->rspLen); memcpy(pInfo->vgroupList, pStableVgroup, pRes->rspLen); - for(int32_t i = 0; i < pInfo->vgroupList->numOfDnodes; ++i) { - STableDnodeVgroupInfo* pVgroups = &pInfo->vgroupList->dnodeVgroups[i]; - pVgroups->numOfVgroups = htonl(pVgroups->numOfVgroups); - pVgroups->ipAddr.ip = htonl(pVgroups->ipAddr.ip); - pVgroups->ipAddr.port = htons(pVgroups->ipAddr.port); + for(int32_t i = 0; i < pInfo->vgroupList->numOfVgroups; ++i) { + SCMVgroupInfo* pVgroups = &pInfo->vgroupList->vgroups[i]; + pVgroups->numOfIps = htonl(pVgroups->numOfIps); + pVgroups->vgId = htonl(pVgroups->vgId); - for(int32_t j = 0; j < pVgroups->numOfVgroups; ++j) { - pVgroups->vgId[j] = htonl(pVgroups->vgId[j]); + for(int32_t j = 0; j < tListLen(pVgroups->ipAddr); ++j) { + pVgroups->ipAddr[j].ip = htonl(pVgroups->ipAddr[j].ip); + pVgroups->ipAddr[j].port = htons(pVgroups->ipAddr[j].port); } } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index a5e18a6b8d..47860e4390 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -129,7 +129,7 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con pSql->pTscObj = pObj; pSql->signature = pSql; - pSql->maxRetry = TSDB_VNODES_SUPPORT; + pSql->maxRetry = TSDB_REPLICA_MAX_NUM; tsem_init(&pSql->rspSem, 0, 0); diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index 2f7f5ebb1c..aeeec328b4 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -382,7 +382,7 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { pSql->cmd.command = TSDB_SQL_SELECT; pQueryInfo->type = type; - tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->dnodeIndex = 0; + tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->vgroupIndex = 0; } tscDoQuery(pSql); diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 9f975a4cbe..0bc1becaca 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -341,8 +341,8 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { tscPrintSelectClause(pNew, 0); - tscTrace("%p subquery:%p tableIndex:%d, dnodeIndex:%d, type:%d, exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", - pSql, pNew, 0, pTableMetaInfo->dnodeIndex, pNewQueryInfo->type, + tscTrace("%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", + pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols, pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pTableMetaInfo[0]->name); } @@ -457,7 +457,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { assert(pQueryInfo->numOfTables == 1); // for subquery, only one metermetaInfo STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - tsBufMerge(pSupporter->pTSBuf, pBuf, pTableMetaInfo->dnodeIndex); + tsBufMerge(pSupporter->pTSBuf, pBuf, pTableMetaInfo->vgroupIndex); tsBufDestory(pBuf); } @@ -478,9 +478,9 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { // for projection query, need to try next vnode // int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes; int32_t totalVnode = 0; - if ((++pTableMetaInfo->dnodeIndex) < totalVnode) { + if ((++pTableMetaInfo->vgroupIndex) < totalVnode) { tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql, - pTableMetaInfo->dnodeIndex - 1, pTableMetaInfo->dnodeIndex, totalVnode, pRes->numOfTotal); + pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVnode, pRes->numOfTotal); pSql->cmd.command = TSDB_SQL_SELECT; pSql->fp = tscJoinQueryCallback; @@ -542,7 +542,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { assert(pQueryInfo->numOfTables == 1); // for projection query, need to try next vnode if current vnode is exhausted -// if ((++pTableMetaInfo->dnodeIndex) < pTableMetaInfo->pMetricMeta->numOfVnodes) { +// if ((++pTableMetaInfo->vgroupIndex) < pTableMetaInfo->pMetricMeta->numOfVnodes) { // pSupporter->pState->numOfCompleted = 0; // pSupporter->pState->numOfTotal = 1; // @@ -609,7 +609,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { // STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { -// if (pRes->row >= pRes->numOfRows && pTableMetaInfo->dnodeIndex < pTableMetaInfo->pMetricMeta->numOfVnodes && +// if (pRes->row >= pRes->numOfRows && pTableMetaInfo->vgroupIndex < pTableMetaInfo->pMetricMeta->numOfVnodes && // (!tscHasReachLimitation(pQueryInfo, pRes))) { // numOfFetch++; // } @@ -647,8 +647,8 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); if (pRes1->row >= pRes1->numOfRows) { - tscTrace("%p subquery:%p retrieve data from vnode, subquery:%d, dnodeIndex:%d", pSql, pSql1, - pSupporter->subqueryIndex, pTableMetaInfo->dnodeIndex); + tscTrace("%p subquery:%p retrieve data from vnode, subquery:%d, vgroupIndex:%d", pSql, pSql1, + pSupporter->subqueryIndex, pTableMetaInfo->vgroupIndex); tscResetForNextRetrieve(pRes1); pSql1->fp = joinRetrieveCallback; @@ -785,11 +785,11 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); /** - * if the query is a continue query (dnodeIndex > 0 for projection query) for next vnode, do the retrieval of + * if the query is a continue query (vgroupIndex > 0 for projection query) for next vnode, do the retrieval of * data instead of returning to its invoker */ - if (pTableMetaInfo->dnodeIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { -// assert(pTableMetaInfo->dnodeIndex < pTableMetaInfo->pMetricMeta->numOfVnodes); + if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { +// assert(pTableMetaInfo->vgroupIndex < pTableMetaInfo->pMetricMeta->numOfVnodes); pSupporter->pState->numOfCompleted = 0; // reset the record value pSql->fp = joinRetrieveCallback; // continue retrieve data @@ -897,14 +897,14 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, " "exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", - pSql, pNew, tableIndex, pTableMetaInfo->dnodeIndex, pNewQueryInfo->type, + pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols, pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pTableMetaInfo[0]->name); tscPrintSelectClause(pNew, 0); tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, " "exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", - pSql, pNew, tableIndex, pTableMetaInfo->dnodeIndex, pNewQueryInfo->type, + pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols, pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pTableMetaInfo[0]->name); tscPrintSelectClause(pNew, 0); @@ -1005,7 +1005,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - pSql->numOfSubs = pTableMetaInfo->vgroupList->numOfDnodes; + pSql->numOfSubs = pTableMetaInfo->vgroupList->numOfVgroups; assert(pSql->numOfSubs > 0); int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize); @@ -1241,7 +1241,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p // data in from current vnode is stored in cache and disk uint32_t numOfRowsFromSubquery = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->numOfElems; tscTrace("%p sub:%p all data retrieved from ip:%u,vgId:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql, - pTableMetaInfo->vgroupList->dnodeVgroups[0].ipAddr.ip, pTableMetaInfo->vgroupList->dnodeVgroups[0].vgId[0], + pTableMetaInfo->vgroupList->vgroups[0].ipAddr[0].ip, pTableMetaInfo->vgroupList->vgroups[0].vgId, numOfRowsFromSubquery, idx); tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity); @@ -1401,9 +1401,9 @@ static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsu assert(pQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1); - // launch subquery for each vnode, so the subquery index equals to the dnodeIndex. + // launch subquery for each vnode, so the subquery index equals to the vgroupIndex. STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, table_index); - pTableMetaInfo->dnodeIndex = trsupport->subqueryIndex; + pTableMetaInfo->vgroupIndex = trsupport->subqueryIndex; pSql->pSubs[trsupport->subqueryIndex] = pNew; } @@ -1421,7 +1421,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { assert(pSql->cmd.numOfClause == 1 && pQueryInfo->numOfTables == 1); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); - STableDnodeVgroupInfo* pVgroupInfo = &pTableMetaInfo->vgroupList->dnodeVgroups[0]; + SCMVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[0]; SSubqueryState* pState = trsupport->pState; assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 && @@ -1459,7 +1459,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { SSqlObj *pNew = tscCreateSqlObjForSubquery(pParentSql, trsupport, pSql); if (pNew == NULL) { tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vgId:%d, orderOfSub:%d", - trsupport->pParentSqlObj, pSql, pVgroupInfo->vgId[0], trsupport->subqueryIndex); + trsupport->pParentSqlObj, pSql, pVgroup->vgId, trsupport->subqueryIndex); pState->code = TSDB_CODE_CLI_OUT_OF_MEMORY; trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; @@ -1475,12 +1475,12 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { if (pState->code != TSDB_CODE_SUCCESS) { // at least one peer subquery failed, abort current query tscTrace("%p sub:%p query failed,ip:%u,vgId:%d,orderOfSub:%d,global code:%d", pParentSql, pSql, - pVgroupInfo->ipAddr.ip, pVgroupInfo->vgId[0], trsupport->subqueryIndex, pState->code); + pVgroup->ipAddr[0].ip, pVgroup->vgId, trsupport->subqueryIndex, pState->code); tscHandleSubqueryError(param, tres, pState->code); } else { // success, proceed to retrieve data from dnode tscTrace("%p sub:%p query complete, ip:%u, vgId:%d, orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql, - pVgroupInfo->ipAddr.ip, pVgroupInfo->vgId[0], trsupport->subqueryIndex); + pVgroup->ipAddr[0].ip, pVgroup->vgId, trsupport->subqueryIndex); taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param); } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index e63d8ccf14..d70e2bb7f6 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1937,9 +1937,9 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST pTableMetaInfo->numOfTags = numOfTags; if (vgroupList != NULL) { - assert(vgroupList->numOfDnodes == 1); // todo fix me - size_t size = sizeof(SVgroupsInfo) + (sizeof(STableDnodeVgroupInfo) + - vgroupList->dnodeVgroups[0].numOfVgroups * sizeof(int32_t)) * vgroupList->numOfDnodes; + assert(vgroupList->numOfVgroups == 1); // todo fix me + + size_t size = sizeof(SVgroupsInfo) + sizeof(SCMVgroupInfo) * vgroupList->numOfVgroups; pTableMetaInfo->vgroupList = malloc(size); memcpy(pTableMetaInfo->vgroupList, vgroupList, size); @@ -2020,7 +2020,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void pNew->sqlstr = strdup(pSql->sqlstr); if (pNew->sqlstr == NULL) { - tscError("%p new subquery failed, tableIndex:%d, dnodeIndex:%d", pSql, tableIndex, pTableMetaInfo->dnodeIndex); + tscError("%p new subquery failed, tableIndex:%d, vgroupIndex:%d", pSql, tableIndex, pTableMetaInfo->vgroupIndex); free(pNew); return NULL; @@ -2064,7 +2064,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void } if (tscAllocPayload(pnCmd, TSDB_DEFAULT_PAYLOAD_SIZE) != TSDB_CODE_SUCCESS) { - tscError("%p new subquery failed, tableIndex:%d, dnodeIndex:%d", pSql, tableIndex, pTableMetaInfo->dnodeIndex); + tscError("%p new subquery failed, tableIndex:%d, vgroupIndex:%d", pSql, tableIndex, pTableMetaInfo->vgroupIndex); tscFreeSqlObj(pNew); return NULL; } @@ -2155,13 +2155,13 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void tscTrace( "%p new subquery: %p, tableIndex:%d, vnodeIdx:%d, type:%d, exprInfo:%d, colList:%d," "fieldInfo:%d, name:%s, qrang:%" PRId64 " - %" PRId64 " order:%d, limit:%" PRId64, - pSql, pNew, tableIndex, pTableMetaInfo->dnodeIndex, pNewQueryInfo->type, pNewQueryInfo->exprsInfo.numOfExprs, + pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols, pNewQueryInfo->fieldsInfo.numOfOutputCols, pFinalInfo->name, pNewQueryInfo->stime, pNewQueryInfo->etime, pNewQueryInfo->order.order, pNewQueryInfo->limit.limit); tscPrintSelectClause(pNew, 0); } else { - tscTrace("%p new sub insertion: %p, vnodeIdx:%d", pSql, pNew, pTableMetaInfo->dnodeIndex); + tscTrace("%p new sub insertion: %p, vnodeIdx:%d", pSql, pNew, pTableMetaInfo->vgroupIndex); } return pNew; @@ -2258,7 +2258,7 @@ bool hasMoreVnodesToTry(SSqlObj* pSql) { // int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes; // return pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && -// (!tscHasReachLimitation(pQueryInfo, pRes)) && (pTableMetaInfo->dnodeIndex < totalVnode - 1); +// (!tscHasReachLimitation(pQueryInfo, pRes)) && (pTableMetaInfo->vgroupIndex < totalVnode - 1); } void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { @@ -2277,9 +2277,9 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { int32_t totalVnode = 0; // int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes; - while (++pTableMetaInfo->dnodeIndex < totalVnode) { + while (++pTableMetaInfo->vgroupIndex < totalVnode) { tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql, - pTableMetaInfo->dnodeIndex - 1, pTableMetaInfo->dnodeIndex, totalVnode, pRes->numOfTotalInCurrentClause); + pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVnode, pRes->numOfTotalInCurrentClause); /* * update the limit and offset value for the query on the next vnode, @@ -2298,7 +2298,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { assert((pRes->offset >= 0 && pRes->numOfRows == 0) || (pRes->offset == 0 && pRes->numOfRows >= 0)); tscTrace("%p new query to next vnode, vnode index:%d, limit:%" PRId64 ", offset:%" PRId64 ", glimit:%" PRId64, pSql, - pTableMetaInfo->dnodeIndex, pQueryInfo->limit.limit, pQueryInfo->limit.offset, pQueryInfo->clauseLimit); + pTableMetaInfo->vgroupIndex, pQueryInfo->limit.limit, pQueryInfo->limit.offset, pQueryInfo->clauseLimit); /* * For project query with super table join, the numOfSub is equalled to the number of all subqueries. diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index c24dca57c0..3d0c72e531 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -674,7 +674,7 @@ typedef struct { } SSuperTableMetaMsg; typedef struct { - SVnodeDesc vpeerDesc[TSDB_VNODES_SUPPORT]; + SVnodeDesc vpeerDesc[TSDB_REPLICA_MAX_NUM]; int16_t index; // used locally int32_t numOfSids; int32_t pSidExtInfoList[]; // offset value of STableIdInfo diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index e5ca7fc9d3..8f99a6359a 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -732,7 +732,6 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable if (key < pTable->mem->keyFirst) pTable->mem->keyFirst = key; pTable->mem->numOfPoints = tSkipListGetSize(pTable->mem->pData); -// pTable->mem->numOfPoints++; return 0; } diff --git a/src/util/src/ttimer.c b/src/util/src/ttimer.c index db06e1531b..04cad75655 100644 --- a/src/util/src/ttimer.c +++ b/src/util/src/ttimer.c @@ -562,7 +562,7 @@ void taosTmrCleanUp(void* handle) { pthread_mutex_unlock(&tmrCtrlMutex); if (numOfTmrCtrl <=0) { - pthread_cancel(athread); +// pthread_cancel(athread); for (int i = 0; i < tListLen(wheels); i++) { time_wheel_t* wheel = wheels + i; diff --git a/tests/examples/c/demo.c b/tests/examples/c/demo.c index d32170873a..9847300024 100644 --- a/tests/examples/c/demo.c +++ b/tests/examples/c/demo.c @@ -75,6 +75,7 @@ int main(int argc, char *argv[]) { doQuery(taos, "create database if not exists test"); doQuery(taos, "use test"); + doQuery(taos, "insert into tm99 values('2020-01-01 1:1:1', 99);"); // doQuery(taos, "create table if not exists tm0 (ts timestamp, k int);"); // doQuery(taos, "insert into tm0 values('2020-1-1 1:1:1', 1);"); // doQuery(taos, "insert into tm0 values('2020-1-1 1:1:2', 2);"); -- GitLab