From c877418df00f36c1621a6ccc2e2adf65e0457611 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 17 Jul 2020 00:31:23 +0000 Subject: [PATCH] support multi thread update mnode and dnode --- src/client/inc/tsclient.h | 2 +- src/client/src/tscServer.c | 78 ++++++++++++++++++++++++++------------ src/client/src/tscSql.c | 2 +- src/client/src/tscSystem.c | 3 +- src/client/src/tscUtil.c | 17 +++++---- src/inc/trpc.h | 5 +++ 6 files changed, 72 insertions(+), 35 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index be82eb64a8..e1180c4143 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -457,7 +457,7 @@ extern void * tscQhandle; extern int tscKeepConn[]; extern int tsInsertHeadSize; extern int tscNumOfThreads; -extern SRpcIpSet tscMgmtIpSet; +extern SRpcCorIpSet tscMgmtIpSet; extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index d8af6d5c87..e8a03a6482 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -29,7 +29,7 @@ #define TSC_MGMT_VNODE 999 -SRpcIpSet tscMgmtIpSet; +SRpcCorIpSet tscMgmtIpSet; SRpcIpSet tscDnodeIpSet; int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0}; @@ -44,6 +44,42 @@ void tscSaveSubscriptionProgress(void* sub); static int32_t minMsgSize() { return tsRpcHeadSize + 100; } +static void tscDumpMgmtIpSet(SRpcIpSet *ipSet) { + taosCorBeginRead(&tscMgmtIpSet.version); + *ipSet = tscMgmtIpSet.ipSet; + taosCorEndRead(&tscMgmtIpSet.version); +} + +bool tscIpSetIsEqual(SRpcIpSet *s1, SRpcIpSet *s2) { + if (s1->numOfIps != s2->numOfIps + || s1->inUse != s1->inUse) { + return false; + } + for (int32_t i = 0; i < s1->numOfIps; i++) { + if (s1->port[i] != s2->port[i] + || strncmp(s1->fqdn[i], s2->fqdn[i], TSDB_FQDN_LEN) != 0) + return false; + } + return true; +} +void tscSetMgmtIpList(SRpcIpSet *pIpSet) { + // no need to update if equal + SRpcIpSet dump; + tscDumpMgmtIpSet(&dump); + if (tscIpSetIsEqual(&dump, pIpSet)) { + return; + } + + taosCorBeginWrite(&tscMgmtIpSet.version); + SRpcIpSet *mgmtIpSet = &tscMgmtIpSet.ipSet; + mgmtIpSet->numOfIps = pIpSet->numOfIps; + mgmtIpSet->inUse = pIpSet->inUse; + for (int32_t i = 0; i < mgmtIpSet->numOfIps; ++i) { + mgmtIpSet->port[i] = htons(pIpSet->port[i]); + strncpy(mgmtIpSet->fqdn[i], pIpSet->fqdn[i], TSDB_FQDN_LEN); + } + taosCorEndWrite(&tscMgmtIpSet.version); +} static void tscSetDnodeIpList(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) { SRpcIpSet* pIpList = &pSql->ipList; pIpList->inUse = 0; @@ -60,31 +96,17 @@ static void tscSetDnodeIpList(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) { } void tscPrintMgmtIp() { - if (tscMgmtIpSet.numOfIps <= 0) { - tscError("invalid mnode IP list:%d", tscMgmtIpSet.numOfIps); + SRpcIpSet dump; + tscDumpMgmtIpSet(&dump); + if (dump.numOfIps <= 0) { + tscError("invalid mnode IP list:%d", dump.numOfIps); } else { - for (int i = 0; i < tscMgmtIpSet.numOfIps; ++i) { - tscDebug("mnode index:%d %s:%d", i, tscMgmtIpSet.fqdn[i], tscMgmtIpSet.port[i]); + for (int i = 0; i < dump.numOfIps; ++i) { + tscDebug("mnode index:%d %s:%d", i, dump.fqdn[i], dump.port[i]); } } } -void tscSetMgmtIpList(SRpcIpSet *pIpList) { - tscMgmtIpSet.numOfIps = pIpList->numOfIps; - tscMgmtIpSet.inUse = pIpList->inUse; - for (int32_t i = 0; i < tscMgmtIpSet.numOfIps; ++i) { - tscMgmtIpSet.port[i] = htons(pIpList->port[i]); - } -} - -void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet) { - tscMgmtIpSet = *pIpSet; - tscDebug("mnode IP list is changed for ufp is called, numOfIps:%d inUse:%d", tscMgmtIpSet.numOfIps, tscMgmtIpSet.inUse); - for (int32_t i = 0; i < tscMgmtIpSet.numOfIps; ++i) { - tscDebug("index:%d fqdn:%s port:%d", i, tscMgmtIpSet.fqdn[i], tscMgmtIpSet.port[i]); - } -} - /* * For each management node, try twice at least in case of poor network situation. * If the client start to connect to a non-management node from the client, and the first retry may fail due to @@ -95,7 +117,9 @@ void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet) { UNUSED_FUNC static int32_t tscGetMgmtConnMaxRetryTimes() { int32_t factor = 2; - return tscMgmtIpSet.numOfIps * factor; + SRpcIpSet dump; + tscDumpMgmtIpSet(&dump); + return dump.numOfIps * factor; } void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { @@ -185,7 +209,9 @@ int tscSendMsgToServer(SSqlObj *pSql) { // set the mgmt ip list if (pSql->cmd.command >= TSDB_SQL_MGMT) { - pSql->ipList = tscMgmtIpSet; + SRpcIpSet dump; + tscDumpMgmtIpSet(&dump); + pSql->ipList = dump; } memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); @@ -239,7 +265,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { if (pCmd->command < TSDB_SQL_MGMT) { if (pIpSet) pSql->ipList = *pIpSet; } else { - if (pIpSet) tscMgmtIpSet = *pIpSet; + if (pIpSet) tscSetMgmtIpList(pIpSet); } if (rpcMsg->pCont == NULL) { @@ -421,7 +447,9 @@ int tscProcessSql(SSqlObj *pSql) { return pSql->res.code; } } else if (pCmd->command < TSDB_SQL_LOCAL) { - pSql->ipList = tscMgmtIpSet; + SRpcIpSet dump; + tscDumpMgmtIpSet(&dump); + pSql->ipList = dump; } else { // local handler return (*tscProcessMsgRsp[pCmd->command])(pSql); } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 0677463d8d..b24cfd9b27 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -63,7 +63,7 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con if (ip) { if (tscSetMgmtIpListFromCfg(ip, NULL) < 0) return NULL; - if (port) tscMgmtIpSet.port[0] = port; + if (port) tscMgmtIpSet.ipSet.port[0] = port; } void *pDnodeConn = NULL; diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 82cc8cc225..1a78ed87bd 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -41,7 +41,8 @@ int tscNumOfThreads; static pthread_once_t tscinit = PTHREAD_ONCE_INIT; void taosInitNote(int numOfNoteLines, int maxNotes, char* lable); -void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet); +//void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet); + void tscCheckDiskUsage(void *UNUSED_PARAM(para), void* UNUSED_PARAM(param)) { taosGetDisk(); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 3e0fe0b4be..b0e6c727eb 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -2146,16 +2146,19 @@ char* strdup_throw(const char* str) { } int tscSetMgmtIpListFromCfg(const char *first, const char *second) { - tscMgmtIpSet.numOfIps = 0; - tscMgmtIpSet.inUse = 0; + // init mgmt ip set + tscMgmtIpSet.version = 0; + SRpcIpSet *mgmtIpSet = &(tscMgmtIpSet.ipSet); + mgmtIpSet->numOfIps = 0; + mgmtIpSet->inUse = 0; if (first && first[0] != 0) { if (strlen(first) >= TSDB_EP_LEN) { terrno = TSDB_CODE_TSC_INVALID_FQDN; return -1; } - taosGetFqdnPortFromEp(first, tscMgmtIpSet.fqdn[tscMgmtIpSet.numOfIps], &tscMgmtIpSet.port[tscMgmtIpSet.numOfIps]); - tscMgmtIpSet.numOfIps++; + taosGetFqdnPortFromEp(first, mgmtIpSet->fqdn[mgmtIpSet->numOfIps], &(mgmtIpSet->port[mgmtIpSet->numOfIps])); + mgmtIpSet->numOfIps++; } if (second && second[0] != 0) { @@ -2163,11 +2166,11 @@ int tscSetMgmtIpListFromCfg(const char *first, const char *second) { terrno = TSDB_CODE_TSC_INVALID_FQDN; return -1; } - taosGetFqdnPortFromEp(second, tscMgmtIpSet.fqdn[tscMgmtIpSet.numOfIps], &tscMgmtIpSet.port[tscMgmtIpSet.numOfIps]); - tscMgmtIpSet.numOfIps++; + taosGetFqdnPortFromEp(second, mgmtIpSet->fqdn[mgmtIpSet->numOfIps], &(mgmtIpSet->port[mgmtIpSet->numOfIps])); + mgmtIpSet->numOfIps++; } - if ( tscMgmtIpSet.numOfIps == 0) { + if (mgmtIpSet->numOfIps == 0) { terrno = TSDB_CODE_TSC_INVALID_FQDN; return -1; } diff --git a/src/inc/trpc.h b/src/inc/trpc.h index d1adfb7494..b159155e9d 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -35,6 +35,11 @@ typedef struct SRpcIpSet { char fqdn[TSDB_MAX_REPLICA][TSDB_FQDN_LEN]; } SRpcIpSet; +typedef struct SRpcCorIpSet { + int32_t version; + SRpcIpSet ipSet; +} SRpcCorIpSet; + typedef struct SRpcConnInfo { uint32_t clientIp; uint16_t clientPort; -- GitLab