提交 c877418d 编写于 作者: dengyihao's avatar dengyihao

support multi thread update mnode and dnode

上级 f8818e5b
...@@ -457,7 +457,7 @@ extern void * tscQhandle; ...@@ -457,7 +457,7 @@ extern void * tscQhandle;
extern int tscKeepConn[]; extern int tscKeepConn[];
extern int tsInsertHeadSize; extern int tsInsertHeadSize;
extern int tscNumOfThreads; extern int tscNumOfThreads;
extern SRpcIpSet tscMgmtIpSet; extern SRpcCorIpSet tscMgmtIpSet;
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo); extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
......
...@@ -29,7 +29,7 @@ ...@@ -29,7 +29,7 @@
#define TSC_MGMT_VNODE 999 #define TSC_MGMT_VNODE 999
SRpcIpSet tscMgmtIpSet; SRpcCorIpSet tscMgmtIpSet;
SRpcIpSet tscDnodeIpSet; SRpcIpSet tscDnodeIpSet;
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0}; int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};
...@@ -44,6 +44,42 @@ void tscSaveSubscriptionProgress(void* sub); ...@@ -44,6 +44,42 @@ void tscSaveSubscriptionProgress(void* sub);
static int32_t minMsgSize() { return tsRpcHeadSize + 100; } static int32_t minMsgSize() { return tsRpcHeadSize + 100; }
static void tscDumpMgmtIpSet(SRpcIpSet *ipSet) {
taosCorBeginRead(&tscMgmtIpSet.version);
*ipSet = tscMgmtIpSet.ipSet;
taosCorEndRead(&tscMgmtIpSet.version);
}
bool tscIpSetIsEqual(SRpcIpSet *s1, SRpcIpSet *s2) {
if (s1->numOfIps != s2->numOfIps
|| s1->inUse != s1->inUse) {
return false;
}
for (int32_t i = 0; i < s1->numOfIps; i++) {
if (s1->port[i] != s2->port[i]
|| strncmp(s1->fqdn[i], s2->fqdn[i], TSDB_FQDN_LEN) != 0)
return false;
}
return true;
}
void tscSetMgmtIpList(SRpcIpSet *pIpSet) {
// no need to update if equal
SRpcIpSet dump;
tscDumpMgmtIpSet(&dump);
if (tscIpSetIsEqual(&dump, pIpSet)) {
return;
}
taosCorBeginWrite(&tscMgmtIpSet.version);
SRpcIpSet *mgmtIpSet = &tscMgmtIpSet.ipSet;
mgmtIpSet->numOfIps = pIpSet->numOfIps;
mgmtIpSet->inUse = pIpSet->inUse;
for (int32_t i = 0; i < mgmtIpSet->numOfIps; ++i) {
mgmtIpSet->port[i] = htons(pIpSet->port[i]);
strncpy(mgmtIpSet->fqdn[i], pIpSet->fqdn[i], TSDB_FQDN_LEN);
}
taosCorEndWrite(&tscMgmtIpSet.version);
}
static void tscSetDnodeIpList(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) { static void tscSetDnodeIpList(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) {
SRpcIpSet* pIpList = &pSql->ipList; SRpcIpSet* pIpList = &pSql->ipList;
pIpList->inUse = 0; pIpList->inUse = 0;
...@@ -60,31 +96,17 @@ static void tscSetDnodeIpList(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) { ...@@ -60,31 +96,17 @@ static void tscSetDnodeIpList(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) {
} }
void tscPrintMgmtIp() { void tscPrintMgmtIp() {
if (tscMgmtIpSet.numOfIps <= 0) { SRpcIpSet dump;
tscError("invalid mnode IP list:%d", tscMgmtIpSet.numOfIps); tscDumpMgmtIpSet(&dump);
if (dump.numOfIps <= 0) {
tscError("invalid mnode IP list:%d", dump.numOfIps);
} else { } else {
for (int i = 0; i < tscMgmtIpSet.numOfIps; ++i) { for (int i = 0; i < dump.numOfIps; ++i) {
tscDebug("mnode index:%d %s:%d", i, tscMgmtIpSet.fqdn[i], tscMgmtIpSet.port[i]); tscDebug("mnode index:%d %s:%d", i, dump.fqdn[i], dump.port[i]);
} }
} }
} }
void tscSetMgmtIpList(SRpcIpSet *pIpList) {
tscMgmtIpSet.numOfIps = pIpList->numOfIps;
tscMgmtIpSet.inUse = pIpList->inUse;
for (int32_t i = 0; i < tscMgmtIpSet.numOfIps; ++i) {
tscMgmtIpSet.port[i] = htons(pIpList->port[i]);
}
}
void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet) {
tscMgmtIpSet = *pIpSet;
tscDebug("mnode IP list is changed for ufp is called, numOfIps:%d inUse:%d", tscMgmtIpSet.numOfIps, tscMgmtIpSet.inUse);
for (int32_t i = 0; i < tscMgmtIpSet.numOfIps; ++i) {
tscDebug("index:%d fqdn:%s port:%d", i, tscMgmtIpSet.fqdn[i], tscMgmtIpSet.port[i]);
}
}
/* /*
* For each management node, try twice at least in case of poor network situation. * For each management node, try twice at least in case of poor network situation.
* If the client start to connect to a non-management node from the client, and the first retry may fail due to * If the client start to connect to a non-management node from the client, and the first retry may fail due to
...@@ -95,7 +117,9 @@ void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet) { ...@@ -95,7 +117,9 @@ void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet) {
UNUSED_FUNC UNUSED_FUNC
static int32_t tscGetMgmtConnMaxRetryTimes() { static int32_t tscGetMgmtConnMaxRetryTimes() {
int32_t factor = 2; int32_t factor = 2;
return tscMgmtIpSet.numOfIps * factor; SRpcIpSet dump;
tscDumpMgmtIpSet(&dump);
return dump.numOfIps * factor;
} }
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
...@@ -185,7 +209,9 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -185,7 +209,9 @@ int tscSendMsgToServer(SSqlObj *pSql) {
// set the mgmt ip list // set the mgmt ip list
if (pSql->cmd.command >= TSDB_SQL_MGMT) { if (pSql->cmd.command >= TSDB_SQL_MGMT) {
pSql->ipList = tscMgmtIpSet; SRpcIpSet dump;
tscDumpMgmtIpSet(&dump);
pSql->ipList = dump;
} }
memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
...@@ -239,7 +265,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { ...@@ -239,7 +265,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
if (pCmd->command < TSDB_SQL_MGMT) { if (pCmd->command < TSDB_SQL_MGMT) {
if (pIpSet) pSql->ipList = *pIpSet; if (pIpSet) pSql->ipList = *pIpSet;
} else { } else {
if (pIpSet) tscMgmtIpSet = *pIpSet; if (pIpSet) tscSetMgmtIpList(pIpSet);
} }
if (rpcMsg->pCont == NULL) { if (rpcMsg->pCont == NULL) {
...@@ -421,7 +447,9 @@ int tscProcessSql(SSqlObj *pSql) { ...@@ -421,7 +447,9 @@ int tscProcessSql(SSqlObj *pSql) {
return pSql->res.code; return pSql->res.code;
} }
} else if (pCmd->command < TSDB_SQL_LOCAL) { } else if (pCmd->command < TSDB_SQL_LOCAL) {
pSql->ipList = tscMgmtIpSet; SRpcIpSet dump;
tscDumpMgmtIpSet(&dump);
pSql->ipList = dump;
} else { // local handler } else { // local handler
return (*tscProcessMsgRsp[pCmd->command])(pSql); return (*tscProcessMsgRsp[pCmd->command])(pSql);
} }
......
...@@ -63,7 +63,7 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con ...@@ -63,7 +63,7 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
if (ip) { if (ip) {
if (tscSetMgmtIpListFromCfg(ip, NULL) < 0) return NULL; if (tscSetMgmtIpListFromCfg(ip, NULL) < 0) return NULL;
if (port) tscMgmtIpSet.port[0] = port; if (port) tscMgmtIpSet.ipSet.port[0] = port;
} }
void *pDnodeConn = NULL; void *pDnodeConn = NULL;
......
...@@ -41,7 +41,8 @@ int tscNumOfThreads; ...@@ -41,7 +41,8 @@ int tscNumOfThreads;
static pthread_once_t tscinit = PTHREAD_ONCE_INIT; static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
void taosInitNote(int numOfNoteLines, int maxNotes, char* lable); void taosInitNote(int numOfNoteLines, int maxNotes, char* lable);
void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet); //void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet);
void tscCheckDiskUsage(void *UNUSED_PARAM(para), void* UNUSED_PARAM(param)) { void tscCheckDiskUsage(void *UNUSED_PARAM(para), void* UNUSED_PARAM(param)) {
taosGetDisk(); taosGetDisk();
......
...@@ -2146,16 +2146,19 @@ char* strdup_throw(const char* str) { ...@@ -2146,16 +2146,19 @@ char* strdup_throw(const char* str) {
} }
int tscSetMgmtIpListFromCfg(const char *first, const char *second) { int tscSetMgmtIpListFromCfg(const char *first, const char *second) {
tscMgmtIpSet.numOfIps = 0; // init mgmt ip set
tscMgmtIpSet.inUse = 0; tscMgmtIpSet.version = 0;
SRpcIpSet *mgmtIpSet = &(tscMgmtIpSet.ipSet);
mgmtIpSet->numOfIps = 0;
mgmtIpSet->inUse = 0;
if (first && first[0] != 0) { if (first && first[0] != 0) {
if (strlen(first) >= TSDB_EP_LEN) { if (strlen(first) >= TSDB_EP_LEN) {
terrno = TSDB_CODE_TSC_INVALID_FQDN; terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1; return -1;
} }
taosGetFqdnPortFromEp(first, tscMgmtIpSet.fqdn[tscMgmtIpSet.numOfIps], &tscMgmtIpSet.port[tscMgmtIpSet.numOfIps]); taosGetFqdnPortFromEp(first, mgmtIpSet->fqdn[mgmtIpSet->numOfIps], &(mgmtIpSet->port[mgmtIpSet->numOfIps]));
tscMgmtIpSet.numOfIps++; mgmtIpSet->numOfIps++;
} }
if (second && second[0] != 0) { if (second && second[0] != 0) {
...@@ -2163,11 +2166,11 @@ int tscSetMgmtIpListFromCfg(const char *first, const char *second) { ...@@ -2163,11 +2166,11 @@ int tscSetMgmtIpListFromCfg(const char *first, const char *second) {
terrno = TSDB_CODE_TSC_INVALID_FQDN; terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1; return -1;
} }
taosGetFqdnPortFromEp(second, tscMgmtIpSet.fqdn[tscMgmtIpSet.numOfIps], &tscMgmtIpSet.port[tscMgmtIpSet.numOfIps]); taosGetFqdnPortFromEp(second, mgmtIpSet->fqdn[mgmtIpSet->numOfIps], &(mgmtIpSet->port[mgmtIpSet->numOfIps]));
tscMgmtIpSet.numOfIps++; mgmtIpSet->numOfIps++;
} }
if ( tscMgmtIpSet.numOfIps == 0) { if (mgmtIpSet->numOfIps == 0) {
terrno = TSDB_CODE_TSC_INVALID_FQDN; terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1; return -1;
} }
......
...@@ -35,6 +35,11 @@ typedef struct SRpcIpSet { ...@@ -35,6 +35,11 @@ typedef struct SRpcIpSet {
char fqdn[TSDB_MAX_REPLICA][TSDB_FQDN_LEN]; char fqdn[TSDB_MAX_REPLICA][TSDB_FQDN_LEN];
} SRpcIpSet; } SRpcIpSet;
typedef struct SRpcCorIpSet {
int32_t version;
SRpcIpSet ipSet;
} SRpcCorIpSet;
typedef struct SRpcConnInfo { typedef struct SRpcConnInfo {
uint32_t clientIp; uint32_t clientIp;
uint16_t clientPort; uint16_t clientPort;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册