diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 6a5c2d0099f9b2d3e8640511460933bc925351ad..786133a8f35ff582d2add90523d5760a20b6398c 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -268,7 +268,7 @@ bool hasMoreClauseToTry(SSqlObj* pSql); void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp); void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows); void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp); -int tscSetMgmtIpListFromCfg(const char *first, const char *second); +int tscSetMgmtEpSetFromCfg(const char *first, const char *second); void* malloc_throw(size_t size); void* calloc_throw(size_t nmemb, size_t size); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 1b30c4ffcaeaf04bb1eaff8b0d74895f65e91647..ef8184990df0d4cb23b928ba1acb4bf9f44e23e5 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -306,7 +306,7 @@ typedef struct SSqlObj { char * sqlstr; char retry; char maxRetry; - SRpcIpSet ipList; + SRpcEpSet epSet; char listed; tsem_t rspSem; SSqlCmd cmd; @@ -350,7 +350,7 @@ void tscInitMsgsFp(); int tsParseSql(SSqlObj *pSql, bool initial); -void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet); +void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet); int tscProcessSql(SSqlObj *pSql); int tscRenewTableMeta(SSqlObj *pSql, char *tableId); @@ -456,7 +456,7 @@ extern void * tscQhandle; extern int tscKeepConn[]; extern int tsInsertHeadSize; extern int tscNumOfThreads; -extern SRpcIpSet tscMgmtIpSet; +extern SRpcEpSet tscMgmtEpSet; extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo); diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index f9b12f3f6d27b53cbbe832739cd5c2dd0326b8e4..b97e48644991686c2f2aa6400c2e2b5898702878 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -2355,9 +2355,9 @@ bool validateIpAddress(const char* ip, size_t size) { strncpy(tmp, ip, size); - in_addr_t ipAddr = inet_addr(tmp); + in_addr_t epAddr = inet_addr(tmp); - return ipAddr != INADDR_NONE; + return epAddr != INADDR_NONE; } int32_t tscTansformSQLFuncForSTableQuery(SQueryInfo* pQueryInfo) { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 99ac2249e04868277404a962d6655e9a6d51e66f..398dea9d097da089a48a613ca3aeba194c38abb5 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -29,8 +29,8 @@ #define TSC_MGMT_VNODE 999 -SRpcIpSet tscMgmtIpSet; -SRpcIpSet tscDnodeIpSet; +SRpcEpSet tscMgmtEpSet; +SRpcEpSet tscDnodeEpSet; int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0}; @@ -44,44 +44,44 @@ void tscSaveSubscriptionProgress(void* sub); static int32_t minMsgSize() { return tsRpcHeadSize + 100; } -static void tscSetDnodeIpList(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) { - SRpcIpSet* pIpList = &pSql->ipList; - pIpList->inUse = 0; +static void tscSetDnodeEpSet(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) { + SRpcEpSet* pEpSet = &pSql->epSet; + pEpSet->inUse = 0; if (pVgroupInfo == NULL) { - pIpList->numOfIps = 0; + pEpSet->numOfEps = 0; return; } - pIpList->numOfIps = pVgroupInfo->numOfIps; - for(int32_t i = 0; i < pVgroupInfo->numOfIps; ++i) { - strcpy(pIpList->fqdn[i], pVgroupInfo->ipAddr[i].fqdn); - pIpList->port[i] = pVgroupInfo->ipAddr[i].port; + pEpSet->numOfEps = pVgroupInfo->numOfEps; + for(int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) { + strcpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn); + pEpSet->port[i] = pVgroupInfo->epAddr[i].port; } } -void tscPrintMgmtIp() { - if (tscMgmtIpSet.numOfIps <= 0) { - tscError("invalid mnode IP list:%d", tscMgmtIpSet.numOfIps); +void tscPrintMgmtEp() { + if (tscMgmtEpSet.numOfEps <= 0) { + tscError("invalid mnode EP list:%d", tscMgmtEpSet.numOfEps); } else { - for (int i = 0; i < tscMgmtIpSet.numOfIps; ++i) { - tscDebug("mnode index:%d %s:%d", i, tscMgmtIpSet.fqdn[i], tscMgmtIpSet.port[i]); + for (int i = 0; i < tscMgmtEpSet.numOfEps; ++i) { + tscDebug("mnode index:%d %s:%d", i, tscMgmtEpSet.fqdn[i], tscMgmtEpSet.port[i]); } } } -void tscSetMgmtIpList(SRpcIpSet *pIpList) { - tscMgmtIpSet.numOfIps = pIpList->numOfIps; - tscMgmtIpSet.inUse = pIpList->inUse; - for (int32_t i = 0; i < tscMgmtIpSet.numOfIps; ++i) { - tscMgmtIpSet.port[i] = htons(pIpList->port[i]); +void tscSetMgmtEpSet(SRpcEpSet *pEpSet) { + tscMgmtEpSet.numOfEps = pEpSet->numOfEps; + tscMgmtEpSet.inUse = pEpSet->inUse; + for (int32_t i = 0; i < tscMgmtEpSet.numOfEps; ++i) { + tscMgmtEpSet.port[i] = htons(pEpSet->port[i]); } } -void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet) { - tscMgmtIpSet = *pIpSet; - tscDebug("mnode IP list is changed for ufp is called, numOfIps:%d inUse:%d", tscMgmtIpSet.numOfIps, tscMgmtIpSet.inUse); - for (int32_t i = 0; i < tscMgmtIpSet.numOfIps; ++i) { - tscDebug("index:%d fqdn:%s port:%d", i, tscMgmtIpSet.fqdn[i], tscMgmtIpSet.port[i]); +void tscUpdateEpSet(void *ahandle, SRpcEpSet *pEpSet) { + tscMgmtEpSet = *pEpSet; + tscDebug("mnode EP list is changed for ufp is called, numOfEps:%d inUse:%d", tscMgmtEpSet.numOfEps, tscMgmtEpSet.inUse); + for (int32_t i = 0; i < tscMgmtEpSet.numOfEps; ++i) { + tscDebug("index:%d fqdn:%s port:%d", i, tscMgmtEpSet.fqdn[i], tscMgmtEpSet.port[i]); } } @@ -95,7 +95,7 @@ void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet) { UNUSED_FUNC static int32_t tscGetMgmtConnMaxRetryTimes() { int32_t factor = 2; - return tscMgmtIpSet.numOfIps * factor; + return tscMgmtEpSet.numOfEps * factor; } void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { @@ -111,9 +111,9 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { if (code == 0) { SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp; - SRpcIpSet * pIpList = &pRsp->ipList; - if (pIpList->numOfIps > 0) - tscSetMgmtIpList(pIpList); + SRpcEpSet * pEpSet = &pRsp->epSet; + if (pEpSet->numOfEps > 0) + tscSetMgmtEpSet(pEpSet); pSql->pTscObj->connId = htonl(pRsp->connId); @@ -185,7 +185,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { // set the mgmt ip list if (pSql->cmd.command >= TSDB_SQL_MGMT) { - pSql->ipList = tscMgmtIpSet; + pSql->epSet = tscMgmtEpSet; } memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); @@ -203,11 +203,11 @@ int tscSendMsgToServer(SSqlObj *pSql) { // Otherwise, the pSql object may have been released already during the response function, which is // processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely // cause crash. - rpcSendRequest(pObj->pDnodeConn, &pSql->ipList, &rpcMsg); + rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg); return TSDB_CODE_SUCCESS; } -void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { +void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { SSqlObj *pSql = (SSqlObj *)rpcMsg->ahandle; if (pSql == NULL || pSql->signature != pSql) { tscError("%p sql is already released", pSql); @@ -237,9 +237,9 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { } if (pCmd->command < TSDB_SQL_MGMT) { - if (pIpSet) pSql->ipList = *pIpSet; + if (pEpSet) pSql->epSet = *pEpSet; } else { - if (pIpSet) tscMgmtIpSet = *pIpSet; + if (pEpSet) tscMgmtEpSet = *pEpSet; } if (rpcMsg->pCont == NULL) { @@ -421,7 +421,7 @@ int tscProcessSql(SSqlObj *pSql) { return pSql->res.code; } } else if (pCmd->command < TSDB_SQL_LOCAL) { - pSql->ipList = tscMgmtIpSet; + pSql->epSet = tscMgmtEpSet; } else { // local handler return (*tscProcessMsgRsp[pCmd->command])(pSql); } @@ -525,10 +525,10 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { // pSql->cmd.payloadLen is set during copying data into payload pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; - tscSetDnodeIpList(pSql, &pTableMeta->vgroupInfo); + tscSetDnodeEpSet(pSql, &pTableMeta->vgroupInfo); - tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfIP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit, - pSql->ipList.numOfIps); + tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit, + pSql->epSet.numOfEps); return TSDB_CODE_SUCCESS; } @@ -568,7 +568,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char pVgroupInfo = &pTableMeta->vgroupInfo; } - tscSetDnodeIpList(pSql, pVgroupInfo); + tscSetDnodeEpSet(pSql, pVgroupInfo); if (pVgroupInfo != NULL) { pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId); } @@ -580,7 +580,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char pQueryMsg->numOfTables = htonl(1); // set the number of tables pMsg += sizeof(STableIdInfo); - } else { // it is a subquery of the super table query, this IP info is acquired from vgroupInfo + } else { // it is a subquery of the super table query, this EP info is acquired from vgroupInfo int32_t index = pTableMetaInfo->vgroupIndex; int32_t numOfVgroups = taosArrayGetSize(pTableMetaInfo->pVgroupTables); assert(index >= 0 && index < numOfVgroups); @@ -590,7 +590,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index); // set the vgroup info - tscSetDnodeIpList(pSql, &pTableIdList->vgInfo); + tscSetDnodeEpSet(pSql, &pTableIdList->vgInfo); pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId); int32_t numOfTables = taosArrayGetSize(pTableIdList->itemList); @@ -1323,7 +1323,7 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) { SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - tscSetDnodeIpList(pSql, &pTableMetaInfo->pTableMeta->vgroupInfo); + tscSetDnodeEpSet(pSql, &pTableMetaInfo->pTableMeta->vgroupInfo); return TSDB_CODE_SUCCESS; } @@ -1658,8 +1658,8 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { pMetaMsg->contLen = htons(pMetaMsg->contLen); pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns); - if (pMetaMsg->sid < 0 || pMetaMsg->vgroup.numOfIps < 0) { - tscError("invalid meter vgId:%d, sid%d", pMetaMsg->vgroup.numOfIps, pMetaMsg->sid); + if (pMetaMsg->sid < 0 || pMetaMsg->vgroup.numOfEps < 0) { + tscError("invalid meter vgId:%d, sid%d", pMetaMsg->vgroup.numOfEps, pMetaMsg->sid); return TSDB_CODE_TSC_INVALID_VALUE; } @@ -1673,8 +1673,8 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { return TSDB_CODE_TSC_INVALID_VALUE; } - for (int i = 0; i < pMetaMsg->vgroup.numOfIps; ++i) { - pMetaMsg->vgroup.ipAddr[i].port = htons(pMetaMsg->vgroup.ipAddr[i].port); + for (int i = 0; i < pMetaMsg->vgroup.numOfEps; ++i) { + pMetaMsg->vgroup.epAddr[i].port = htons(pMetaMsg->vgroup.epAddr[i].port); } SSchema* pSchema = pMetaMsg->schema; @@ -1850,10 +1850,10 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) { SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j]; pVgroups->vgId = htonl(pVgroups->vgId); - assert(pVgroups->numOfIps >= 1); + assert(pVgroups->numOfEps >= 1); - for (int32_t k = 0; k < pVgroups->numOfIps; ++k) { - pVgroups->ipAddr[k].port = htons(pVgroups->ipAddr[k].port); + for (int32_t k = 0; k < pVgroups->numOfEps; ++k) { + pVgroups->epAddr[k].port = htons(pVgroups->epAddr[k].port); } pMsg += size; @@ -1946,8 +1946,8 @@ int tscProcessConnectRsp(SSqlObj *pSql) { assert(len <= sizeof(pObj->db)); tstrncpy(pObj->db, temp, sizeof(pObj->db)); - if (pConnect->ipList.numOfIps > 0) - tscSetMgmtIpList(&pConnect->ipList); + if (pConnect->epSet.numOfEps > 0) + tscSetMgmtEpSet(&pConnect->epSet); strcpy(pObj->sversion, pConnect->serverVersion); pObj->writeAuth = pConnect->writeAuth; diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index d5160cd3c6d45403e41123c5946b1f53e6ca7b2e..6a14d3a65e33b4f5edf8feafcbb909723bf8bdb2 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -62,8 +62,8 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con } if (ip) { - if (tscSetMgmtIpListFromCfg(ip, NULL) < 0) return NULL; - if (port) tscMgmtIpSet.port[0] = port; + if (tscSetMgmtEpSetFromCfg(ip, NULL) < 0) return NULL; + if (port) tscMgmtEpSet.port[0] = port; } void *pDnodeConn = NULL; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index faa4a2488a36a9daa3e06dd6160d8c72b0683bca..1dbc52efb0868042b28f4e358c7af0df878e47e8 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -458,7 +458,7 @@ void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArr break; } } - assert(info.vgInfo.numOfIps != 0); + assert(info.vgInfo.numOfEps != 0); vgTables = taosArrayInit(4, sizeof(STableIdInfo)); info.itemList = vgTables; @@ -1600,8 +1600,8 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p // data in from current vnode is stored in cache and disk uint32_t numOfRowsFromSubquery = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->num; - tscDebug("%p sub:%p all data retrieved from ip:%s, vgId:%d, numOfRows:%d, orderOfSub:%d", pParentSql, pSql, - pTableMetaInfo->vgroupList->vgroups[0].ipAddr[0].fqdn, pTableMetaInfo->vgroupList->vgroups[0].vgId, + tscDebug("%p sub:%p all data retrieved from ep:%s, vgId:%d, numOfRows:%d, orderOfSub:%d", pParentSql, pSql, + pTableMetaInfo->vgroupList->vgroups[0].epAddr[0].fqdn, pTableMetaInfo->vgroupList->vgroups[0].vgId, numOfRowsFromSubquery, idx); tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity); @@ -1719,8 +1719,8 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR assert(pRes->numOfRows == numOfRows); int64_t num = atomic_add_fetch_64(&pState->numOfRetrievedRows, numOfRows); - tscDebug("%p sub:%p retrieve numOfRows:%" PRId64 " totalNumOfRows:%" PRIu64 " from ip:%s, orderOfSub:%d", pParentSql, pSql, - pRes->numOfRows, pState->numOfRetrievedRows, pSql->ipList.fqdn[pSql->ipList.inUse], idx); + tscDebug("%p sub:%p retrieve numOfRows:%" PRId64 " totalNumOfRows:%" PRIu64 " from ep:%s, orderOfSub:%d", pParentSql, pSql, + pRes->numOfRows, pState->numOfRetrievedRows, pSql->epSet.fqdn[pSql->epSet.inUse], idx); if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0)) { tscError("%p sub:%p num of OrderedRes is too many, max allowed:%" PRId32 " , current:%" PRId64, @@ -1828,8 +1828,8 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { return; } - tscTrace("%p sub:%p query complete, ip:%s, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSql, pSql, - pVgroup->ipAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex); + tscTrace("%p sub:%p query complete, ep:%s, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSql, pSql, + pVgroup->epAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex); if (pSql->res.qhandle == 0) { // qhandle is NULL, code is TSDB_CODE_SUCCESS means no results generated from this vnode tscRetrieveFromDnodeCallBack(param, pSql, 0); diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 82cc8cc225399f2aa78da0989192b82acb3bf8eb..7722acf0d07bdddc994787f8343fff7309053086 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -41,7 +41,7 @@ int tscNumOfThreads; static pthread_once_t tscinit = PTHREAD_ONCE_INIT; void taosInitNote(int numOfNoteLines, int maxNotes, char* lable); -void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet); +void tscUpdateEpSet(void *ahandle, SRpcEpSet *pEpSet); void tscCheckDiskUsage(void *UNUSED_PARAM(para), void* UNUSED_PARAM(param)) { taosGetDisk(); @@ -116,8 +116,8 @@ void taos_init_imp() { taosInitNote(tsNumOfLogLines / 10, 1, (char*)"tsc_note"); } - if (tscSetMgmtIpListFromCfg(tsFirst, tsSecond) < 0) { - tscError("failed to init mnode IP list"); + if (tscSetMgmtEpSetFromCfg(tsFirst, tsSecond) < 0) { + tscError("failed to init mnode EP list"); return; } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 49d15f6499c945a84e230504668731743071e27b..4e6133663eb0d7d2ae7908eb2622a665e6010008 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -2145,17 +2145,17 @@ char* strdup_throw(const char* str) { return p; } -int tscSetMgmtIpListFromCfg(const char *first, const char *second) { - tscMgmtIpSet.numOfIps = 0; - tscMgmtIpSet.inUse = 0; +int tscSetMgmtEpSetFromCfg(const char *first, const char *second) { + tscMgmtEpSet.numOfEps = 0; + tscMgmtEpSet.inUse = 0; if (first && first[0] != 0) { if (strlen(first) >= TSDB_EP_LEN) { terrno = TSDB_CODE_TSC_INVALID_FQDN; return -1; } - taosGetFqdnPortFromEp(first, tscMgmtIpSet.fqdn[tscMgmtIpSet.numOfIps], &tscMgmtIpSet.port[tscMgmtIpSet.numOfIps]); - tscMgmtIpSet.numOfIps++; + taosGetFqdnPortFromEp(first, tscMgmtEpSet.fqdn[tscMgmtEpSet.numOfEps], &tscMgmtEpSet.port[tscMgmtEpSet.numOfEps]); + tscMgmtEpSet.numOfEps++; } if (second && second[0] != 0) { @@ -2163,11 +2163,11 @@ int tscSetMgmtIpListFromCfg(const char *first, const char *second) { terrno = TSDB_CODE_TSC_INVALID_FQDN; return -1; } - taosGetFqdnPortFromEp(second, tscMgmtIpSet.fqdn[tscMgmtIpSet.numOfIps], &tscMgmtIpSet.port[tscMgmtIpSet.numOfIps]); - tscMgmtIpSet.numOfIps++; + taosGetFqdnPortFromEp(second, tscMgmtEpSet.fqdn[tscMgmtEpSet.numOfEps], &tscMgmtEpSet.port[tscMgmtEpSet.numOfEps]); + tscMgmtEpSet.numOfEps++; } - if ( tscMgmtIpSet.numOfIps == 0) { + if ( tscMgmtEpSet.numOfEps == 0) { terrno = TSDB_CODE_TSC_INVALID_FQDN; return -1; } diff --git a/src/connector/grafana/tdengine/package-lock.json b/src/connector/grafana/tdengine/package-lock.json index 7c8853b99cf44ee59b4cc1da1fdd787c75533ce1..013a005d8d5bc56f49bf5c8a76e3e8a52888e973 100644 --- a/src/connector/grafana/tdengine/package-lock.json +++ b/src/connector/grafana/tdengine/package-lock.json @@ -1,6 +1,6 @@ { "name": "TDengine", - "version": "1.0.0", + "version": "2.0.0", "lockfileVersion": 1, "requires": true, "dependencies": { @@ -2302,9 +2302,9 @@ } }, "lodash": { - "version": "4.17.13", - "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.13.tgz", - "integrity": "sha512-vm3/XWXfWtRua0FkUyEHBZy8kCPjErNBT9fJx8Zvs+U6zjqPbTUOpkaoum3O5uiA8sm+yNMHXfYkTUHFoMxFNA==" + "version": "4.17.19", + "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.19.tgz", + "integrity": "sha512-JNvd8XER9GQX0v2qJgsaN/mzFCNA5BRe/j8JN9d+tWyGLSodKQHKFicdwNYzWwI3wjRnaKPsGj1XkBjx/F96DQ==" }, "log-symbols": { "version": "2.2.0", diff --git a/src/connector/grafana/tdengine/package.json b/src/connector/grafana/tdengine/package.json index 678278ec4793f8f2d4e5b07c0f7a964961928862..b7e65123548fbb57bd23bbde0dc32ebc1c111686 100644 --- a/src/connector/grafana/tdengine/package.json +++ b/src/connector/grafana/tdengine/package.json @@ -38,7 +38,7 @@ "q": "^1.5.0" }, "dependencies": { - "lodash": "^4.17.13", + "lodash": "^4.17.19", "yarn": "^1.22.0" }, "homepage": "https://github.com/taosdata/TDengine/tree/develop/src/connector/grafana/tdengine" diff --git a/src/connector/grafana/tdengine/yarn.lock b/src/connector/grafana/tdengine/yarn.lock index f785e4e478590c4e4db1276d6a5127d963591561..00bc01df7af8d99380a98eb930d40a936370fd36 100644 --- a/src/connector/grafana/tdengine/yarn.lock +++ b/src/connector/grafana/tdengine/yarn.lock @@ -1888,14 +1888,10 @@ locate-path@^3.0.0: p-locate "^3.0.0" path-exists "^3.0.0" -lodash@^4.17.10, lodash@^4.17.13, lodash@^4.17.4, lodash@^4.2.0, lodash@~4.17.10, lodash@~4.17.5: - version "4.17.13" - resolved "https://registry.yarnpkg.com/lodash/-/lodash-4.17.13.tgz#0bdc3a6adc873d2f4e0c4bac285df91b64fc7b93" - -lodash@^4.17.15: - version "4.17.15" - resolved "https://registry.yarnpkg.com/lodash/-/lodash-4.17.15.tgz#b447f6670a0455bbfeedd11392eff330ea097548" - integrity sha512-8xOcRHvCjnocdS5cpwXQXVzmmh5e5+saE2QGoeQmbKmRS6J3VQppPOIt0MnmE+4xlZoumy0GPG0D0MVIQbNA1A== +lodash@^4.17.10, lodash@^4.17.15, lodash@^4.17.19, lodash@^4.17.4, lodash@^4.2.0, lodash@~4.17.10, lodash@~4.17.5: + version "4.17.19" + resolved "https://registry.yarnpkg.com/lodash/-/lodash-4.17.19.tgz#e48ddedbe30b3321783c5b4301fbd353bc1e4a4b" + integrity sha512-JNvd8XER9GQX0v2qJgsaN/mzFCNA5BRe/j8JN9d+tWyGLSodKQHKFicdwNYzWwI3wjRnaKPsGj1XkBjx/F96DQ== log-symbols@2.2.0: version "2.2.0" diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index 51cd471a6b66526228b85be1c8dbc6f033d2515f..e0f30166c437c4d5005f2911417a28a3e3ae0be1 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -103,9 +103,6 @@ void cqClose(void *handle) { SCqContext *pContext = handle; if (handle == NULL) return; - taosTmrCleanUp(pContext->tmrCtrl); - pContext->tmrCtrl = NULL; - // stop all CQs cqStop(pContext); @@ -125,6 +122,9 @@ void cqClose(void *handle) { pthread_mutex_destroy(&pContext->mutex); + taosTmrCleanUp(pContext->tmrCtrl); + pContext->tmrCtrl = NULL; + cTrace("vgId:%d, CQ is closed", pContext->vgId); free(pContext); } diff --git a/src/dnode/inc/dnodeMgmt.h b/src/dnode/inc/dnodeMgmt.h index 092c06d84bac14af43b6f35f10b15acadc5a1125..e8f4a0823f3cc9846b85804d323e73a8cfc0476e 100644 --- a/src/dnode/inc/dnodeMgmt.h +++ b/src/dnode/inc/dnodeMgmt.h @@ -35,8 +35,8 @@ void* dnodeGetVnodeTsdb(void *pVnode); void dnodeReleaseVnode(void *pVnode); void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell); -void dnodeGetMnodeIpSetForPeer(void *ipSet); -void dnodeGetMnodeIpSetForShell(void *ipSet); +void dnodeGetMnodeEpSetForPeer(void *epSet); +void dnodeGetMnodeEpSetForShell(void *epSet); #ifdef __cplusplus } diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 7d3ef9926d76c7be6e30f0267b88aba61c8b8b7b..9050b9c582d09eb1ab608923dab17ae6b0084aa4 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -52,7 +52,7 @@ void * tsDnodeTmr = NULL; static void * tsStatusTimer = NULL; static uint32_t tsRebootTime; -static SRpcIpSet tsDMnodeIpSet = {0}; +static SRpcEpSet tsDMnodeEpSet = {0}; static SDMMnodeInfos tsDMnodeInfos = {0}; static SDMDnodeCfg tsDnodeCfg = {0}; static taos_qset tsMgmtQset = NULL; @@ -90,21 +90,21 @@ int32_t dnodeInitMgmt() { tsRebootTime = taosGetTimestampSec(); if (!dnodeReadMnodeInfos()) { - memset(&tsDMnodeIpSet, 0, sizeof(SRpcIpSet)); + memset(&tsDMnodeEpSet, 0, sizeof(SRpcEpSet)); memset(&tsDMnodeInfos, 0, sizeof(SDMMnodeInfos)); - tsDMnodeIpSet.numOfIps = 1; - taosGetFqdnPortFromEp(tsFirst, tsDMnodeIpSet.fqdn[0], &tsDMnodeIpSet.port[0]); + tsDMnodeEpSet.numOfEps = 1; + taosGetFqdnPortFromEp(tsFirst, tsDMnodeEpSet.fqdn[0], &tsDMnodeEpSet.port[0]); if (strcmp(tsSecond, tsFirst) != 0) { - tsDMnodeIpSet.numOfIps = 2; - taosGetFqdnPortFromEp(tsSecond, tsDMnodeIpSet.fqdn[1], &tsDMnodeIpSet.port[1]); + tsDMnodeEpSet.numOfEps = 2; + taosGetFqdnPortFromEp(tsSecond, tsDMnodeEpSet.fqdn[1], &tsDMnodeEpSet.port[1]); } } else { - tsDMnodeIpSet.inUse = tsDMnodeInfos.inUse; - tsDMnodeIpSet.numOfIps = tsDMnodeInfos.nodeNum; + tsDMnodeEpSet.inUse = tsDMnodeInfos.inUse; + tsDMnodeEpSet.numOfEps = tsDMnodeInfos.nodeNum; for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) { - taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeIpSet.fqdn[i], &tsDMnodeIpSet.port[i]); + taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeEpSet.fqdn[i], &tsDMnodeEpSet.port[i]); } } @@ -450,27 +450,27 @@ static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) { return taosCfgDynamicOptions(pCfg->config); } -void dnodeUpdateMnodeIpSetForPeer(SRpcIpSet *pIpSet) { - dInfo("mnode IP list for is changed, numOfIps:%d inUse:%d", pIpSet->numOfIps, pIpSet->inUse); - for (int i = 0; i < pIpSet->numOfIps; ++i) { - pIpSet->port[i] -= TSDB_PORT_DNODEDNODE; - dInfo("mnode index:%d %s:%u", i, pIpSet->fqdn[i], pIpSet->port[i]) +void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet) { + dInfo("mnode EP list for is changed, numOfEps:%d inUse:%d", pEpSet->numOfEps, pEpSet->inUse); + for (int i = 0; i < pEpSet->numOfEps; ++i) { + pEpSet->port[i] -= TSDB_PORT_DNODEDNODE; + dInfo("mnode index:%d %s:%u", i, pEpSet->fqdn[i], pEpSet->port[i]) } - tsDMnodeIpSet = *pIpSet; + tsDMnodeEpSet = *pEpSet; } -void dnodeGetMnodeIpSetForPeer(void *ipSetRaw) { - SRpcIpSet *ipSet = ipSetRaw; - *ipSet = tsDMnodeIpSet; +void dnodeGetMnodeEpSetForPeer(void *epSetRaw) { + SRpcEpSet *epSet = epSetRaw; + *epSet = tsDMnodeEpSet; - for (int i=0; inumOfIps; ++i) - ipSet->port[i] += TSDB_PORT_DNODEDNODE; + for (int i=0; inumOfEps; ++i) + epSet->port[i] += TSDB_PORT_DNODEDNODE; } -void dnodeGetMnodeIpSetForShell(void *ipSetRaw) { - SRpcIpSet *ipSet = ipSetRaw; - *ipSet = tsDMnodeIpSet; +void dnodeGetMnodeEpSetForShell(void *epSetRaw) { + SRpcEpSet *epSet = epSetRaw; + *epSet = tsDMnodeEpSet; } static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { @@ -536,10 +536,10 @@ static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes) { dInfo("mnode index:%d, %s", tsDMnodeInfos.nodeInfos[i].nodeId, tsDMnodeInfos.nodeInfos[i].nodeEp); } - tsDMnodeIpSet.inUse = tsDMnodeInfos.inUse; - tsDMnodeIpSet.numOfIps = tsDMnodeInfos.nodeNum; + tsDMnodeEpSet.inUse = tsDMnodeInfos.inUse; + tsDMnodeEpSet.numOfEps = tsDMnodeInfos.nodeNum; for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) { - taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeIpSet.fqdn[i], &tsDMnodeIpSet.port[i]); + taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeEpSet.fqdn[i], &tsDMnodeEpSet.port[i]); } dnodeSaveMnodeInfos(); @@ -549,10 +549,10 @@ static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes) { static bool dnodeReadMnodeInfos() { char ipFile[TSDB_FILENAME_LEN*2] = {0}; - sprintf(ipFile, "%s/mnodeIpList.json", tsDnodeDir); + sprintf(ipFile, "%s/mnodeEpSet.json", tsDnodeDir); FILE *fp = fopen(ipFile, "r"); if (!fp) { - dDebug("failed to read mnodeIpList.json, file not exist"); + dDebug("failed to read mnodeEpSet.json, file not exist"); return false; } @@ -563,40 +563,40 @@ static bool dnodeReadMnodeInfos() { if (len <= 0) { free(content); fclose(fp); - dError("failed to read mnodeIpList.json, content is null"); + dError("failed to read mnodeEpSet.json, content is null"); return false; } content[len] = 0; cJSON* root = cJSON_Parse(content); if (root == NULL) { - dError("failed to read mnodeIpList.json, invalid json format"); + dError("failed to read mnodeEpSet.json, invalid json format"); goto PARSE_OVER; } cJSON* inUse = cJSON_GetObjectItem(root, "inUse"); if (!inUse || inUse->type != cJSON_Number) { - dError("failed to read mnodeIpList.json, inUse not found"); + dError("failed to read mnodeEpSet.json, inUse not found"); goto PARSE_OVER; } tsDMnodeInfos.inUse = inUse->valueint; cJSON* nodeNum = cJSON_GetObjectItem(root, "nodeNum"); if (!nodeNum || nodeNum->type != cJSON_Number) { - dError("failed to read mnodeIpList.json, nodeNum not found"); + dError("failed to read mnodeEpSet.json, nodeNum not found"); goto PARSE_OVER; } tsDMnodeInfos.nodeNum = nodeNum->valueint; cJSON* nodeInfos = cJSON_GetObjectItem(root, "nodeInfos"); if (!nodeInfos || nodeInfos->type != cJSON_Array) { - dError("failed to read mnodeIpList.json, nodeInfos not found"); + dError("failed to read mnodeEpSet.json, nodeInfos not found"); goto PARSE_OVER; } int size = cJSON_GetArraySize(nodeInfos); if (size != tsDMnodeInfos.nodeNum) { - dError("failed to read mnodeIpList.json, nodeInfos size not matched"); + dError("failed to read mnodeEpSet.json, nodeInfos size not matched"); goto PARSE_OVER; } @@ -606,14 +606,14 @@ static bool dnodeReadMnodeInfos() { cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId"); if (!nodeId || nodeId->type != cJSON_Number) { - dError("failed to read mnodeIpList.json, nodeId not found"); + dError("failed to read mnodeEpSet.json, nodeId not found"); goto PARSE_OVER; } tsDMnodeInfos.nodeInfos[i].nodeId = nodeId->valueint; cJSON *nodeEp = cJSON_GetObjectItem(nodeInfo, "nodeEp"); if (!nodeEp || nodeEp->type != cJSON_String || nodeEp->valuestring == NULL) { - dError("failed to read mnodeIpList.json, nodeName not found"); + dError("failed to read mnodeEpSet.json, nodeName not found"); goto PARSE_OVER; } strncpy(tsDMnodeInfos.nodeInfos[i].nodeEp, nodeEp->valuestring, TSDB_EP_LEN); @@ -621,7 +621,7 @@ static bool dnodeReadMnodeInfos() { ret = true; - dInfo("read mnode iplist successed, numOfIps:%d inUse:%d", tsDMnodeInfos.nodeNum, tsDMnodeInfos.inUse); + dInfo("read mnode epSet successed, numOfEps:%d inUse:%d", tsDMnodeInfos.nodeNum, tsDMnodeInfos.inUse); for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) { dInfo("mnode:%d, %s", tsDMnodeInfos.nodeInfos[i].nodeId, tsDMnodeInfos.nodeInfos[i].nodeEp); } @@ -635,7 +635,7 @@ PARSE_OVER: static void dnodeSaveMnodeInfos() { char ipFile[TSDB_FILENAME_LEN] = {0}; - sprintf(ipFile, "%s/mnodeIpList.json", tsDnodeDir); + sprintf(ipFile, "%s/mnodeEpSet.json", tsDnodeDir); FILE *fp = fopen(ipFile, "w"); if (!fp) return; @@ -663,11 +663,11 @@ static void dnodeSaveMnodeInfos() { fclose(fp); free(content); - dInfo("save mnode iplist successed"); + dInfo("save mnode epSet successed"); } char *dnodeGetMnodeMasterEp() { - return tsDMnodeInfos.nodeInfos[tsDMnodeIpSet.inUse].nodeEp; + return tsDMnodeInfos.nodeInfos[tsDMnodeEpSet.inUse].nodeEp; } void* dnodeGetMnodeInfos() { @@ -726,9 +726,9 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) { .msgType = TSDB_MSG_TYPE_DM_STATUS }; - SRpcIpSet ipSet; - dnodeGetMnodeIpSetForPeer(&ipSet); - dnodeSendMsgToDnode(&ipSet, &rpcMsg); + SRpcEpSet epSet; + dnodeGetMnodeEpSetForPeer(&epSet); + dnodeSendMsgToDnode(&epSet, &rpcMsg); } static bool dnodeReadDnodeCfg() { @@ -817,20 +817,20 @@ void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) { SRpcConnInfo connInfo = {0}; rpcGetConnInfo(rpcMsg->handle, &connInfo); - SRpcIpSet ipSet = {0}; + SRpcEpSet epSet = {0}; if (forShell) { - dnodeGetMnodeIpSetForShell(&ipSet); + dnodeGetMnodeEpSetForShell(&epSet); } else { - dnodeGetMnodeIpSetForPeer(&ipSet); + dnodeGetMnodeEpSetForPeer(&epSet); } - dDebug("msg:%s will be redirected, dnodeIp:%s user:%s, numOfIps:%d inUse:%d", taosMsg[rpcMsg->msgType], - taosIpStr(connInfo.clientIp), connInfo.user, ipSet.numOfIps, ipSet.inUse); + dDebug("msg:%s will be redirected, dnodeIp:%s user:%s, numOfEps:%d inUse:%d", taosMsg[rpcMsg->msgType], + taosIpStr(connInfo.clientIp), connInfo.user, epSet.numOfEps, epSet.inUse); - for (int i = 0; i < ipSet.numOfIps; ++i) { - dDebug("mnode index:%d %s:%d", i, ipSet.fqdn[i], ipSet.port[i]); - ipSet.port[i] = htons(ipSet.port[i]); + for (int i = 0; i < epSet.numOfEps; ++i) { + dDebug("mnode index:%d %s:%d", i, epSet.fqdn[i], epSet.port[i]); + epSet.port[i] = htons(epSet.port[i]); } - rpcSendRedirectRsp(rpcMsg->handle, &ipSet); + rpcSendRedirectRsp(rpcMsg->handle, &epSet); } diff --git a/src/dnode/src/dnodePeer.c b/src/dnode/src/dnodePeer.c index 2a3436583f20699052398af729ac69ec3ccea8bb..b27f56a871bf4b0eecf841b625c6251810edbfce 100644 --- a/src/dnode/src/dnodePeer.c +++ b/src/dnode/src/dnodePeer.c @@ -29,11 +29,11 @@ #include "dnodeVWrite.h" #include "dnodeMPeer.h" -extern void dnodeUpdateMnodeIpSetForPeer(SRpcIpSet *pIpSet); +extern void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet); static void (*dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); -static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcIpSet *); +static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *); static void (*dnodeProcessRspMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg); -static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcIpSet *pIpSet); +static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet); static void *tsDnodeServerRpc = NULL; static void *tsDnodeClientRpc = NULL; @@ -83,7 +83,7 @@ void dnodeCleanupServer() { } } -static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcIpSet *pIpSet) { +static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { SRpcMsg rspMsg = { .handle = pMsg->handle, .pCont = NULL, @@ -148,9 +148,9 @@ void dnodeCleanupClient() { } } -static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcIpSet *pIpSet) { - if (pMsg->msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pIpSet) { - dnodeUpdateMnodeIpSetForPeer(pIpSet); +static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { + if (pMsg->msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pEpSet) { + dnodeUpdateMnodeEpSetForPeer(pEpSet); } if (dnodeProcessRspMsgFp[pMsg->msgType]) { @@ -166,12 +166,12 @@ void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) { dnodeProcessRspMsgFp[msgType] = fp; } -void dnodeSendMsgToDnode(SRpcIpSet *ipSet, SRpcMsg *rpcMsg) { - rpcSendRequest(tsDnodeClientRpc, ipSet, rpcMsg); +void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { + rpcSendRequest(tsDnodeClientRpc, epSet, rpcMsg); } void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { - SRpcIpSet ipSet = {0}; - dnodeGetMnodeIpSetForPeer(&ipSet); - rpcSendRecv(tsDnodeClientRpc, &ipSet, rpcMsg, rpcRsp); + SRpcEpSet epSet = {0}; + dnodeGetMnodeEpSetForPeer(&epSet); + rpcSendRecv(tsDnodeClientRpc, &epSet, rpcMsg, rpcRsp); } diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index d089495079b47e7ac69c475eb5ef19d0cb0c8f11..f9d137bb99505047ed23fee5a58b57a014789e7f 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -31,7 +31,7 @@ #include "dnodeShell.h" static void (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); -static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcIpSet *); +static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *); static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey); static void * tsDnodeShellRpc = NULL; static int32_t tsDnodeQueryReqNum = 0; @@ -108,7 +108,7 @@ void dnodeCleanupShell() { } } -void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcIpSet *pIpSet) { +void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { SRpcMsg rpcMsg = { .handle = pMsg->handle, .pCont = NULL, diff --git a/src/inc/dnode.h b/src/inc/dnode.h index b561c407a3415d7db27333d96e21a72d4f159d8b..5a059c93a627b9c57085db4ad30058393f8791b0 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -39,13 +39,13 @@ SDnodeStatisInfo dnodeGetStatisInfo(); bool dnodeIsFirstDeploy(); char * dnodeGetMnodeMasterEp(); -void dnodeGetMnodeIpSetForPeer(void *ipSet); -void dnodeGetMnodeIpSetForShell(void *ipSet); +void dnodeGetMnodeEpSetForPeer(void *epSet); +void dnodeGetMnodeEpSetForShell(void *epSet); void * dnodeGetMnodeInfos(); int32_t dnodeGetDnodeId(); void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); -void dnodeSendMsgToDnode(SRpcIpSet *ipSet, SRpcMsg *rpcMsg); +void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg); void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp); void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t sid); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index a367d6b93bae60c41b5e4108d6d9dca43efd6d1e..96386aab65018b3adc43dedd58bcc0f190f5a358 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -176,7 +176,7 @@ extern char *taosMsg[]; typedef struct { char fqdn[TSDB_FQDN_LEN]; uint16_t port; -} SIpAddr; +} SEpAddr; typedef struct { int32_t numOfVnodes; @@ -306,7 +306,7 @@ typedef struct { int8_t reserved1; int8_t reserved2; int32_t connId; - SRpcIpSet ipList; + SRpcEpSet epSet; } SCMConnectRsp; typedef struct { @@ -648,8 +648,8 @@ typedef struct SCMSTableVgroupMsg { typedef struct { int32_t vgId; - int8_t numOfIps; - SIpAddr ipAddr[TSDB_MAX_REPLICA]; + int8_t numOfEps; + SEpAddr epAddr[TSDB_MAX_REPLICA]; } SCMVgroupInfo; typedef struct { @@ -753,7 +753,7 @@ typedef struct { uint32_t onlineDnodes; uint32_t connId; int8_t killConnection; - SRpcIpSet ipList; + SRpcEpSet epSet; } SCMHeartBeatRsp; typedef struct { diff --git a/src/inc/trpc.h b/src/inc/trpc.h index d1adfb7494aafb89912c508c65ab9a774fae4f09..1af6a5eb0f83cfefb949a6db91194ec8b002b944 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -28,12 +28,12 @@ extern "C" { extern int tsRpcHeadSize; -typedef struct SRpcIpSet { +typedef struct SRpcEpSet { int8_t inUse; - int8_t numOfIps; + int8_t numOfEps; uint16_t port[TSDB_MAX_REPLICA]; char fqdn[TSDB_MAX_REPLICA][TSDB_FQDN_LEN]; -} SRpcIpSet; +} SRpcEpSet; typedef struct SRpcConnInfo { uint32_t clientIp; @@ -67,7 +67,7 @@ typedef struct SRpcInit { char *ckey; // ciphering key // call back to process incoming msg, code shall be ignored by server app - void (*cfp)(SRpcMsg *, SRpcIpSet *); + void (*cfp)(SRpcMsg *, SRpcEpSet *); // call back to retrieve the client auth info, for server app only int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey); @@ -78,11 +78,11 @@ void rpcClose(void *); void *rpcMallocCont(int contLen); void rpcFreeCont(void *pCont); void *rpcReallocCont(void *ptr, int contLen); -void rpcSendRequest(void *thandle, const SRpcIpSet *pIpSet, SRpcMsg *pMsg); +void rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg); void rpcSendResponse(const SRpcMsg *pMsg); -void rpcSendRedirectRsp(void *pConn, const SRpcIpSet *pIpSet); +void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); -void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pReq, SRpcMsg *pRsp); +void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); int rpcReportProgress(void *pConn, char *pCont, int contLen); void rpcCancelRequest(void *pContext); diff --git a/src/kit/taosmigrate/taosmigrate.c b/src/kit/taosmigrate/taosmigrate.c index b80ca44a10c87c5c9102f6090e54ef8a32d736e5..3ef73ef169a59a8569a520f9c4def32a1de2f644 100644 --- a/src/kit/taosmigrate/taosmigrate.c +++ b/src/kit/taosmigrate/taosmigrate.c @@ -210,10 +210,10 @@ int32_t main(int32_t argc, char *argv[]) { (void)snprintf(mnodeWal, TSDB_FILENAME_LEN*2, "%s/mnode/wal/wal0", arguments.dataDir); walModWalFile(mnodeWal); - // 2. modfiy dnode config: mnodeIpList.json - char dnodeIpList[TSDB_FILENAME_LEN*2] = {0}; - (void)snprintf(dnodeIpList, TSDB_FILENAME_LEN*2, "%s/dnode/mnodeIpList.json", arguments.dataDir); - modDnodeIpList(dnodeIpList); + // 2. modfiy dnode config: mnodeEpSet.json + char dnodeEpSet[TSDB_FILENAME_LEN*2] = {0}; + (void)snprintf(dnodeEpSet, TSDB_FILENAME_LEN*2, "%s/dnode/mnodeEpSet.json", arguments.dataDir); + modDnodeEpSet(dnodeEpSet); // 3. modify vnode config: config.json char vnodeDir[TSDB_FILENAME_LEN*2] = {0}; diff --git a/src/kit/taosmigrate/taosmigrate.h b/src/kit/taosmigrate/taosmigrate.h index a0a02e651cac94502bd3ccecfac54fc795544cde..9fb3c92db284033cdfc2dcaa736e088a9b84f14f 100644 --- a/src/kit/taosmigrate/taosmigrate.h +++ b/src/kit/taosmigrate/taosmigrate.h @@ -71,7 +71,7 @@ int tSystemShell(const char * cmd); void taosMvFile(char* destFile, char *srcFile) ; void walModWalFile(char* walfile); SdnodeIfo* getDnodeInfo(int32_t dnodeId); -void modDnodeIpList(char* dnodeIpList); +void modDnodeEpSet(char* dnodeEpSet); void modAllVnode(char *vnodeDir); #endif diff --git a/src/kit/taosmigrate/taosmigrateDnodeCfg.c b/src/kit/taosmigrate/taosmigrateDnodeCfg.c index 263d5521e91deb0a2bb5be66e8988cd4b776b4bd..7f6fd03feab28c5e71ac6c390160bde8dabf4b67 100644 --- a/src/kit/taosmigrate/taosmigrateDnodeCfg.c +++ b/src/kit/taosmigrate/taosmigrateDnodeCfg.c @@ -23,10 +23,10 @@ static SDMMnodeInfos tsDnodeIpInfos = {0}; -static bool dnodeReadMnodeInfos(char* dnodeIpList) { - FILE *fp = fopen(dnodeIpList, "r"); +static bool dnodeReadMnodeInfos(char* dnodeEpSet) { + FILE *fp = fopen(dnodeEpSet, "r"); if (!fp) { - printf("failed to read mnodeIpList.json, file not exist\n"); + printf("failed to read mnodeEpSet.json, file not exist\n"); return false; } @@ -37,40 +37,40 @@ static bool dnodeReadMnodeInfos(char* dnodeIpList) { if (len <= 0) { free(content); fclose(fp); - printf("failed to read mnodeIpList.json, content is null\n"); + printf("failed to read mnodeEpSet.json, content is null\n"); return false; } content[len] = 0; cJSON* root = cJSON_Parse(content); if (root == NULL) { - printf("failed to read mnodeIpList.json, invalid json format\n"); + printf("failed to read mnodeEpSet.json, invalid json format\n"); goto PARSE_OVER; } cJSON* inUse = cJSON_GetObjectItem(root, "inUse"); if (!inUse || inUse->type != cJSON_Number) { - printf("failed to read mnodeIpList.json, inUse not found\n"); + printf("failed to read mnodeEpSet.json, inUse not found\n"); goto PARSE_OVER; } tsDnodeIpInfos.inUse = inUse->valueint; cJSON* nodeNum = cJSON_GetObjectItem(root, "nodeNum"); if (!nodeNum || nodeNum->type != cJSON_Number) { - printf("failed to read mnodeIpList.json, nodeNum not found\n"); + printf("failed to read mnodeEpSet.json, nodeNum not found\n"); goto PARSE_OVER; } tsDnodeIpInfos.nodeNum = nodeNum->valueint; cJSON* nodeInfos = cJSON_GetObjectItem(root, "nodeInfos"); if (!nodeInfos || nodeInfos->type != cJSON_Array) { - printf("failed to read mnodeIpList.json, nodeInfos not found\n"); + printf("failed to read mnodeEpSet.json, nodeInfos not found\n"); goto PARSE_OVER; } int size = cJSON_GetArraySize(nodeInfos); if (size != tsDnodeIpInfos.nodeNum) { - printf("failed to read mnodeIpList.json, nodeInfos size not matched\n"); + printf("failed to read mnodeEpSet.json, nodeInfos size not matched\n"); goto PARSE_OVER; } @@ -80,14 +80,14 @@ static bool dnodeReadMnodeInfos(char* dnodeIpList) { cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId"); if (!nodeId || nodeId->type != cJSON_Number) { - printf("failed to read mnodeIpList.json, nodeId not found\n"); + printf("failed to read mnodeEpSet.json, nodeId not found\n"); goto PARSE_OVER; } tsDnodeIpInfos.nodeInfos[i].nodeId = nodeId->valueint; cJSON *nodeEp = cJSON_GetObjectItem(nodeInfo, "nodeEp"); if (!nodeEp || nodeEp->type != cJSON_String || nodeEp->valuestring == NULL) { - printf("failed to read mnodeIpList.json, nodeName not found\n"); + printf("failed to read mnodeEpSet.json, nodeName not found\n"); goto PARSE_OVER; } strncpy(tsDnodeIpInfos.nodeInfos[i].nodeEp, nodeEp->valuestring, TSDB_EP_LEN); @@ -102,7 +102,7 @@ static bool dnodeReadMnodeInfos(char* dnodeIpList) { ret = true; - //printf("read mnode iplist successed, numOfIps:%d inUse:%d\n", tsDnodeIpInfos.nodeNum, tsDnodeIpInfos.inUse); + //printf("read mnode epSet successed, numOfEps:%d inUse:%d\n", tsDnodeIpInfos.nodeNum, tsDnodeIpInfos.inUse); //for (int32_t i = 0; i < tsDnodeIpInfos.nodeNum; i++) { // printf("mnode:%d, %s\n", tsDnodeIpInfos.nodeInfos[i].nodeId, tsDnodeIpInfos.nodeInfos[i].nodeEp); //} @@ -115,8 +115,8 @@ PARSE_OVER: } -static void dnodeSaveMnodeInfos(char* dnodeIpList) { - FILE *fp = fopen(dnodeIpList, "w"); +static void dnodeSaveMnodeInfos(char* dnodeEpSet) { + FILE *fp = fopen(dnodeEpSet, "w"); if (!fp) return; int32_t len = 0; @@ -143,13 +143,13 @@ static void dnodeSaveMnodeInfos(char* dnodeIpList) { fclose(fp); free(content); - printf("mod mnode iplist successed\n"); + printf("mod mnode epSet successed\n"); } -void modDnodeIpList(char* dnodeIpList) +void modDnodeEpSet(char* dnodeEpSet) { - (void)dnodeReadMnodeInfos(dnodeIpList); - dnodeSaveMnodeInfos(dnodeIpList); + (void)dnodeReadMnodeInfos(dnodeEpSet); + dnodeSaveMnodeInfos(dnodeEpSet); return; } diff --git a/src/mnode/inc/mnodeMnode.h b/src/mnode/inc/mnodeMnode.h index 1060907234ef9964a92be48b3a9e953b8362e9c8..0976ea8acd74e511a6cda374dc6ad5501b503482 100644 --- a/src/mnode/inc/mnodeMnode.h +++ b/src/mnode/inc/mnodeMnode.h @@ -42,12 +42,12 @@ void mnodeIncMnodeRef(struct SMnodeObj *pMnode); void mnodeDecMnodeRef(struct SMnodeObj *pMnode); char * mnodeGetMnodeRoleStr(); -void mnodeGetMnodeIpSetForPeer(SRpcIpSet *ipSet); -void mnodeGetMnodeIpSetForShell(SRpcIpSet *ipSet); +void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet); +void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet); char* mnodeGetMnodeMasterEp(); void mnodeGetMnodeInfos(void *mnodes); -void mnodeUpdateMnodeIpSet(); +void mnodeUpdateMnodeEpSet(); #ifdef __cplusplus } diff --git a/src/mnode/inc/mnodeVgroup.h b/src/mnode/inc/mnodeVgroup.h index 3f1da896055e2475bbbce8ece261d56dabb1dd66..9c5b201e937e15888ff7498d2d9a7df8dc24eaeb 100644 --- a/src/mnode/inc/mnodeVgroup.h +++ b/src/mnode/inc/mnodeVgroup.h @@ -44,12 +44,12 @@ int32_t mnodeGetAvailableVgroup(struct SMnodeMsg *pMsg, SVgObj **pVgroup, int32_ void mnodeAddTableIntoVgroup(SVgObj *pVgroup, SChildTableObj *pTable); void mnodeRemoveTableFromVgroup(SVgObj *pVgroup, SChildTableObj *pTable); -void mnodeSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle); +void mnodeSendDropVnodeMsg(int32_t vgId, SRpcEpSet *epSet, void *ahandle); void mnodeSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle); void mnodeSendAlterVgroupMsg(SVgObj *pVgroup); -SRpcIpSet mnodeGetIpSetFromVgroup(SVgObj *pVgroup); -SRpcIpSet mnodeGetIpSetFromIp(char *ep); +SRpcEpSet mnodeGetEpSetFromVgroup(SVgObj *pVgroup); +SRpcEpSet mnodeGetEpSetFromIp(char *ep); #ifdef __cplusplus } diff --git a/src/mnode/src/mnodeDnode.c b/src/mnode/src/mnodeDnode.c index 9085f1a9f7ac16cc5b31b7c8b534719e07dbdb6d..7edba8662e0daab22baf444754c23df9b10444fe 100644 --- a/src/mnode/src/mnodeDnode.c +++ b/src/mnode/src/mnodeDnode.c @@ -289,14 +289,14 @@ static int32_t mnodeProcessCfgDnodeMsg(SMnodeMsg *pMsg) { } } - SRpcIpSet ipSet = mnodeGetIpSetFromIp(pCmCfgDnode->ep); + SRpcEpSet epSet = mnodeGetEpSetFromIp(pCmCfgDnode->ep); if (dnodeId != 0) { SDnodeObj *pDnode = mnodeGetDnode(dnodeId); if (pDnode == NULL) { mError("failed to cfg dnode, invalid dnodeId:%d", dnodeId); return TSDB_CODE_MND_DNODE_NOT_EXIST; } - ipSet = mnodeGetIpSetFromIp(pDnode->dnodeEp); + epSet = mnodeGetEpSetFromIp(pDnode->dnodeEp); mnodeDecDnodeRef(pDnode); } @@ -313,7 +313,7 @@ static int32_t mnodeProcessCfgDnodeMsg(SMnodeMsg *pMsg) { }; mInfo("dnode:%s, is configured by %s", pCmCfgDnode->ep, pMsg->pUser->user); - dnodeSendMsgToDnode(&ipSet, &rpcMdCfgDnodeMsg); + dnodeSendMsgToDnode(&epSet, &rpcMdCfgDnodeMsg); return TSDB_CODE_SUCCESS; } @@ -399,9 +399,9 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) { SVgObj *pVgroup = mnodeGetVgroup(pVload->vgId); if (pVgroup == NULL) { - SRpcIpSet ipSet = mnodeGetIpSetFromIp(pDnode->dnodeEp); + SRpcEpSet epSet = mnodeGetEpSetFromIp(pDnode->dnodeEp); mInfo("dnode:%d, vgId:%d not exist in mnode, drop it", pDnode->dnodeId, pVload->vgId); - mnodeSendDropVnodeMsg(pVload->vgId, &ipSet, NULL); + mnodeSendDropVnodeMsg(pVload->vgId, &epSet, NULL); } else { mnodeUpdateVgroupStatus(pVgroup, pDnode, pVload); pAccess->vgId = htonl(pVload->vgId); diff --git a/src/mnode/src/mnodeMnode.c b/src/mnode/src/mnodeMnode.c index f74de2b325f640592c0b68f6d3f79e4559a9c441..d2f389ca0b6afde4dd9e48c05a99c266ebbbca44 100644 --- a/src/mnode/src/mnodeMnode.c +++ b/src/mnode/src/mnodeMnode.c @@ -35,8 +35,8 @@ static void * tsMnodeSdb = NULL; static int32_t tsMnodeUpdateSize = 0; -static SRpcIpSet tsMnodeIpSetForShell; -static SRpcIpSet tsMnodeIpSetForPeer; +static SRpcEpSet tsMnodeEpSetForShell; +static SRpcEpSet tsMnodeEpSetForPeer; static SDMMnodeInfos tsMnodeInfos; static int32_t mnodeGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mnodeRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); @@ -123,7 +123,7 @@ static int32_t mnodeMnodeActionRestored() { sdbFreeIter(pIter); } - mnodeUpdateMnodeIpSet(); + mnodeUpdateMnodeEpSet(); return TSDB_CODE_SUCCESS; } @@ -204,13 +204,13 @@ char *mnodeGetMnodeRoleStr(int32_t role) { } } -void mnodeUpdateMnodeIpSet() { - mInfo("update mnodes ipset, numOfIps:%d ", mnodeGetMnodesNum()); +void mnodeUpdateMnodeEpSet() { + mInfo("update mnodes epSet, numOfEps:%d ", mnodeGetMnodesNum()); mnodeMnodeWrLock(); - memset(&tsMnodeIpSetForShell, 0, sizeof(SRpcIpSet)); - memset(&tsMnodeIpSetForPeer, 0, sizeof(SRpcIpSet)); + memset(&tsMnodeEpSetForShell, 0, sizeof(SRpcEpSet)); + memset(&tsMnodeEpSetForPeer, 0, sizeof(SRpcEpSet)); memset(&tsMnodeInfos, 0, sizeof(SDMMnodeInfos)); int32_t index = 0; @@ -222,20 +222,20 @@ void mnodeUpdateMnodeIpSet() { SDnodeObj *pDnode = mnodeGetDnode(pMnode->mnodeId); if (pDnode != NULL) { - strcpy(tsMnodeIpSetForShell.fqdn[index], pDnode->dnodeFqdn); - tsMnodeIpSetForShell.port[index] = htons(pDnode->dnodePort); - mDebug("mnode:%d, for shell fqdn:%s %d", pDnode->dnodeId, tsMnodeIpSetForShell.fqdn[index], htons(tsMnodeIpSetForShell.port[index])); + strcpy(tsMnodeEpSetForShell.fqdn[index], pDnode->dnodeFqdn); + tsMnodeEpSetForShell.port[index] = htons(pDnode->dnodePort); + mDebug("mnode:%d, for shell fqdn:%s %d", pDnode->dnodeId, tsMnodeEpSetForShell.fqdn[index], htons(tsMnodeEpSetForShell.port[index])); - strcpy(tsMnodeIpSetForPeer.fqdn[index], pDnode->dnodeFqdn); - tsMnodeIpSetForPeer.port[index] = htons(pDnode->dnodePort + TSDB_PORT_DNODEDNODE); - mDebug("mnode:%d, for peer fqdn:%s %d", pDnode->dnodeId, tsMnodeIpSetForPeer.fqdn[index], htons(tsMnodeIpSetForPeer.port[index])); + strcpy(tsMnodeEpSetForPeer.fqdn[index], pDnode->dnodeFqdn); + tsMnodeEpSetForPeer.port[index] = htons(pDnode->dnodePort + TSDB_PORT_DNODEDNODE); + mDebug("mnode:%d, for peer fqdn:%s %d", pDnode->dnodeId, tsMnodeEpSetForPeer.fqdn[index], htons(tsMnodeEpSetForPeer.port[index])); tsMnodeInfos.nodeInfos[index].nodeId = htonl(pMnode->mnodeId); strcpy(tsMnodeInfos.nodeInfos[index].nodeEp, pDnode->dnodeEp); if (pMnode->role == TAOS_SYNC_ROLE_MASTER) { - tsMnodeIpSetForShell.inUse = index; - tsMnodeIpSetForPeer.inUse = index; + tsMnodeEpSetForShell.inUse = index; + tsMnodeEpSetForPeer.inUse = index; tsMnodeInfos.inUse = index; } @@ -248,23 +248,23 @@ void mnodeUpdateMnodeIpSet() { } tsMnodeInfos.nodeNum = index; - tsMnodeIpSetForShell.numOfIps = index; - tsMnodeIpSetForPeer.numOfIps = index; + tsMnodeEpSetForShell.numOfEps = index; + tsMnodeEpSetForPeer.numOfEps = index; sdbFreeIter(pIter); mnodeMnodeUnLock(); } -void mnodeGetMnodeIpSetForPeer(SRpcIpSet *ipSet) { +void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet) { mnodeMnodeRdLock(); - *ipSet = tsMnodeIpSetForPeer; + *epSet = tsMnodeEpSetForPeer; mnodeMnodeUnLock(); } -void mnodeGetMnodeIpSetForShell(SRpcIpSet *ipSet) { +void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet) { mnodeMnodeRdLock(); - *ipSet = tsMnodeIpSetForShell; + *epSet = tsMnodeEpSetForShell; mnodeMnodeUnLock(); } @@ -295,7 +295,7 @@ int32_t mnodeAddMnode(int32_t dnodeId) { code = TSDB_CODE_MND_SDB_ERROR; } - mnodeUpdateMnodeIpSet(); + mnodeUpdateMnodeEpSet(); return code; } @@ -308,7 +308,7 @@ void mnodeDropMnodeLocal(int32_t dnodeId) { mnodeDecMnodeRef(pMnode); } - mnodeUpdateMnodeIpSet(); + mnodeUpdateMnodeEpSet(); } int32_t mnodeDropMnode(int32_t dnodeId) { @@ -330,7 +330,7 @@ int32_t mnodeDropMnode(int32_t dnodeId) { sdbDecRef(tsMnodeSdb, pMnode); - mnodeUpdateMnodeIpSet(); + mnodeUpdateMnodeEpSet(); return code; } diff --git a/src/mnode/src/mnodePeer.c b/src/mnode/src/mnodePeer.c index 04f2889607f3c4537bffc33c4530e4f0b693774c..71b8b1ea843dc799e18d38ac5fe5b464ab1170d7 100644 --- a/src/mnode/src/mnodePeer.c +++ b/src/mnode/src/mnodePeer.c @@ -53,14 +53,14 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) { if (!sdbIsMaster()) { SMnodeRsp *rpcRsp = &pMsg->rpcRsp; - SRpcIpSet *ipSet = rpcMallocCont(sizeof(SRpcIpSet)); - mnodeGetMnodeIpSetForPeer(ipSet); - rpcRsp->rsp = ipSet; - rpcRsp->len = sizeof(SRpcIpSet); + SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet)); + mnodeGetMnodeEpSetForPeer(epSet); + rpcRsp->rsp = epSet; + rpcRsp->len = sizeof(SRpcEpSet); - mDebug("%p, msg:%s in mpeer queue, will be redireced inUse:%d", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], ipSet->inUse); - for (int32_t i = 0; i < ipSet->numOfIps; ++i) { - mDebug("mnode index:%d ip:%s:%d", i, ipSet->fqdn[i], htons(ipSet->port[i])); + mDebug("%p, msg:%s in mpeer queue, will be redireced inUse:%d", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], epSet->inUse); + for (int32_t i = 0; i < epSet->numOfEps; ++i) { + mDebug("mnode index:%d ep:%s:%d", i, epSet->fqdn[i], htons(epSet->port[i])); } return TSDB_CODE_RPC_REDIRECT; diff --git a/src/mnode/src/mnodeRead.c b/src/mnode/src/mnodeRead.c index dfecf1f69381f97128ef541f06112063bf26ab71..af2fed3408ab726b5a819f6717fab497fcba1136 100644 --- a/src/mnode/src/mnodeRead.c +++ b/src/mnode/src/mnodeRead.c @@ -49,14 +49,14 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) { if (!sdbIsMaster()) { SMnodeRsp *rpcRsp = &pMsg->rpcRsp; - SRpcIpSet *ipSet = rpcMallocCont(sizeof(SRpcIpSet)); - mnodeGetMnodeIpSetForShell(ipSet); - rpcRsp->rsp = ipSet; - rpcRsp->len = sizeof(SRpcIpSet); + SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet)); + mnodeGetMnodeEpSetForShell(epSet); + rpcRsp->rsp = epSet; + rpcRsp->len = sizeof(SRpcEpSet); - mDebug("%p, msg:%s in mread queue, will be redireced, inUse:%d", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], ipSet->inUse); - for (int32_t i = 0; i < ipSet->numOfIps; ++i) { - mDebug("mnode index:%d ip:%s:%d", i, ipSet->fqdn[i], htons(ipSet->port[i])); + mDebug("%p, msg:%s in mread queue, will be redireced, inUse:%d", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], epSet->inUse); + for (int32_t i = 0; i < epSet->numOfEps; ++i) { + mDebug("mnode index:%d ep:%s:%d", i, epSet->fqdn[i], htons(epSet->port[i])); } return TSDB_CODE_RPC_REDIRECT; diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index e08f8ca0db6a3e2199e3f1389ce6c70369db8e2f..0f72dbdec4f508a90e418c80dffbd63af9f12998 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -219,7 +219,7 @@ void sdbUpdateMnodeRoles() { } } - mnodeUpdateMnodeIpSet(); + mnodeUpdateMnodeEpSet(); } static uint32_t sdbGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion) { diff --git a/src/mnode/src/mnodeShow.c b/src/mnode/src/mnodeShow.c index a56ad34a250cff8d6db53eaf9473fc496cf3212a..e3d5b41be317272501bf84607f41adb50c43fbd6 100644 --- a/src/mnode/src/mnodeShow.c +++ b/src/mnode/src/mnodeShow.c @@ -270,7 +270,7 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) { pHBRsp->onlineDnodes = htonl(mnodeGetOnlinDnodesNum()); pHBRsp->totalDnodes = htonl(mnodeGetDnodesNum()); - mnodeGetMnodeIpSetForShell(&pHBRsp->ipList); + mnodeGetMnodeEpSetForShell(&pHBRsp->epSet); pMsg->rpcRsp.rsp = pHBRsp; pMsg->rpcRsp.len = sizeof(SCMHeartBeatRsp); @@ -335,7 +335,7 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) { pConnectRsp->writeAuth = pUser->writeAuth; pConnectRsp->superAuth = pUser->superAuth; - mnodeGetMnodeIpSetForShell(&pConnectRsp->ipList); + mnodeGetMnodeEpSetForShell(&pConnectRsp->epSet); connect_over: if (code != TSDB_CODE_SUCCESS) { diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index 80c4fc95abd7b8e9a4314f5c032ad79da48312d3..75ed442cd44bb8882d368229e16a5b7656ec3129 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -910,9 +910,9 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) { mInfo("app:%p:%p, stable:%s, send drop stable msg to vgId:%d", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, pVgroup->vgId); - SRpcIpSet ipSet = mnodeGetIpSetFromVgroup(pVgroup); + SRpcEpSet epSet = mnodeGetEpSetFromVgroup(pVgroup); SRpcMsg rpcMsg = {.pCont = pDrop, .contLen = sizeof(SMDDropSTableMsg), .msgType = TSDB_MSG_TYPE_MD_DROP_STABLE}; - dnodeSendMsgToDnode(&ipSet, &rpcMsg); + dnodeSendMsgToDnode(&epSet, &rpcMsg); mnodeDecVgroupRef(pVgroup); } taosHashDestroyIter(pIter); @@ -1484,10 +1484,10 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) { SDnodeObj *pDnode = pVgroup->vnodeGid[vn].pDnode; if (pDnode == NULL) break; - tstrncpy(pVgroupInfo->vgroups[vgSize].ipAddr[vn].fqdn, pDnode->dnodeFqdn, TSDB_FQDN_LEN); - pVgroupInfo->vgroups[vgSize].ipAddr[vn].port = htons(pDnode->dnodePort); + tstrncpy(pVgroupInfo->vgroups[vgSize].epAddr[vn].fqdn, pDnode->dnodeFqdn, TSDB_FQDN_LEN); + pVgroupInfo->vgroups[vgSize].epAddr[vn].port = htons(pDnode->dnodePort); - pVgroupInfo->vgroups[vgSize].numOfIps++; + pVgroupInfo->vgroups[vgSize].numOfEps++; } vgSize++; @@ -1615,7 +1615,7 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) { return terrno; } - SRpcIpSet ipSet = mnodeGetIpSetFromVgroup(pMsg->pVgroup); + SRpcEpSet epSet = mnodeGetEpSetFromVgroup(pMsg->pVgroup); SRpcMsg rpcMsg = { .ahandle = pMsg, .pCont = pMDCreate, @@ -1624,7 +1624,7 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) { .msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE }; - dnodeSendMsgToDnode(&ipSet, &rpcMsg); + dnodeSendMsgToDnode(&epSet, &rpcMsg); return TSDB_CODE_MND_ACTION_IN_PROGRESS; } @@ -1788,7 +1788,7 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg, bool needReturn) { pDrop->sid = htonl(pTable->sid); pDrop->uid = htobe64(pTable->uid); - SRpcIpSet ipSet = mnodeGetIpSetFromVgroup(pMsg->pVgroup); + SRpcEpSet epSet = mnodeGetEpSetFromVgroup(pMsg->pVgroup); mInfo("app:%p:%p, table:%s, send drop ctable msg, vgId:%d sid:%d uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg, pDrop->tableId, pTable->vgId, pTable->sid, pTable->uid); @@ -1803,7 +1803,7 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg, bool needReturn) { if (!needReturn) rpcMsg.ahandle = NULL; - dnodeSendMsgToDnode(&ipSet, &rpcMsg); + dnodeSendMsgToDnode(&epSet, &rpcMsg); return TSDB_CODE_MND_ACTION_IN_PROGRESS; } @@ -1842,7 +1842,7 @@ static int32_t mnodeAlterNormalTableColumnCb(SMnodeMsg *pMsg, int32_t code) { } } - SRpcIpSet ipSet = mnodeGetIpSetFromVgroup(pMsg->pVgroup); + SRpcEpSet epSet = mnodeGetEpSetFromVgroup(pMsg->pVgroup); SRpcMsg rpcMsg = { .ahandle = pMsg, .pCont = pMDCreate, @@ -1854,7 +1854,7 @@ static int32_t mnodeAlterNormalTableColumnCb(SMnodeMsg *pMsg, int32_t code) { mDebug("app:%p:%p, ctable %s, send alter column msg to vgId:%d", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, pMsg->pVgroup->vgId); - dnodeSendMsgToDnode(&ipSet, &rpcMsg); + dnodeSendMsgToDnode(&epSet, &rpcMsg); return TSDB_CODE_MND_ACTION_IN_PROGRESS; } @@ -1996,9 +1996,9 @@ static int32_t mnodeDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) { for (int32_t i = 0; i < pMsg->pVgroup->numOfVnodes; ++i) { SDnodeObj *pDnode = mnodeGetDnode(pMsg->pVgroup->vnodeGid[i].dnodeId); if (pDnode == NULL) break; - strcpy(pMeta->vgroup.ipAddr[i].fqdn, pDnode->dnodeFqdn); - pMeta->vgroup.ipAddr[i].port = htons(pDnode->dnodePort + TSDB_PORT_DNODESHELL); - pMeta->vgroup.numOfIps++; + strcpy(pMeta->vgroup.epAddr[i].fqdn, pDnode->dnodeFqdn); + pMeta->vgroup.epAddr[i].port = htons(pDnode->dnodePort + TSDB_PORT_DNODESHELL); + pMeta->vgroup.numOfEps++; mnodeDecDnodeRef(pDnode); } pMeta->vgroup.vgId = htonl(pMsg->pVgroup->vgId); diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index 5f24981b50b31c6fed8ddefddd8e4e112e7dbedd..1de591df7cd627e1b6b33a28d5174024e1d55875 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -317,9 +317,9 @@ void mnodeUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVl } if (!dnodeExist) { - SRpcIpSet ipSet = mnodeGetIpSetFromIp(pDnode->dnodeEp); + SRpcEpSet epSet = mnodeGetEpSetFromIp(pDnode->dnodeEp); mError("vgId:%d, dnode:%d not exist in mnode, drop it", pVload->vgId, pDnode->dnodeId); - mnodeSendDropVnodeMsg(pVload->vgId, &ipSet, NULL); + mnodeSendDropVnodeMsg(pVload->vgId, &epSet, NULL); return; } @@ -809,29 +809,29 @@ static SMDCreateVnodeMsg *mnodeBuildVnodeMsg(SVgObj *pVgroup) { return pVnode; } -SRpcIpSet mnodeGetIpSetFromVgroup(SVgObj *pVgroup) { - SRpcIpSet ipSet = { - .numOfIps = pVgroup->numOfVnodes, +SRpcEpSet mnodeGetEpSetFromVgroup(SVgObj *pVgroup) { + SRpcEpSet epSet = { + .numOfEps = pVgroup->numOfVnodes, .inUse = 0, }; for (int i = 0; i < pVgroup->numOfVnodes; ++i) { - strcpy(ipSet.fqdn[i], pVgroup->vnodeGid[i].pDnode->dnodeFqdn); - ipSet.port[i] = pVgroup->vnodeGid[i].pDnode->dnodePort + TSDB_PORT_DNODEDNODE; + strcpy(epSet.fqdn[i], pVgroup->vnodeGid[i].pDnode->dnodeFqdn); + epSet.port[i] = pVgroup->vnodeGid[i].pDnode->dnodePort + TSDB_PORT_DNODEDNODE; } - return ipSet; + return epSet; } -SRpcIpSet mnodeGetIpSetFromIp(char *ep) { - SRpcIpSet ipSet; +SRpcEpSet mnodeGetEpSetFromIp(char *ep) { + SRpcEpSet epSet; - ipSet.numOfIps = 1; - ipSet.inUse = 0; - taosGetFqdnPortFromEp(ep, ipSet.fqdn[0], &ipSet.port[0]); - ipSet.port[0] += TSDB_PORT_DNODEDNODE; - return ipSet; + epSet.numOfEps = 1; + epSet.inUse = 0; + taosGetFqdnPortFromEp(ep, epSet.fqdn[0], &epSet.port[0]); + epSet.port[0] += TSDB_PORT_DNODEDNODE; + return epSet; } -static void mnodeSendAlterVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet) { +static void mnodeSendAlterVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet) { SMDAlterVnodeMsg *pAlter = mnodeBuildVnodeMsg(pVgroup); SRpcMsg rpcMsg = { .ahandle = NULL, @@ -840,21 +840,21 @@ static void mnodeSendAlterVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet) { .code = 0, .msgType = TSDB_MSG_TYPE_MD_ALTER_VNODE }; - dnodeSendMsgToDnode(ipSet, &rpcMsg); + dnodeSendMsgToDnode(epSet, &rpcMsg); } void mnodeSendAlterVgroupMsg(SVgObj *pVgroup) { mDebug("vgId:%d, send alter all vnodes msg, numOfVnodes:%d db:%s", pVgroup->vgId, pVgroup->numOfVnodes, pVgroup->dbName); for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { - SRpcIpSet ipSet = mnodeGetIpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp); + SRpcEpSet epSet = mnodeGetEpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp); mDebug("vgId:%d, index:%d, send alter vnode msg to dnode %s", pVgroup->vgId, i, pVgroup->vnodeGid[i].pDnode->dnodeEp); - mnodeSendAlterVnodeMsg(pVgroup, &ipSet); + mnodeSendAlterVnodeMsg(pVgroup, &epSet); } } -static void mnodeSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) { +static void mnodeSendCreateVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet, void *ahandle) { SMDCreateVnodeMsg *pCreate = mnodeBuildVnodeMsg(pVgroup); SRpcMsg rpcMsg = { .ahandle = ahandle, @@ -863,17 +863,17 @@ static void mnodeSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *aha .code = 0, .msgType = TSDB_MSG_TYPE_MD_CREATE_VNODE }; - dnodeSendMsgToDnode(ipSet, &rpcMsg); + dnodeSendMsgToDnode(epSet, &rpcMsg); } void mnodeSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) { mDebug("vgId:%d, send create all vnodes msg, numOfVnodes:%d db:%s", pVgroup->vgId, pVgroup->numOfVnodes, pVgroup->dbName); for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { - SRpcIpSet ipSet = mnodeGetIpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp); + SRpcEpSet epSet = mnodeGetEpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp); mDebug("vgId:%d, index:%d, send create vnode msg to dnode %s, ahandle:%p", pVgroup->vgId, i, pVgroup->vnodeGid[i].pDnode->dnodeEp, ahandle); - mnodeSendCreateVnodeMsg(pVgroup, &ipSet, ahandle); + mnodeSendCreateVnodeMsg(pVgroup, &epSet, ahandle); } } @@ -926,7 +926,7 @@ static SMDDropVnodeMsg *mnodeBuildDropVnodeMsg(int32_t vgId) { return pDrop; } -void mnodeSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle) { +void mnodeSendDropVnodeMsg(int32_t vgId, SRpcEpSet *epSet, void *ahandle) { SMDDropVnodeMsg *pDrop = mnodeBuildDropVnodeMsg(vgId); SRpcMsg rpcMsg = { .ahandle = ahandle, @@ -935,16 +935,16 @@ void mnodeSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle) { .code = 0, .msgType = TSDB_MSG_TYPE_MD_DROP_VNODE }; - dnodeSendMsgToDnode(ipSet, &rpcMsg); + dnodeSendMsgToDnode(epSet, &rpcMsg); } static void mnodeSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) { pVgroup->status = TAOS_VG_STATUS_DROPPING; // deleting mDebug("vgId:%d, send drop all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle); for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { - SRpcIpSet ipSet = mnodeGetIpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp); + SRpcEpSet epSet = mnodeGetEpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp); mDebug("vgId:%d, send drop vnode msg to dnode:%d, ahandle:%p", pVgroup->vgId, pVgroup->vnodeGid[i].dnodeId, ahandle); - mnodeSendDropVnodeMsg(pVgroup->vgId, &ipSet, ahandle); + mnodeSendDropVnodeMsg(pVgroup->vgId, &epSet, ahandle); } } @@ -998,8 +998,8 @@ static int32_t mnodeProcessVnodeCfgMsg(SMnodeMsg *pMsg) { } mDebug("vgId:%d, send create vnode msg to dnode %s for vnode cfg msg", pVgroup->vgId, pDnode->dnodeEp); - SRpcIpSet ipSet = mnodeGetIpSetFromIp(pDnode->dnodeEp); - mnodeSendCreateVnodeMsg(pVgroup, &ipSet, NULL); + SRpcEpSet epSet = mnodeGetEpSetFromIp(pDnode->dnodeEp); + mnodeSendCreateVnodeMsg(pVgroup, &epSet, NULL); mnodeDecDnodeRef(pDnode); mnodeDecVgroupRef(pVgroup); diff --git a/src/mnode/src/mnodeWrite.c b/src/mnode/src/mnodeWrite.c index 326ed5098197a29587c2e027c3af0d3a32aeb920..ab3cfa2dad179628a9ee74d578ff0d4d06ead127 100644 --- a/src/mnode/src/mnodeWrite.c +++ b/src/mnode/src/mnodeWrite.c @@ -49,16 +49,16 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) { if (!sdbIsMaster()) { SMnodeRsp *rpcRsp = &pMsg->rpcRsp; - SRpcIpSet *ipSet = rpcMallocCont(sizeof(SRpcIpSet)); - mnodeGetMnodeIpSetForShell(ipSet); - rpcRsp->rsp = ipSet; - rpcRsp->len = sizeof(SRpcIpSet); + SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet)); + mnodeGetMnodeEpSetForShell(epSet); + rpcRsp->rsp = epSet; + rpcRsp->len = sizeof(SRpcEpSet); mDebug("app:%p:%p, msg:%s will be redireced inUse:%d", pMsg->rpcMsg.ahandle, pMsg, taosMsg[pMsg->rpcMsg.msgType], - ipSet->inUse); - for (int32_t i = 0; i < ipSet->numOfIps; ++i) { - mDebug("app:%p:%p, mnode index:%d ip:%s:%d", pMsg->rpcMsg.ahandle, pMsg, i, ipSet->fqdn[i], - htons(ipSet->port[i])); + epSet->inUse); + for (int32_t i = 0; i < epSet->numOfEps; ++i) { + mDebug("app:%p:%p, mnode index:%d ep:%s:%d", pMsg->rpcMsg.ahandle, pMsg, i, epSet->fqdn[i], + htons(epSet->port[i])); } return TSDB_CODE_RPC_REDIRECT; diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 5d67d5e615e2e6d6c5aff6d2e4f7ca097ce99fab..2325d12d92dc4b862a1041ab8fc952a5a1ac9630 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -55,7 +55,7 @@ typedef struct { char secret[TSDB_KEY_LEN]; // secret for the link char ckey[TSDB_KEY_LEN]; // ciphering key - void (*cfp)(SRpcMsg *, SRpcIpSet *); + void (*cfp)(SRpcMsg *, SRpcEpSet *); int (*afp)(char *user, char *spi, char *encrypt, char *secret, char *ckey); int32_t refCount; @@ -71,7 +71,7 @@ typedef struct { typedef struct { SRpcInfo *pRpc; // associated SRpcInfo - SRpcIpSet ipSet; // ip list provided by app + SRpcEpSet epSet; // ip list provided by app void *ahandle; // handle provided by app void *signature; // for validation struct SRpcConn *pConn; // pConn allocated @@ -80,12 +80,12 @@ typedef struct { int32_t contLen; // content length int32_t code; // error code int16_t numOfTry; // number of try for different servers - int8_t oldInUse; // server IP inUse passed by app + int8_t oldInUse; // server EP inUse passed by app int8_t redirect; // flag to indicate redirect int8_t connType; // connection type SRpcMsg *pRsp; // for synchronous API tsem_t *pSem; // for synchronous API - SRpcIpSet *pSet; // for synchronous API + SRpcEpSet *pSet; // for synchronous API char msg[0]; // RpcHead starts from here } SRpcReqContext; @@ -355,7 +355,7 @@ void *rpcReallocCont(void *ptr, int contLen) { return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); } -void rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, SRpcMsg *pMsg) { +void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) { SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcReqContext *pContext; @@ -364,11 +364,11 @@ void rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, SRpcMsg *pMsg) { pContext->ahandle = pMsg->ahandle; pContext->signature = pContext; pContext->pRpc = (SRpcInfo *)shandle; - pContext->ipSet = *pIpSet; + pContext->epSet = *pEpSet; pContext->contLen = contLen; pContext->pCont = pMsg->pCont; pContext->msgType = pMsg->msgType; - pContext->oldInUse = pIpSet->inUse; + pContext->oldInUse = pEpSet->inUse; pContext->connType = RPC_CONN_UDPC; if (contLen > tsRpcMaxUdpSize) pContext->connType = RPC_CONN_TCPC; @@ -458,15 +458,15 @@ void rpcSendResponse(const SRpcMsg *pRsp) { return; } -void rpcSendRedirectRsp(void *thandle, const SRpcIpSet *pIpSet) { +void rpcSendRedirectRsp(void *thandle, const SRpcEpSet *pEpSet) { SRpcMsg rpcMsg; memset(&rpcMsg, 0, sizeof(rpcMsg)); - rpcMsg.contLen = sizeof(SRpcIpSet); + rpcMsg.contLen = sizeof(SRpcEpSet); rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); if (rpcMsg.pCont == NULL) return; - memcpy(rpcMsg.pCont, pIpSet, sizeof(SRpcIpSet)); + memcpy(rpcMsg.pCont, pEpSet, sizeof(SRpcEpSet)); rpcMsg.code = TSDB_CODE_RPC_REDIRECT; rpcMsg.handle = thandle; @@ -488,7 +488,7 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) { return 0; } -void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { +void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { SRpcReqContext *pContext; pContext = (SRpcReqContext *) (pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext)); @@ -498,9 +498,9 @@ void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) tsem_init(&sem, 0, 0); pContext->pSem = &sem; pContext->pRsp = pRsp; - pContext->pSet = pIpSet; + pContext->pSet = pEpSet; - rpcSendRequest(shandle, pIpSet, pMsg); + rpcSendRequest(shandle, pEpSet, pMsg); tsem_wait(&sem); tsem_destroy(&sem); @@ -755,11 +755,11 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv) { static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) { SRpcConn *pConn; SRpcInfo *pRpc = pContext->pRpc; - SRpcIpSet *pIpSet = &pContext->ipSet; + SRpcEpSet *pEpSet = &pContext->epSet; - pConn = rpcGetConnFromCache(pRpc->pCache, pIpSet->fqdn[pIpSet->inUse], pIpSet->port[pIpSet->inUse], pContext->connType); + pConn = rpcGetConnFromCache(pRpc->pCache, pEpSet->fqdn[pEpSet->inUse], pEpSet->port[pEpSet->inUse], pContext->connType); if ( pConn == NULL || pConn->user[0] == 0) { - pConn = rpcOpenConn(pRpc, pIpSet->fqdn[pIpSet->inUse], pIpSet->port[pIpSet->inUse], pContext->connType); + pConn = rpcOpenConn(pRpc, pEpSet->fqdn[pEpSet->inUse], pEpSet->port[pEpSet->inUse], pContext->connType); } if (pConn) { @@ -1020,16 +1020,16 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { pContext->pConn = NULL; if (pContext->pRsp) { // for synchronous API - memcpy(pContext->pSet, &pContext->ipSet, sizeof(SRpcIpSet)); + memcpy(pContext->pSet, &pContext->epSet, sizeof(SRpcEpSet)); memcpy(pContext->pRsp, pMsg, sizeof(SRpcMsg)); tsem_post(pContext->pSem); } else { // for asynchronous API - SRpcIpSet *pIpSet = NULL; - if (pContext->ipSet.inUse != pContext->oldInUse || pContext->redirect) - pIpSet = &pContext->ipSet; + SRpcEpSet *pEpSet = NULL; + if (pContext->epSet.inUse != pContext->oldInUse || pContext->redirect) + pEpSet = &pContext->epSet; - (*pRpc->cfp)(pMsg, pIpSet); + (*pRpc->cfp)(pMsg, pEpSet); } // free the request message @@ -1070,9 +1070,9 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { pConn->pContext = NULL; pConn->pReqMsg = NULL; - // for UDP, port may be changed by server, the port in ipSet shall be used for cache + // for UDP, port may be changed by server, the port in epSet shall be used for cache if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) { - rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->ipSet.port[pContext->ipSet.inUse], pConn->connType); + rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse], pConn->connType); } else { rpcCloseConn(pConn); } @@ -1087,10 +1087,10 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { if (pHead->code == TSDB_CODE_RPC_REDIRECT) { pContext->numOfTry = 0; - memcpy(&pContext->ipSet, pHead->content, sizeof(pContext->ipSet)); - tDebug("%s, redirect is received, numOfIps:%d", pConn->info, pContext->ipSet.numOfIps); - for (int i=0; iipSet.numOfIps; ++i) - pContext->ipSet.port[i] = htons(pContext->ipSet.port[i]); + memcpy(&pContext->epSet, pHead->content, sizeof(pContext->epSet)); + tDebug("%s, redirect is received, numOfEps:%d", pConn->info, pContext->epSet.numOfEps); + for (int i=0; iepSet.numOfEps; ++i) + pContext->epSet.port[i] = htons(pContext->epSet.port[i]); rpcSendReqToServer(pRpc, pContext); rpcFreeCont(rpcMsg.pCont); } else if (pHead->code == TSDB_CODE_RPC_NOT_READY) { @@ -1269,7 +1269,7 @@ static void rpcProcessConnError(void *param, void *id) { tDebug("%s %p, connection error happens", pRpc->label, pContext->ahandle); - if (pContext->numOfTry >= pContext->ipSet.numOfIps) { + if (pContext->numOfTry >= pContext->epSet.numOfEps) { rpcMsg.msgType = pContext->msgType+1; rpcMsg.ahandle = pContext->ahandle; rpcMsg.code = pContext->code; @@ -1279,8 +1279,8 @@ static void rpcProcessConnError(void *param, void *id) { rpcNotifyClient(pContext, &rpcMsg); } else { // move to next IP - pContext->ipSet.inUse++; - pContext->ipSet.inUse = pContext->ipSet.inUse % pContext->ipSet.numOfIps; + pContext->epSet.inUse++; + pContext->epSet.inUse = pContext->epSet.inUse % pContext->epSet.numOfEps; rpcSendReqToServer(pRpc, pContext); } } diff --git a/src/rpc/test/rclient.c b/src/rpc/test/rclient.c index f8dbbedb115bdceef137d17fd9aefe29b19c629e..e2d9d388f9af91f12fa0fba0e892ba256555552e 100644 --- a/src/rpc/test/rclient.c +++ b/src/rpc/test/rclient.c @@ -22,7 +22,7 @@ typedef struct { int index; - SRpcIpSet ipSet; + SRpcEpSet epSet; int num; int numOfReqs; int msgSize; @@ -32,11 +32,11 @@ typedef struct { void *pRpc; } SInfo; -static void processResponse(SRpcMsg *pMsg, SRpcIpSet *pIpSet) { +static void processResponse(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { SInfo *pInfo = (SInfo *)pMsg->ahandle; tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, pMsg->code); - if (pIpSet) pInfo->ipSet = *pIpSet; + if (pEpSet) pInfo->epSet = *pEpSet; rpcFreeCont(pMsg->pCont); sem_post(&pInfo->rspSem); @@ -57,7 +57,7 @@ static void *sendRequest(void *param) { rpcMsg.ahandle = pInfo; rpcMsg.msgType = 1; tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); - rpcSendRequest(pInfo->pRpc, &pInfo->ipSet, &rpcMsg); + rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg); if ( pInfo->num % 20000 == 0 ) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); sem_wait(&pInfo->rspSem); @@ -71,7 +71,7 @@ static void *sendRequest(void *param) { int main(int argc, char *argv[]) { SRpcInit rpcInit; - SRpcIpSet ipSet; + SRpcEpSet epSet; int msgSize = 128; int numOfReqs = 0; int appThreads = 1; @@ -82,12 +82,12 @@ int main(int argc, char *argv[]) { pthread_attr_t thattr; // server info - ipSet.numOfIps = 1; - ipSet.inUse = 0; - ipSet.port[0] = 7000; - ipSet.port[1] = 7000; - strcpy(ipSet.fqdn[0], serverIp); - strcpy(ipSet.fqdn[1], "192.168.0.1"); + epSet.numOfEps = 1; + epSet.inUse = 0; + epSet.port[0] = 7000; + epSet.port[1] = 7000; + strcpy(epSet.fqdn[0], serverIp); + strcpy(epSet.fqdn[1], "192.168.0.1"); // client info memset(&rpcInit, 0, sizeof(rpcInit)); @@ -105,9 +105,9 @@ int main(int argc, char *argv[]) { for (int i=1; iindex = i; - pInfo->ipSet = ipSet; + pInfo->epSet = epSet; pInfo->numOfReqs = numOfReqs; pInfo->msgSize = msgSize; sem_init(&pInfo->rspSem, 0, 0); diff --git a/src/rpc/test/rsclient.c b/src/rpc/test/rsclient.c index 266697d55569b44b784208bfabee88fe168b5b1a..6e6961784b1266eb8886bfeb2c831b69024ba71c 100644 --- a/src/rpc/test/rsclient.c +++ b/src/rpc/test/rsclient.c @@ -23,7 +23,7 @@ typedef struct { int index; - SRpcIpSet ipSet; + SRpcEpSet epSet; int num; int numOfReqs; int msgSize; @@ -51,7 +51,7 @@ static void *sendRequest(void *param) { rpcMsg.msgType = 1; tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); - rpcSendRecv(pInfo->pRpc, &pInfo->ipSet, &rpcMsg, &rspMsg); + rpcSendRecv(pInfo->pRpc, &pInfo->epSet, &rpcMsg, &rspMsg); // handle response if (rspMsg.code != 0) terror++; @@ -72,7 +72,7 @@ static void *sendRequest(void *param) { int main(int argc, char *argv[]) { SRpcInit rpcInit; - SRpcIpSet ipSet; + SRpcEpSet epSet; int msgSize = 128; int numOfReqs = 0; int appThreads = 1; @@ -83,12 +83,12 @@ int main(int argc, char *argv[]) { pthread_attr_t thattr; // server info - ipSet.numOfIps = 1; - ipSet.inUse = 0; - ipSet.port[0] = 7000; - ipSet.port[1] = 7000; - strcpy(ipSet.fqdn[0], serverIp); - strcpy(ipSet.fqdn[1], "192.168.0.1"); + epSet.numOfEps = 1; + epSet.inUse = 0; + epSet.port[0] = 7000; + epSet.port[1] = 7000; + strcpy(epSet.fqdn[0], serverIp); + strcpy(epSet.fqdn[1], "192.168.0.1"); // client info memset(&rpcInit, 0, sizeof(rpcInit)); @@ -106,9 +106,9 @@ int main(int argc, char *argv[]) { for (int i=1; iindex = i; - pInfo->ipSet = ipSet; + pInfo->epSet = epSet; pInfo->numOfReqs = numOfReqs; pInfo->msgSize = msgSize; sem_init(&pInfo->rspSem, 0, 0); diff --git a/src/rpc/test/rserver.c b/src/rpc/test/rserver.c index d06e9df64b431a24b06d6f29335bfe6cbf3bffea..44c5cd6ab48810496c1c141ba8563e1dead1de15 100644 --- a/src/rpc/test/rserver.c +++ b/src/rpc/test/rserver.c @@ -103,7 +103,7 @@ int retrieveAuthInfo(char *meterId, char *spi, char *encrypt, char *secret, char return ret; } -void processRequestMsg(SRpcMsg *pMsg, SRpcIpSet *pIpSet) { +void processRequestMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { SRpcMsg *pTemp; pTemp = taosAllocateQitem(sizeof(SRpcMsg)); diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index ffaab375a3bfc305cbedecae6df8881e28772aa2..e30164592dee23c1baf86c11f44daba2f3e4ab85 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -148,7 +148,7 @@ void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) { STsdbRepo *pRepo = (STsdbRepo *)repo; int vgId = REPO_ID(pRepo); - tsdbStopStream(repo); + tsdbStopStream(pRepo); if (toCommit) { tsdbAsyncCommit(pRepo); @@ -1126,6 +1126,7 @@ static void tsdbStartStream(STsdbRepo *pRepo) { } } + static void tsdbStopStream(STsdbRepo *pRepo) { STsdbMeta *pMeta = pRepo->tsdbMeta; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 98882e4c3ccb4c1d964f41137b438f74c193853f..249fb428e7781c2c88551116602bb80e584cbc95 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -340,6 +340,13 @@ void vnodeRelease(void *pVnodeRaw) { tsdbCloseRepo(pVnode->tsdb, 1); pVnode->tsdb = NULL; + // stop continuous query + if (pVnode->cq) { + void *cq = pVnode->cq; + pVnode->cq = NULL; + cqClose(cq); + } + if (pVnode->wal) walClose(pVnode->wal); pVnode->wal = NULL; @@ -511,13 +518,6 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { syncStop(sync); } - // stop continuous query - if (pVnode->cq) { - void *cq = pVnode->cq; - pVnode->cq = NULL; - cqClose(cq); - } - vTrace("vgId:%d, vnode will cleanup, refCount:%d", pVnode->vgId, pVnode->refCount); // release local resources only after cutting off outside connections diff --git a/tests/comparisonTest/tdengine/q5.txt b/tests/comparisonTest/tdengine/q5.txt new file mode 100644 index 0000000000000000000000000000000000000000..5f36955dd6a426cd3fae0a15d264facf4633502f --- /dev/null +++ b/tests/comparisonTest/tdengine/q5.txt @@ -0,0 +1 @@ +select * from db.devices; diff --git a/tests/comparisonTest/tdengine/tdengineTest.c b/tests/comparisonTest/tdengine/tdengineTest.c index de925263376933f55b5fc67d25d7a253da65d2b6..3d78a3d0a54ac571aceb13c1bb9a6f133589bd86 100644 --- a/tests/comparisonTest/tdengine/tdengineTest.c +++ b/tests/comparisonTest/tdengine/tdengineTest.c @@ -13,8 +13,9 @@ typedef struct { char sql[256]; char dataDir[256]; int filesNum; - int writeClients; + int clients; int rowsPerRequest; + int write; } ProArgs; typedef struct { @@ -41,7 +42,7 @@ int main(int argc, char *argv[]) { statis.totalRows = 0; parseArg(argc, argv); - if (arguments.writeClients > 0) { + if (arguments.write) { writeData(); } else { readData(); @@ -52,7 +53,7 @@ void parseArg(int argc, char *argv[]) { strcpy(arguments.sql, "./sqlCmd.txt"); strcpy(arguments.dataDir, "./testdata"); arguments.filesNum = 2; - arguments.writeClients = 0; + arguments.clients = 1; arguments.rowsPerRequest = 100; for (int i = 1; i < argc; ++i) { @@ -83,12 +84,12 @@ void parseArg(int argc, char *argv[]) { exit(EXIT_FAILURE); } } - else if (strcmp(argv[i], "-writeClients") == 0) { + else if (strcmp(argv[i], "-clients") == 0) { if (i < argc - 1) { - arguments.writeClients = atoi(argv[++i]); + arguments.clients = atoi(argv[++i]); } else { - fprintf(stderr, "'-writeClients' requires a parameter, default:%d\n", arguments.writeClients); + fprintf(stderr, "'-clients' requires a parameter, default:%d\n", arguments.clients); exit(EXIT_FAILURE); } } @@ -101,6 +102,9 @@ void parseArg(int argc, char *argv[]) { exit(EXIT_FAILURE); } } + else if (strcmp(argv[i], "-w") == 0) { + arguments.write = 1; + } } } @@ -215,7 +219,7 @@ void writeDataImp(void *param) { void writeData() { printf("write data\n"); - printf("---- writeClients: %d\n", arguments.writeClients); + printf("---- clients: %d\n", arguments.clients); printf("---- dataDir: %s\n", arguments.dataDir); printf("---- numOfFiles: %d\n", arguments.filesNum); printf("---- rowsPerRequest: %d\n", arguments.rowsPerRequest); @@ -243,12 +247,12 @@ void writeData() { int64_t st = getTimeStampMs(); - int a = arguments.filesNum / arguments.writeClients; - int b = arguments.filesNum % arguments.writeClients; + int a = arguments.filesNum / arguments.clients; + int b = arguments.filesNum % arguments.clients; int last = 0; - ThreadObj *threads = calloc((size_t)arguments.writeClients, sizeof(ThreadObj)); - for (int i = 0; i < arguments.writeClients; ++i) { + ThreadObj *threads = calloc((size_t)arguments.clients, sizeof(ThreadObj)); + for (int i = 0; i < arguments.clients; ++i) { ThreadObj *pthread = threads + i; pthread_attr_t thattr; pthread->threadId = i + 1; @@ -264,7 +268,7 @@ void writeData() { pthread_create(&pthread->pid, &thattr, (void *(*)(void *))writeDataImp, pthread); } - for (int i = 0; i < arguments.writeClients; i++) { + for (int i = 0; i < arguments.clients; i++) { pthread_join(threads[i].pid, NULL); } @@ -272,17 +276,15 @@ void writeData() { float seconds = (float)elapsed / 1000; float rs = (float)statis.totalRows / seconds; + free(threads); + printf("---- Spent %f seconds to insert %ld records, speed: %f Rows/Second\n", seconds, statis.totalRows, rs); } -void readData() { - printf("read data\n"); - printf("---- sql: %s\n", arguments.sql); - - void *taos = taos_connect("127.0.0.1", "root", "taosdata", NULL, 0); - if (taos == NULL) - taos_error(taos); - +void readDataImp(void *param) +{ + ThreadObj *pThread = (ThreadObj *)param; + printf("Thread %d\n", pThread->threadId); FILE *fp = fopen(arguments.sql, "r"); if (fp == NULL) { printf("failed to open file %s\n", arguments.sql); @@ -290,6 +292,10 @@ void readData() { } printf("open file %s success\n", arguments.sql); + void *taos = taos_connect("127.0.0.1", "root", "taosdata", NULL, 0); + if (taos == NULL) + taos_error(taos); + char *line = NULL; size_t len = 0; while (!feof(fp)) { @@ -325,9 +331,36 @@ void readData() { int64_t elapsed = getTimeStampMs() - st; float seconds = (float)elapsed / 1000; - printf("---- Spent %f seconds to query: %s", seconds, line); + printf("---- Spent %f seconds to retrieve %d records, Thread:%d query: %s\n", seconds, rows, pThread->threadId, line); } fclose(fp); } +void readData() { + printf("read data\n"); + printf("---- sql: %s\n", arguments.sql); + printf("---- clients: %d\n", arguments.clients); + + void *taos = taos_connect("127.0.0.1", "root", "taosdata", NULL, 0); + if (taos == NULL) + taos_error(taos); + + ThreadObj *threads = calloc((size_t)arguments.clients, sizeof(ThreadObj)); + + for (int i = 0; i < arguments.clients; ++i) { + ThreadObj *pthread = threads + i; + pthread_attr_t thattr; + pthread->threadId = i + 1; + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + pthread_create(&pthread->pid, &thattr, (void *(*)(void *))readDataImp, pthread); + } + + for (int i = 0; i < arguments.clients; i++) { + pthread_join(threads[i].pid, NULL); + } + + free(threads); +} + diff --git a/tests/perftest-scripts/tdengineTestQ5Loop.sh b/tests/perftest-scripts/tdengineTestQ5Loop.sh new file mode 100755 index 0000000000000000000000000000000000000000..9eff74f563fdd21fddd885324096daef26170f94 --- /dev/null +++ b/tests/perftest-scripts/tdengineTestQ5Loop.sh @@ -0,0 +1,107 @@ +#!/bin/bash + +DATA_DIR=/mnt/root/testdata +NUM_LOOP=5 + +function printTo { + if $verbose ; then + echo $1 + fi +} + +TDTESTQ5OUT=tdengineTestQ5.out + +function runTest { + totalThroughput=0 + for i in `seq 1 $NUM_LOOP`; do + for c in `seq 1 $clients`; do + records[$c]=0 + spentTime[$c]=0 + throughput[$c]=0 + done + printTo "loop i:$i, $TDTEST_DIR/tdengineTest \ + -clients $clients -sql q5.txt" + restartTaosd + beginMS=`date +%s%3N` + $TDTEST_DIR/tdengineTest \ + -clients $clients -sql $TDTEST_DIR/q5.txt > $TDTESTQ5OUT + endMS=`date +%s%3N` + totalRecords=0 + for c in `seq 1 $clients`; do + records[$c]=`grep Thread:$c $TDTESTQ5OUT | awk '{print $7}'` + totalRecords=`echo "$totalRecords + ${records[$c]}"|bc` + done + spending=`echo "scale=4; x = ($endMS - $beginMS)/1000; if (x<1) print 0; x"|bc` + throughput=`echo "scale=4; x= $totalRecords / $spending; if (x<1) print 0; x" | bc` + printTo "spending: $spending sec, throughput: $throughput" + totalThroughput=`echo "scale=4; x = $totalThroughput + $throughput; if(x<1) print 0; x"|bc` + done + avgThrougput=`echo "scale=4; x = $totalThroughput / $NUM_LOOP; if (x<1) print 0; x"|bc` + echo "avg Throughput: $avgThrougput" +} + +function restartTaosd { + printTo "Stop taosd" + systemctl stop taosd + PID=`ps -ef|grep -w taosd | grep -v grep | awk '{print $2}'` + while [ -n "$PID" ] + do + pkill -TERM -x taosd + sleep 1 + PID=`ps -ef|grep -w taosd | grep -v grep | awk '{print $2}'` + done + + printTo "Start taosd" + $TAOSD_DIR/taosd > /dev/null 2>&1 & + sleep 10 +} + +################ Main ################ + +master=false +develop=true +verbose=false + +clients=1 + +while : ; do + case $1 in + -v) + verbose=true + shift ;; + + master) + master=true + develop=false + shift ;; + + develop) + master=false + develop=true + shift ;; + + -c) + clients=$2 + shift 2;; + + *) + break ;; + esac +done + +if $master ; then + printTo "Test master branch.." + cp /mnt/root/cfg/master/taos.cfg /etc/taos/taos.cfg + WORK_DIR=/mnt/root/TDengine.master +else + printTo "Test develop branch.." + cp /mnt/root/cfg/10billion/taos.cfg /etc/taos/taos.cfg + WORK_DIR=/mnt/root/TDengine +fi + +TAOSD_DIR=$WORK_DIR/debug/build/bin +TDTEST_DIR=$WORK_DIR/tests/comparisonTest/tdengine + +runTest + +printTo "Test done!" diff --git a/tests/perftest-scripts/tdengineTestWriteLoop.sh b/tests/perftest-scripts/tdengineTestWriteLoop.sh index 9f54bb6fc3e04144e9e21ce90ffcdc3fd222863d..4cbb7916780316f186417f49ef86499a52c25834 100755 --- a/tests/perftest-scripts/tdengineTestWriteLoop.sh +++ b/tests/perftest-scripts/tdengineTestWriteLoop.sh @@ -5,7 +5,6 @@ NUM_LOOP=5 NUM_OF_FILES=100 rowsPerRequest=(1 100 500 1000 2000) -numOfClients=(1 2 3 4 5 6 7) function printTo { if $verbose ; then @@ -15,7 +14,7 @@ function printTo { function runTest { printf "R/R, " - for c in ${numOfClients[@]}; do + for c in `seq 1 $clients`; do if [ "$c" == "1" ]; then printf "$c client, " else @@ -26,7 +25,7 @@ function runTest { for r in ${rowsPerRequest[@]}; do printf "$r, " - for c in ${numOfClients[@]}; do + for c in `seq 1 $clients`; do totalRPR=0 for i in `seq 1 $NUM_LOOP`; do restartTaosd @@ -34,12 +33,12 @@ function runTest { printTo "loop i:$i, $TDTEST_DIR/tdengineTest \ -dataDir $DATA_DIR \ -numOfFiles $NUM_OF_FILES \ - -writeClients $c \ + -w -clients $c \ -rowsPerRequest $r" RPR=`$TDTEST_DIR/tdengineTest \ -dataDir $DATA_DIR \ -numOfFiles 1 \ - -writeClients $c \ + -w -clients $c \ -rowsPerRequest $r \ | grep speed | awk '{print $(NF-1)}'` totalRPR=`echo "scale=4; $totalRPR + $RPR" | bc` @@ -73,25 +72,29 @@ function restartTaosd { master=false develop=true verbose=false +clients=1 -for arg in "$@" -do - case $arg in +while : ; do + case $1 in -v) verbose=true - ;; + shift ;; master) master=true develop=false - ;; + shift ;; develop) master=false develop=true - ;; + shift ;; + + -c) + clients=$2 + shift 2;; *) - ;; + break ;; esac done