提交 49ce73d0 编写于 作者: S slguan

TBASE-1452 #1136

上级 3df6b4ab
...@@ -96,15 +96,11 @@ void *taosAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, ...@@ -96,15 +96,11 @@ void *taosAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port,
pObj = (SConnCache *)handle; pObj = (SConnCache *)handle;
if (pObj == NULL || pObj->maxSessions == 0) return NULL; if (pObj == NULL || pObj->maxSessions == 0) return NULL;
#ifdef CLUSTER
if (data == NULL || ip == 0) {
#else
if (data == NULL) { if (data == NULL) {
#endif
tscTrace("data:%p ip:%p:%d not valid, not added in cache", data, ip, port); tscTrace("data:%p ip:%p:%d not valid, not added in cache", data, ip, port);
return NULL; return NULL;
} }
hash = taosHashConn(pObj, ip, port, user); hash = taosHashConn(pObj, ip, port, user);
pNode = (SConnHash *)taosMemPoolMalloc(pObj->connHashMemPool); pNode = (SConnHash *)taosMemPoolMalloc(pObj->connHashMemPool);
pNode->ip = ip; pNode->ip = ip;
......
...@@ -31,15 +31,9 @@ ...@@ -31,15 +31,9 @@
#define TSC_MGMT_VNODE 999 #define TSC_MGMT_VNODE 999
#ifdef CLUSTER SIpStrList tscMgmtIpList;
SIpStrList tscMgmtIpList; int tsMasterIndex = 0;
int tsMasterIndex = 0; int tsSlaveIndex = 1;
int tsSlaveIndex = 1;
#else
int tsMasterIndex = 0;
int tsSlaveIndex = 0; // slave == master for single node edition
uint32_t tsServerIp;
#endif
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql); int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql);
int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql); int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
...@@ -49,15 +43,15 @@ int tscKeepConn[TSDB_SQL_MAX] = {0}; ...@@ -49,15 +43,15 @@ int tscKeepConn[TSDB_SQL_MAX] = {0};
static int32_t minMsgSize() { return tsRpcHeadSize + sizeof(STaosDigest); } static int32_t minMsgSize() { return tsRpcHeadSize + sizeof(STaosDigest); }
#ifdef CLUSTER
void tscPrintMgmtIp() { void tscPrintMgmtIp() {
if (tscMgmtIpList.numOfIps <= 0) { if (tscMgmtIpList.numOfIps <= 0) {
tscError("invalid IP list:%d", tscMgmtIpList.numOfIps); tscError("invalid mgmt IP list:%d", tscMgmtIpList.numOfIps);
} else { } else {
for (int i = 0; i < tscMgmtIpList.numOfIps; ++i) tscTrace("mgmt index:%d ip:%s", i, tscMgmtIpList.ipstr[i]); for (int i = 0; i < tscMgmtIpList.numOfIps; ++i) {
tscTrace("mgmt index:%d ip:%s", i, tscMgmtIpList.ipstr[i]);
}
} }
} }
#endif
/* /*
* For each management node, try twice at least in case of poor network situation. * For each management node, try twice at least in case of poor network situation.
...@@ -68,11 +62,7 @@ void tscPrintMgmtIp() { ...@@ -68,11 +62,7 @@ void tscPrintMgmtIp() {
*/ */
static int32_t tscGetMgmtConnMaxRetryTimes() { static int32_t tscGetMgmtConnMaxRetryTimes() {
int32_t factor = 2; int32_t factor = 2;
#ifdef CLUSTER
return tscMgmtIpList.numOfIps * factor; return tscMgmtIpList.numOfIps * factor;
#else
return 1*factor;
#endif
} }
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
...@@ -88,18 +78,30 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { ...@@ -88,18 +78,30 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
if (code == 0) { if (code == 0) {
SHeartBeatRsp *pRsp = (SHeartBeatRsp *)pRes->pRsp; SHeartBeatRsp *pRsp = (SHeartBeatRsp *)pRes->pRsp;
#ifdef CLUSTER
SIpList * pIpList = &pRsp->ipList; SIpList * pIpList = &pRsp->ipList;
tscMgmtIpList.numOfIps = pIpList->numOfIps; if (pIpList->numOfIps != 0) {
if (memcmp(tscMgmtIpList.ip, pIpList->ip, pIpList->numOfIps * 4) != 0) { //heart beat from cluster edition
for (int i = 0; i < pIpList->numOfIps; ++i) { tscMgmtIpList.numOfIps = pIpList->numOfIps;
tinet_ntoa(tscMgmtIpList.ipstr[i], pIpList->ip[i]); if (memcmp(tscMgmtIpList.ip, pIpList->ip, pIpList->numOfIps * 4) != 0) {
tscMgmtIpList.ip[i] = pIpList->ip[i]; for (int i = 0; i < pIpList->numOfIps; ++i) {
tinet_ntoa(tscMgmtIpList.ipstr[i], pIpList->ip[i]);
tscMgmtIpList.ip[i] = pIpList->ip[i];
}
tscTrace("new mgmt IP list:");
tscPrintMgmtIp();
}
} else {
//heart beat from edge edition
if (tscMgmtIpList.numOfIps != 2) {
strcpy(tscMgmtIpList.ipstr[0], tsMasterIp);
tscMgmtIpList.ip[0] = inet_addr(tsMasterIp);
strcpy(tscMgmtIpList.ipstr[1], tsMasterIp);
tscMgmtIpList.ip[1] = inet_addr(tsMasterIp);
tscTrace("mgmt IP list:");
tscPrintMgmtIp();
} }
tscTrace("new mgmt IP list:");
tscPrintMgmtIp();
} }
#endif
if (pRsp->killConnection) { if (pRsp->killConnection) {
tscKillConnection(pObj); tscKillConnection(pObj);
} else { } else {
...@@ -152,19 +154,12 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { ...@@ -152,19 +154,12 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
void tscGetConnToMgmt(SSqlObj *pSql, uint8_t *pCode) { void tscGetConnToMgmt(SSqlObj *pSql, uint8_t *pCode) {
STscObj *pTscObj = pSql->pTscObj; STscObj *pTscObj = pSql->pTscObj;
#ifdef CLUSTER
if (pSql->retry < tscGetMgmtConnMaxRetryTimes()) { if (pSql->retry < tscGetMgmtConnMaxRetryTimes()) {
*pCode = 0; *pCode = 0;
pSql->retry++; pSql->retry++;
pSql->index = pSql->index % tscMgmtIpList.numOfIps; pSql->index = pSql->index % tscMgmtIpList.numOfIps;
if (pSql->cmd.command > TSDB_SQL_READ && pSql->index == 0) pSql->index = 1; if (pSql->cmd.command > TSDB_SQL_READ && pSql->index == 0) pSql->index = 1;
void *thandle = taosGetConnFromCache(tscConnCache, tscMgmtIpList.ip[pSql->index], TSC_MGMT_VNODE, pTscObj->user); void *thandle = taosGetConnFromCache(tscConnCache, tscMgmtIpList.ip[pSql->index], TSC_MGMT_VNODE, pTscObj->user);
#else
if (pSql->retry < tscGetMgmtConnMaxRetryTimes()) {
*pCode = 0;
pSql->retry++;
void *thandle = taosGetConnFromCache(tscConnCache, tsServerIp, TSC_MGMT_VNODE, pTscObj->user);
#endif
if (thandle == NULL) { if (thandle == NULL) {
SRpcConnInit connInit; SRpcConnInit connInit;
...@@ -180,24 +175,15 @@ void tscGetConnToMgmt(SSqlObj *pSql, uint8_t *pCode) { ...@@ -180,24 +175,15 @@ void tscGetConnToMgmt(SSqlObj *pSql, uint8_t *pCode) {
connInit.encrypt = 0; connInit.encrypt = 0;
connInit.secret = pSql->pTscObj->pass; connInit.secret = pSql->pTscObj->pass;
#ifdef CLUSTER
connInit.peerIp = tscMgmtIpList.ipstr[pSql->index]; connInit.peerIp = tscMgmtIpList.ipstr[pSql->index];
#else
connInit.peerIp = tsMasterIp;
#endif
thandle = taosOpenRpcConn(&connInit, pCode); thandle = taosOpenRpcConn(&connInit, pCode);
} }
pSql->thandle = thandle; pSql->thandle = thandle;
#ifdef CLUSTER
pSql->ip = tscMgmtIpList.ip[pSql->index]; pSql->ip = tscMgmtIpList.ip[pSql->index];
pSql->vnode = TSC_MGMT_VNODE; pSql->vnode = TSC_MGMT_VNODE;
tscTrace("%p mgmt index:%d ip:0x%x is picked up, pConn:%p", pSql, pSql->index, tscMgmtIpList.ip[pSql->index], tscTrace("%p mgmt index:%d ip:0x%x is picked up, pConn:%p", pSql, pSql->index, tscMgmtIpList.ip[pSql->index],
pSql->thandle); pSql->thandle);
#else
pSql->ip = tsServerIp;
pSql->vnode = TSC_MGMT_VNODE;
#endif
} }
// the pSql->res.code is the previous error(status) code. // the pSql->res.code is the previous error(status) code.
...@@ -242,11 +228,15 @@ void tscGetConnToVnode(SSqlObj *pSql, uint8_t *pCode) { ...@@ -242,11 +228,15 @@ void tscGetConnToVnode(SSqlObj *pSql, uint8_t *pCode) {
while (pSql->retry < pSql->maxRetry) { while (pSql->retry < pSql->maxRetry) {
(pSql->retry)++; (pSql->retry)++;
#ifdef CLUSTER
char ipstr[40] = {0}; char ipstr[40] = {0};
if (pVPeersDesc[pSql->index].ip == 0) { if (pVPeersDesc[pSql->index].ip == 0) {
(pSql->index) = (pSql->index + 1) % TSDB_VNODES_SUPPORT; /*
continue; * Only the stand-alone version, ip is 0, at this time we use mastrIp
*/
//(pSql->index) = (pSql->index + 1) % TSDB_VNODES_SUPPORT;
//continue;
pVPeersDesc[pSql->index].ip = tscMgmtIpList.ip[0];
} }
*pCode = TSDB_CODE_SUCCESS; *pCode = TSDB_CODE_SUCCESS;
...@@ -276,31 +266,6 @@ void tscGetConnToVnode(SSqlObj *pSql, uint8_t *pCode) { ...@@ -276,31 +266,6 @@ void tscGetConnToVnode(SSqlObj *pSql, uint8_t *pCode) {
pSql->vnode = pVPeersDesc[pSql->index].vnode; pSql->vnode = pVPeersDesc[pSql->index].vnode;
tscTrace("%p vnode:%d ip:%p index:%d is picked up, pConn:%p", pSql, pVPeersDesc[pSql->index].vnode, tscTrace("%p vnode:%d ip:%p index:%d is picked up, pConn:%p", pSql, pVPeersDesc[pSql->index].vnode,
pVPeersDesc[pSql->index].ip, pSql->index, pSql->thandle); pVPeersDesc[pSql->index].ip, pSql->index, pSql->thandle);
#else
*pCode = 0;
void *thandle = taosGetConnFromCache(tscConnCache, tsServerIp, pVPeersDesc[0].vnode, pTscObj->user);
if (thandle == NULL) {
SRpcConnInit connInit;
memset(&connInit, 0, sizeof(connInit));
connInit.cid = vidIndex;
connInit.sid = 0;
connInit.spi = 0;
connInit.encrypt = 0;
connInit.meterId = pSql->pTscObj->user;
connInit.peerId = htonl((pVPeersDesc[0].vnode << TSDB_SHELL_VNODE_BITS));
connInit.shandle = pVnodeConn;
connInit.ahandle = pSql;
connInit.peerIp = tsMasterIp;
connInit.peerPort = tsVnodeShellPort;
thandle = taosOpenRpcConn(&connInit, pCode);
vidIndex = (vidIndex + 1) % tscNumOfThreads;
}
pSql->thandle = thandle;
pSql->ip = tsServerIp;
pSql->vnode = pVPeersDesc[0].vnode;
#endif
break; break;
} }
...@@ -367,14 +332,25 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -367,14 +332,25 @@ int tscSendMsgToServer(SSqlObj *pSql) {
return code; return code;
} }
#ifdef CLUSTER
void tscProcessMgmtRedirect(SSqlObj *pSql, uint8_t *cont) { void tscProcessMgmtRedirect(SSqlObj *pSql, uint8_t *cont) {
SIpList *pIpList = (SIpList *)(cont); SIpList *pIpList = (SIpList *)(cont);
tscMgmtIpList.numOfIps = pIpList->numOfIps; tscMgmtIpList.numOfIps = pIpList->numOfIps;
for (int i = 0; i < pIpList->numOfIps; ++i) {
tinet_ntoa(tscMgmtIpList.ipstr[i], pIpList->ip[i]); if (pIpList->numOfIps != 0) {
tscMgmtIpList.ip[i] = pIpList->ip[i]; for (int i = 0; i < pIpList->numOfIps; ++i) {
tscTrace("Update mgmt Ip, index:%d ip:%s", i, tscMgmtIpList.ipstr[i]); tinet_ntoa(tscMgmtIpList.ipstr[i], pIpList->ip[i]);
tscMgmtIpList.ip[i] = pIpList->ip[i];
tscTrace("Update mgmt IP, index:%d ip:%s", i, tscMgmtIpList.ipstr[i]);
}
} else {
if (tscMgmtIpList.numOfIps != 2) {
strcpy(tscMgmtIpList.ipstr[0], tsMasterIp);
tscMgmtIpList.ip[0] = inet_addr(tsMasterIp);
strcpy(tscMgmtIpList.ipstr[1], tsMasterIp);
tscMgmtIpList.ip[1] = inet_addr(tsMasterIp);
tscTrace("Update mgmt IP list:");
tscPrintMgmtIp();
}
} }
if (pSql->cmd.command < TSDB_SQL_READ) { if (pSql->cmd.command < TSDB_SQL_READ) {
...@@ -386,7 +362,6 @@ void tscProcessMgmtRedirect(SSqlObj *pSql, uint8_t *cont) { ...@@ -386,7 +362,6 @@ void tscProcessMgmtRedirect(SSqlObj *pSql, uint8_t *cont) {
tscPrintMgmtIp(); tscPrintMgmtIp();
} }
#endif
void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) { void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
if (ahandle == NULL) return NULL; if (ahandle == NULL) return NULL;
...@@ -421,12 +396,8 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) { ...@@ -421,12 +396,8 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
if (msg == NULL) { if (msg == NULL) {
tscTrace("%p no response from ip:0x%x", pSql, pSql->ip); tscTrace("%p no response from ip:0x%x", pSql, pSql->ip);
#ifdef CLUSTER
pSql->index++; pSql->index++;
#else
// for single node situation, do NOT try next index
#endif
pSql->thandle = NULL; pSql->thandle = NULL;
// todo taos_stop_query() in async model // todo taos_stop_query() in async model
/* /*
...@@ -442,12 +413,7 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) { ...@@ -442,12 +413,7 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
// renew meter meta in case it is changed // renew meter meta in case it is changed
if (pCmd->command < TSDB_SQL_FETCH && pRes->code != TSDB_CODE_QUERY_CANCELLED) { if (pCmd->command < TSDB_SQL_FETCH && pRes->code != TSDB_CODE_QUERY_CANCELLED) {
#ifdef CLUSTER
pSql->maxRetry = TSDB_VNODES_SUPPORT * 2; pSql->maxRetry = TSDB_VNODES_SUPPORT * 2;
#else
// for fetch, it shall not renew meter meta
pSql->maxRetry = 2;
#endif
code = tscRenewMeterMeta(pSql, pMeterMetaInfo->name); code = tscRenewMeterMeta(pSql, pMeterMetaInfo->name);
pRes->code = code; pRes->code = code;
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return pSql; if (code == TSDB_CODE_ACTION_IN_PROGRESS) return pSql;
...@@ -460,8 +426,6 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) { ...@@ -460,8 +426,6 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
} else { } else {
uint16_t rspCode = pMsg->content[0]; uint16_t rspCode = pMsg->content[0];
#ifdef CLUSTER
if (rspCode == TSDB_CODE_REDIRECT) { if (rspCode == TSDB_CODE_REDIRECT) {
tscTrace("%p it shall be redirected!", pSql); tscTrace("%p it shall be redirected!", pSql);
taosAddConnIntoCache(tscConnCache, thandle, pSql->ip, pSql->vnode, pObj->user); taosAddConnIntoCache(tscConnCache, thandle, pSql->ip, pSql->vnode, pObj->user);
...@@ -493,12 +457,7 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) { ...@@ -493,12 +457,7 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
* removed. So, renew metermeta and try again. * removed. So, renew metermeta and try again.
* not_active_session: db has been move to other node, the vnode does not exist on this dnode anymore. * not_active_session: db has been move to other node, the vnode does not exist on this dnode anymore.
*/ */
#else pSql->thandle = NULL;
if (rspCode == TSDB_CODE_NOT_ACTIVE_TABLE || rspCode == TSDB_CODE_INVALID_TABLE_ID ||
rspCode == TSDB_CODE_NOT_ACTIVE_VNODE || rspCode == TSDB_CODE_INVALID_VNODE_ID ||
rspCode == TSDB_CODE_TABLE_ID_MISMATCH || rspCode == TSDB_CODE_NETWORK_UNAVAIL) {
#endif
pSql->thandle = NULL;
taosAddConnIntoCache(tscConnCache, thandle, pSql->ip, pSql->vnode, pObj->user); taosAddConnIntoCache(tscConnCache, thandle, pSql->ip, pSql->vnode, pObj->user);
if (pCmd->command == TSDB_SQL_CONNECT) { if (pCmd->command == TSDB_SQL_CONNECT) {
...@@ -767,12 +726,8 @@ int tscProcessSql(SSqlObj *pSql) { ...@@ -767,12 +726,8 @@ int tscProcessSql(SSqlObj *pSql) {
tscTrace("%p SQL cmd:%d will be processed, name:%s, type:%d", pSql, pSql->cmd.command, name, pSql->cmd.type); tscTrace("%p SQL cmd:%d will be processed, name:%s, type:%d", pSql, pSql->cmd.command, name, pSql->cmd.type);
pSql->retry = 0; pSql->retry = 0;
if (pSql->cmd.command < TSDB_SQL_MGMT) { if (pSql->cmd.command < TSDB_SQL_MGMT) {
#ifdef CLUSTER
pSql->maxRetry = TSDB_VNODES_SUPPORT; pSql->maxRetry = TSDB_VNODES_SUPPORT;
#else
pSql->maxRetry = 2;
#endif
// the pMeterMetaInfo cannot be NULL // the pMeterMetaInfo cannot be NULL
if (pMeterMetaInfo == NULL) { if (pMeterMetaInfo == NULL) {
pSql->res.code = TSDB_CODE_OTHERS; pSql->res.code = TSDB_CODE_OTHERS;
...@@ -3435,20 +3390,29 @@ int tscProcessConnectRsp(SSqlObj *pSql) { ...@@ -3435,20 +3390,29 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
assert(len <= tListLen(pObj->db)); assert(len <= tListLen(pObj->db));
strncpy(pObj->db, temp, tListLen(pObj->db)); strncpy(pObj->db, temp, tListLen(pObj->db));
#ifdef CLUSTER
SIpList * pIpList; SIpList * pIpList;
char *rsp = pRes->pRsp + sizeof(SConnectRsp); char *rsp = pRes->pRsp + sizeof(SConnectRsp);
pIpList = (SIpList *)rsp; pIpList = (SIpList *)rsp;
tscMgmtIpList.numOfIps = pIpList->numOfIps;
for (int i = 0; i < pIpList->numOfIps; ++i) { if (pIpList->numOfIps != 0) {
tinet_ntoa(tscMgmtIpList.ipstr[i], pIpList->ip[i]); tscMgmtIpList.numOfIps = pIpList->numOfIps;
tscMgmtIpList.ip[i] = pIpList->ip[i]; for (int i = 0; i < pIpList->numOfIps; ++i) {
tinet_ntoa(tscMgmtIpList.ipstr[i], pIpList->ip[i]);
tscMgmtIpList.ip[i] = pIpList->ip[i];
}
} else {
if (tscMgmtIpList.numOfIps != 2) {
strcpy(tscMgmtIpList.ipstr[0], tsMasterIp);
tscMgmtIpList.ip[0] = inet_addr(tsMasterIp);
strcpy(tscMgmtIpList.ipstr[1], tsMasterIp);
tscMgmtIpList.ip[1] = inet_addr(tsMasterIp);
}
} }
rsp += sizeof(SIpList) + sizeof(int32_t) * pIpList->numOfIps; rsp += sizeof(SIpList) + sizeof(int32_t) * pIpList->numOfIps;
tscPrintMgmtIp(); tscPrintMgmtIp();
#endif
strcpy(pObj->sversion, pConnect->version); strcpy(pObj->sversion, pConnect->version);
pObj->writeAuth = pConnect->writeAuth; pObj->writeAuth = pConnect->writeAuth;
pObj->superAuth = pConnect->superAuth; pObj->superAuth = pConnect->superAuth;
......
...@@ -63,19 +63,10 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const ...@@ -63,19 +63,10 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const
} }
} }
#ifdef CLUSTER
if (ip && ip[0]) { if (ip && ip[0]) {
strcpy(tscMgmtIpList.ipstr[1], ip); strcpy(tscMgmtIpList.ipstr[0], ip);
tscMgmtIpList.ip[1] = inet_addr(ip); tscMgmtIpList.ip[0] = inet_addr(ip);
} }
#else
if (ip && ip[0]) {
if (ip != tsMasterIp) {
strcpy(tsMasterIp, ip);
}
tsServerIp = inet_addr(ip);
}
#endif
pObj = (STscObj *)malloc(sizeof(STscObj)); pObj = (STscObj *)malloc(sizeof(STscObj));
if (NULL == pObj) { if (NULL == pObj) {
...@@ -175,11 +166,6 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha ...@@ -175,11 +166,6 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha
TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int), TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
void *param, void **taos) { void *param, void **taos) {
#ifndef CLUSTER
if (ip == NULL) {
ip = tsMasterIp;
}
#endif
return taos_connect_imp(ip, user, pass, db, port, fp, param, taos); return taos_connect_imp(ip, user, pass, db, port, fp, param, taos);
} }
......
...@@ -95,7 +95,6 @@ void taos_init_imp() { ...@@ -95,7 +95,6 @@ void taos_init_imp() {
taosInitNote(tsNumOfLogLines / 10, 1, (char*)"tsc_note"); taosInitNote(tsNumOfLogLines / 10, 1, (char*)"tsc_note");
} }
#ifdef CLUSTER
tscMgmtIpList.numOfIps = 2; tscMgmtIpList.numOfIps = 2;
strcpy(tscMgmtIpList.ipstr[0], tsMasterIp); strcpy(tscMgmtIpList.ipstr[0], tsMasterIp);
tscMgmtIpList.ip[0] = inet_addr(tsMasterIp); tscMgmtIpList.ip[0] = inet_addr(tsMasterIp);
...@@ -108,7 +107,6 @@ void taos_init_imp() { ...@@ -108,7 +107,6 @@ void taos_init_imp() {
strcpy(tscMgmtIpList.ipstr[2], tsSecondIp); strcpy(tscMgmtIpList.ipstr[2], tsSecondIp);
tscMgmtIpList.ip[2] = inet_addr(tsSecondIp); tscMgmtIpList.ip[2] = inet_addr(tsSecondIp);
} }
#endif
tscInitMsgs(); tscInitMsgs();
slaveIndex = rand(); slaveIndex = rand();
......
...@@ -54,6 +54,7 @@ extern char tsDirectory[]; ...@@ -54,6 +54,7 @@ extern char tsDirectory[];
extern char dataDir[]; extern char dataDir[];
extern char logDir[]; extern char logDir[];
extern char scriptDir[]; extern char scriptDir[];
extern char osName[];
extern char tsMasterIp[]; extern char tsMasterIp[];
extern char tsSecondIp[]; extern char tsSecondIp[];
...@@ -78,7 +79,6 @@ extern char tsPrivateIp[]; ...@@ -78,7 +79,6 @@ extern char tsPrivateIp[];
extern short tsNumOfVnodesPerCore; extern short tsNumOfVnodesPerCore;
extern short tsNumOfTotalVnodes; extern short tsNumOfTotalVnodes;
extern short tsCheckHeaderFile; extern short tsCheckHeaderFile;
extern uint32_t tsServerIp;
extern uint32_t tsPublicIpInt; extern uint32_t tsPublicIpInt;
extern int tsSessionsPerVnode; extern int tsSessionsPerVnode;
......
...@@ -21,28 +21,13 @@ ...@@ -21,28 +21,13 @@
#include "shellCommand.h" #include "shellCommand.h"
#include "ttime.h" #include "ttime.h"
#include "tutil.h" #include "tutil.h"
#include "taoserror.h"
#include <regex.h> #include <regex.h>
/**************** Global variables ****************/ /**************** Global variables ****************/
#ifdef WINDOWS char CLIENT_VERSION[] = "Welcome to the TDengine shell from %s, Client Version:%s\n"
char CLIENT_VERSION[] = "Welcome to the TDengine shell from windows, client version:%s "; "Copyright (c) 2017 by TAOS Data, Inc. All rights reserved.\n\n";
#elif defined(DARWIN)
char CLIENT_VERSION[] = "Welcome to the TDengine shell from mac, client version:%s ";
#else
#ifdef CLUSTER
char CLIENT_VERSION[] = "Welcome to the TDengine shell from linux, enterprise client version:%s ";
#else
char CLIENT_VERSION[] = "Welcome to the TDengine shell from linux, community client version:%s ";
#endif
#endif
#ifdef CLUSTER
char SERVER_VERSION[] = "enterprise server version:%s\nCopyright (c) 2017 by TAOS Data, Inc. All rights reserved.\n\n";
#else
char SERVER_VERSION[] = "community server version:%s\nCopyright (c) 2017 by TAOS Data, Inc. All rights reserved.\n\n";
#endif
char PROMPT_HEADER[] = "taos> "; char PROMPT_HEADER[] = "taos> ";
char CONTINUE_PROMPT[] = " -> "; char CONTINUE_PROMPT[] = " -> ";
int prompt_size = 6; int prompt_size = 6;
...@@ -54,7 +39,7 @@ History history; ...@@ -54,7 +39,7 @@ History history;
*/ */
TAOS *shellInit(struct arguments *args) { TAOS *shellInit(struct arguments *args) {
printf("\n"); printf("\n");
printf(CLIENT_VERSION, taos_get_client_info()); printf(CLIENT_VERSION, osName, taos_get_client_info());
fflush(stdout); fflush(stdout);
// set options before initializing // set options before initializing
...@@ -119,8 +104,6 @@ TAOS *shellInit(struct arguments *args) { ...@@ -119,8 +104,6 @@ TAOS *shellInit(struct arguments *args) {
} }
#endif #endif
printf(SERVER_VERSION, taos_get_server_info(con));
return con; return con;
} }
...@@ -817,11 +800,16 @@ void source_file(TAOS *con, char *fptr) { ...@@ -817,11 +800,16 @@ void source_file(TAOS *con, char *fptr) {
} }
void shellGetGrantInfo(void *con) { void shellGetGrantInfo(void *con) {
#ifdef CLUSTER
char sql[] = "show grants"; char sql[] = "show grants";
if (taos_query(con, sql)) { int code = taos_query(con, sql);
fprintf(stdout, "\n");
if (code != TSDB_CODE_SUCCESS) {
if (code == TSDB_CODE_OPS_NOT_SUPPORT) {
fprintf(stdout, "Server is Community Edition, version is %s.\n\n", taos_get_server_info(con));
} else {
fprintf(stderr, "Failed to check Server Edition, Reason:%d:%s\n\n", taos_errno(con), taos_errstr(con));
}
return; return;
} }
...@@ -843,18 +831,18 @@ void shellGetGrantInfo(void *con) { ...@@ -843,18 +831,18 @@ void shellGetGrantInfo(void *con) {
exit(0); exit(0);
} }
char version[32] = {0}; char serverVersion[32] = {0};
char expiretime[32] = {0}; char expiretime[32] = {0};
char expired[32] = {0}; char expired[32] = {0};
memcpy(version, row[0], fields[0].bytes); memcpy(serverVersion, row[0], fields[0].bytes);
memcpy(expiretime, row[1], fields[1].bytes); memcpy(expiretime, row[1], fields[1].bytes);
memcpy(expired, row[2], fields[2].bytes); memcpy(expired, row[2], fields[2].bytes);
if (strcmp(expiretime, "unlimited") == 0) { if (strcmp(expiretime, "unlimited") == 0) {
fprintf(stdout, "This is the %s version and will never expire.\n", version); fprintf(stdout, "Server is Enterprise %s Edition, version is %s and will never expire.\n", serverVersion, taos_get_server_info(con));
} else { } else {
fprintf(stdout, "This is the %s version and will expire at %s.\n", version, expiretime); fprintf(stdout, "Server is Enterprise %s Edition, version is %s and will expire at %s.\n", serverVersion, taos_get_server_info(con), expiretime);
} }
taos_free_result(result); taos_free_result(result);
...@@ -862,5 +850,4 @@ void shellGetGrantInfo(void *con) { ...@@ -862,5 +850,4 @@ void shellGetGrantInfo(void *con) {
} }
fprintf(stdout, "\n"); fprintf(stdout, "\n");
#endif
} }
...@@ -119,13 +119,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { ...@@ -119,13 +119,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
static struct argp argp = {options, parse_opt, args_doc, doc}; static struct argp argp = {options, parse_opt, args_doc, doc};
void shellParseArgument(int argc, char *argv[], struct arguments *arguments) { void shellParseArgument(int argc, char *argv[], struct arguments *arguments) {
char verType[32] = {0}; static char verType[32] = {0};
#ifdef CLUSTER sprintf(verType, "version: %s\n", version);
sprintf(verType, "enterprise version: %s\n", version);
#else
sprintf(verType, "community version: %s\n", version);
#endif
argp_program_version = verType; argp_program_version = verType;
argp_parse(&argp, argc, argv, 0, 0, arguments); argp_parse(&argp, argc, argv, 0, 0, arguments);
......
...@@ -217,9 +217,7 @@ void monitorInitDatabaseCb(void *param, TAOS_RES *result, int code) { ...@@ -217,9 +217,7 @@ void monitorInitDatabaseCb(void *param, TAOS_RES *result, int code) {
if (monitor->cmdIndex == MONITOR_CMD_CREATE_TB_LOG) { if (monitor->cmdIndex == MONITOR_CMD_CREATE_TB_LOG) {
taosLogFp = monitorSaveLog; taosLogFp = monitorSaveLog;
taosLogSqlFp = monitorExecuteSQL; taosLogSqlFp = monitorExecuteSQL;
#ifdef CLUSTER
taosLogAcctFp = monitorSaveAcctLog; taosLogAcctFp = monitorSaveAcctLog;
#endif
monitorLPrint("dnode:%s is started", tsPrivateIp); monitorLPrint("dnode:%s is started", tsPrivateIp);
} }
monitor->cmdIndex++; monitor->cmdIndex++;
......
...@@ -38,6 +38,7 @@ char tsDirectory[TSDB_FILENAME_LEN] = "~/TDengine/data"; ...@@ -38,6 +38,7 @@ char tsDirectory[TSDB_FILENAME_LEN] = "~/TDengine/data";
char dataDir[TSDB_FILENAME_LEN] = "~/TDengine/data"; char dataDir[TSDB_FILENAME_LEN] = "~/TDengine/data";
char logDir[TSDB_FILENAME_LEN] = "~/TDengine/log"; char logDir[TSDB_FILENAME_LEN] = "~/TDengine/log";
char scriptDir[TSDB_FILENAME_LEN] = "~/TDengine/script"; char scriptDir[TSDB_FILENAME_LEN] = "~/TDengine/script";
char osName[] = "Darwin";
int64_t str2int64(char *str) { int64_t str2int64(char *str) {
char *endptr = NULL; char *endptr = NULL;
......
...@@ -39,6 +39,7 @@ char tsDirectory[TSDB_FILENAME_LEN] = "/var/lib/taos"; ...@@ -39,6 +39,7 @@ char tsDirectory[TSDB_FILENAME_LEN] = "/var/lib/taos";
char dataDir[TSDB_FILENAME_LEN] = "/var/lib/taos"; char dataDir[TSDB_FILENAME_LEN] = "/var/lib/taos";
char logDir[TSDB_FILENAME_LEN] = "/var/log/taos"; char logDir[TSDB_FILENAME_LEN] = "/var/log/taos";
char scriptDir[TSDB_FILENAME_LEN] = "/etc/taos"; char scriptDir[TSDB_FILENAME_LEN] = "/etc/taos";
char osName[] = "Linux";
int64_t str2int64(char *str) { int64_t str2int64(char *str) {
char *endptr = NULL; char *endptr = NULL;
......
...@@ -37,6 +37,7 @@ char tsDirectory[TSDB_FILENAME_LEN] = "C:/TDengine/data"; ...@@ -37,6 +37,7 @@ char tsDirectory[TSDB_FILENAME_LEN] = "C:/TDengine/data";
char logDir[TSDB_FILENAME_LEN] = "C:/TDengine/log"; char logDir[TSDB_FILENAME_LEN] = "C:/TDengine/log";
char dataDir[TSDB_FILENAME_LEN] = "C:/TDengine/data"; char dataDir[TSDB_FILENAME_LEN] = "C:/TDengine/data";
char scriptDir[TSDB_FILENAME_LEN] = "C:/TDengine/script"; char scriptDir[TSDB_FILENAME_LEN] = "C:/TDengine/script";
char osName[] = "Windows";
bool taosCheckPthreadValid(pthread_t thread) { bool taosCheckPthreadValid(pthread_t thread) {
return thread.p != NULL; return thread.p != NULL;
......
...@@ -24,6 +24,8 @@ ...@@ -24,6 +24,8 @@
extern char version[]; extern char version[];
const int16_t sdbFileVersion = 0; const int16_t sdbFileVersion = 0;
int sdbExtConns = 0; int sdbExtConns = 0;
SIpList *pSdbIpList = NULL;
SIpList *pSdbPublicIpList = NULL;
#ifdef CLUSTER #ifdef CLUSTER
int sdbMaster = 0; int sdbMaster = 0;
......
...@@ -1202,21 +1202,28 @@ int mgmtProcessHeartBeatMsg(char *cont, int contLen, SConnObj *pConn) { ...@@ -1202,21 +1202,28 @@ int mgmtProcessHeartBeatMsg(char *cont, int contLen, SConnObj *pConn) {
pConn->streamId = 0; pConn->streamId = 0;
pHBRsp->killConnection = pConn->killConnection; pHBRsp->killConnection = pConn->killConnection;
#ifdef CLUSTER
if (pConn->usePublicIp) { if (pConn->usePublicIp) {
int size = pSdbPublicIpList->numOfIps * 4; if (pSdbPublicIpList != NULL) {
pHBRsp->ipList.numOfIps = pSdbPublicIpList->numOfIps; int size = pSdbPublicIpList->numOfIps * 4;
memcpy(pHBRsp->ipList.ip, pSdbPublicIpList->ip, size); pHBRsp->ipList.numOfIps = pSdbPublicIpList->numOfIps;
pMsg += sizeof(SHeartBeatRsp) + size; memcpy(pHBRsp->ipList.ip, pSdbPublicIpList->ip, size);
pMsg += sizeof(SHeartBeatRsp) + size;
} else {
pHBRsp->ipList.numOfIps = 0;
pMsg += sizeof(SHeartBeatRsp);
}
} else { } else {
int size = pSdbIpList->numOfIps * 4; if (pSdbIpList != NULL) {
pHBRsp->ipList.numOfIps = pSdbIpList->numOfIps; int size = pSdbIpList->numOfIps * 4;
memcpy(pHBRsp->ipList.ip, pSdbIpList->ip, size); pHBRsp->ipList.numOfIps = pSdbIpList->numOfIps;
pMsg += sizeof(SHeartBeatRsp) + size; memcpy(pHBRsp->ipList.ip, pSdbIpList->ip, size);
pMsg += sizeof(SHeartBeatRsp) + size;
} else {
pHBRsp->ipList.numOfIps = 0;
pMsg += sizeof(SHeartBeatRsp);
}
} }
#else
pMsg += sizeof(SHeartBeatRsp);
#endif
msgLen = pMsg - pStart; msgLen = pMsg - pStart;
taosSendMsgToPeer(pConn->thandle, pStart, msgLen); taosSendMsgToPeer(pConn->thandle, pStart, msgLen);
...@@ -1334,15 +1341,22 @@ _rsp: ...@@ -1334,15 +1341,22 @@ _rsp:
pConnectRsp->superAuth = pConn->superAuth; pConnectRsp->superAuth = pConn->superAuth;
pMsg += sizeof(SConnectRsp); pMsg += sizeof(SConnectRsp);
#ifdef CLUSTER int size;
int size = pSdbPublicIpList->numOfIps * 4 + sizeof(SIpList); if (pSdbPublicIpList != NULL && pSdbIpList != NULL) {
if (pConn->usePublicIp) { size = pSdbPublicIpList->numOfIps * 4 + sizeof(SIpList);
memcpy(pMsg, pSdbPublicIpList, size); if (pConn->usePublicIp) {
memcpy(pMsg, pSdbPublicIpList, size);
} else {
memcpy(pMsg, pSdbIpList, size);
}
} else { } else {
memcpy(pMsg, pSdbIpList, size); SIpList tmpIpList;
tmpIpList.numOfIps = 0;
size = tmpIpList.numOfIps * 4 + sizeof(SIpList);
memcpy(pMsg, &tmpIpList, size);
} }
pMsg += size; pMsg += size;
#endif
// set the time resolution: millisecond or microsecond // set the time resolution: millisecond or microsecond
*((uint32_t *)pMsg) = tsTimePrecision; *((uint32_t *)pMsg) = tsTimePrecision;
......
...@@ -56,11 +56,7 @@ int tscEmbedded = 0; ...@@ -56,11 +56,7 @@ int tscEmbedded = 0;
*/ */
int64_t tsMsPerDay[] = {86400000L, 86400000000L}; int64_t tsMsPerDay[] = {86400000L, 86400000000L};
#ifdef CLUSTER
char tsMasterIp[TSDB_IPv4ADDR_LEN] = {0}; char tsMasterIp[TSDB_IPv4ADDR_LEN] = {0};
#else
char tsMasterIp[TSDB_IPv4ADDR_LEN] = "127.0.0.1";
#endif
char tsSecondIp[TSDB_IPv4ADDR_LEN] = {0}; char tsSecondIp[TSDB_IPv4ADDR_LEN] = {0};
uint16_t tsMgmtShellPort = 6030; // udp[6030-6034] tcp[6030] uint16_t tsMgmtShellPort = 6030; // udp[6030-6034] tcp[6030]
uint16_t tsVnodeShellPort = 6035; // udp[6035-6039] tcp[6035] uint16_t tsVnodeShellPort = 6035; // udp[6035-6039] tcp[6035]
...@@ -444,7 +440,7 @@ static void doInitGlobalConfig() { ...@@ -444,7 +440,7 @@ static void doInitGlobalConfig() {
// ip address // ip address
tsInitConfigOption(cfg++, "masterIp", tsMasterIp, TSDB_CFG_VTYPE_IPSTR, tsInitConfigOption(cfg++, "masterIp", tsMasterIp, TSDB_CFG_VTYPE_IPSTR,
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_CLUSTER, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT,
0, 0, TSDB_IPv4ADDR_LEN, TSDB_CFG_UTYPE_NONE); 0, 0, TSDB_IPv4ADDR_LEN, TSDB_CFG_UTYPE_NONE);
tsInitConfigOption(cfg++, "secondIp", tsSecondIp, TSDB_CFG_VTYPE_IPSTR, tsInitConfigOption(cfg++, "secondIp", tsSecondIp, TSDB_CFG_VTYPE_IPSTR,
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_CLUSTER, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_CLUSTER,
...@@ -787,11 +783,9 @@ static void doInitGlobalConfig() { ...@@ -787,11 +783,9 @@ static void doInitGlobalConfig() {
TSDB_CFG_CTYPE_B_CONFIG, TSDB_CFG_CTYPE_B_CONFIG,
0, 1, 0, TSDB_CFG_UTYPE_NONE); 0, 1, 0, TSDB_CFG_UTYPE_NONE);
#ifdef CLUSTER
tsInitConfigOption(cfg++, "anyIp", &tsAnyIp, TSDB_CFG_VTYPE_INT, tsInitConfigOption(cfg++, "anyIp", &tsAnyIp, TSDB_CFG_VTYPE_INT,
TSDB_CFG_CTYPE_B_CONFIG, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLUSTER,
0, 1, 0, TSDB_CFG_UTYPE_NONE); 0, 1, 0, TSDB_CFG_UTYPE_NONE);
#endif
// version info // version info
tsInitConfigOption(cfg++, "gitinfo", gitinfo, TSDB_CFG_VTYPE_STRING, tsInitConfigOption(cfg++, "gitinfo", gitinfo, TSDB_CFG_VTYPE_STRING,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册