未验证 提交 606dc06d 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2172 from taosdata/hotfix/rpcClose

make rpcTcp multi-thread safe
...@@ -113,6 +113,7 @@ static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcIpSet *pIpSet) { ...@@ -113,6 +113,7 @@ static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcIpSet *pIpSet) {
} }
int32_t dnodeInitClient() { int32_t dnodeInitClient() {
char secret[TSDB_KEY_LEN] = "secret";
SRpcInit rpcInit; SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.label = "DND-C"; rpcInit.label = "DND-C";
...@@ -123,7 +124,7 @@ int32_t dnodeInitClient() { ...@@ -123,7 +124,7 @@ int32_t dnodeInitClient() {
rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.user = "t"; rpcInit.user = "t";
rpcInit.ckey = "key"; rpcInit.ckey = "key";
rpcInit.secret = "secret"; rpcInit.secret = secret;
tsDnodeClientRpc = rpcOpen(&rpcInit); tsDnodeClientRpc = rpcOpen(&rpcInit);
if (tsDnodeClientRpc == NULL) { if (tsDnodeClientRpc == NULL) {
......
...@@ -221,6 +221,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); ...@@ -221,6 +221,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_COUNTRY_LEN 20 #define TSDB_COUNTRY_LEN 20
#define TSDB_LOCALE_LEN 64 #define TSDB_LOCALE_LEN 64
#define TSDB_TIMEZONE_LEN 64 #define TSDB_TIMEZONE_LEN 64
#define TSDB_LABEL_LEN 8
#define TSDB_FQDN_LEN 128 #define TSDB_FQDN_LEN 128
#define TSDB_EP_LEN (TSDB_FQDN_LEN+6) #define TSDB_EP_LEN (TSDB_FQDN_LEN+6)
......
...@@ -47,7 +47,7 @@ typedef struct { ...@@ -47,7 +47,7 @@ typedef struct {
uint16_t localPort; uint16_t localPort;
int8_t connType; int8_t connType;
int index; // for UDP server only, round robin for multiple threads int index; // for UDP server only, round robin for multiple threads
char label[12]; char label[TSDB_LABEL_LEN];
char user[TSDB_UNI_LEN]; // meter ID char user[TSDB_UNI_LEN]; // meter ID
char spi; // security parameter index char spi; // security parameter index
...@@ -88,7 +88,7 @@ typedef struct { ...@@ -88,7 +88,7 @@ typedef struct {
} SRpcReqContext; } SRpcReqContext;
typedef struct SRpcConn { typedef struct SRpcConn {
char info[50];// debug info: label + pConn + ahandle char info[48];// debug info: label + pConn + ahandle
int sid; // session ID int sid; // session ID
uint32_t ownId; // own link ID uint32_t ownId; // own link ID
uint32_t peerId; // peer link ID uint32_t peerId; // peer link ID
...@@ -805,16 +805,16 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { ...@@ -805,16 +805,16 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
if (pConn == NULL) { if (pConn == NULL) {
tTrace("%s %p, failed to get connection obj(%s)", pRpc->label, (void *)pHead->ahandle, tstrerror(terrno)); tTrace("%s %p, failed to get connection obj(%s)", pRpc->label, (void *)pHead->ahandle, tstrerror(terrno));
return NULL; return NULL;
} else { }
if (rpcIsReq(pHead->msgType)) {
pConn->ahandle = (void *)pHead->ahandle;
sprintf(pConn->info, "%s %p %p", pRpc->label, pConn, pConn->ahandle);
}
}
rpcLockConn(pConn); rpcLockConn(pConn);
sid = pConn->sid;
if (rpcIsReq(pHead->msgType)) {
pConn->ahandle = (void *)pHead->ahandle;
sprintf(pConn->info, "%s %p %p", pRpc->label, pConn, pConn->ahandle);
}
sid = pConn->sid;
pConn->chandle = pRecv->chandle; pConn->chandle = pRecv->chandle;
pConn->peerIp = pRecv->ip; pConn->peerIp = pRecv->ip;
pConn->peerPort = pRecv->port; pConn->peerPort = pRecv->port;
...@@ -847,10 +847,11 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { ...@@ -847,10 +847,11 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
} }
static void rpcProcessBrokenLink(SRpcConn *pConn) { static void rpcProcessBrokenLink(SRpcConn *pConn) {
if (pConn == NULL) return;
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
tTrace("%s, link is broken", pConn->info); tTrace("%s, link is broken", pConn->info);
// pConn->chandle = NULL;
rpcLockConn(pConn);
if (pConn->outType) { if (pConn->outType) {
SRpcReqContext *pContext = pConn->pContext; SRpcReqContext *pContext = pConn->pContext;
...@@ -871,7 +872,8 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { ...@@ -871,7 +872,8 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) {
(*(pRpc->cfp))(&rpcMsg); (*(pRpc->cfp))(&rpcMsg);
*/ */
} }
rpcUnlockConn(pConn);
rpcCloseConn(pConn); rpcCloseConn(pConn);
} }
...@@ -885,7 +887,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { ...@@ -885,7 +887,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
// underlying UDP layer does not know it is server or client // underlying UDP layer does not know it is server or client
pRecv->connType = pRecv->connType | pRpc->connType; pRecv->connType = pRecv->connType | pRpc->connType;
if (pRecv->ip == 0 && pConn) { if (pRecv->ip == 0) {
rpcProcessBrokenLink(pConn); rpcProcessBrokenLink(pConn);
return NULL; return NULL;
} }
......
...@@ -16,6 +16,8 @@ ...@@ -16,6 +16,8 @@
#include "os.h" #include "os.h"
#include "tsocket.h" #include "tsocket.h"
#include "tutil.h" #include "tutil.h"
#include "taosdef.h"
#include "taoserror.h"
#include "rpcLog.h" #include "rpcLog.h"
#include "rpcHead.h" #include "rpcHead.h"
#include "rpcTcp.h" #include "rpcTcp.h"
...@@ -26,8 +28,9 @@ ...@@ -26,8 +28,9 @@
typedef struct SFdObj { typedef struct SFdObj {
void *signature; void *signature;
int fd; // TCP socket FD int fd; // TCP socket FD
void *thandle; // handle from upper layer, like TAOS int closedByApp; // 1: already closed by App
void *thandle; // handle from upper layer, like TAOS
uint32_t ip; uint32_t ip;
uint16_t port; uint16_t port;
struct SThreadObj *pThreadObj; struct SThreadObj *pThreadObj;
...@@ -44,7 +47,7 @@ typedef struct SThreadObj { ...@@ -44,7 +47,7 @@ typedef struct SThreadObj {
int pollFd; int pollFd;
int numOfFds; int numOfFds;
int threadId; int threadId;
char label[12]; char label[TSDB_LABEL_LEN];
void *shandle; // handle passed by upper layer during server initialization void *shandle; // handle passed by upper layer during server initialization
void *(*processData)(SRecvInfo *pPacket); void *(*processData)(SRecvInfo *pPacket);
} SThreadObj; } SThreadObj;
...@@ -53,7 +56,7 @@ typedef struct { ...@@ -53,7 +56,7 @@ typedef struct {
int fd; int fd;
uint32_t ip; uint32_t ip;
uint16_t port; uint16_t port;
char label[12]; char label[TSDB_LABEL_LEN];
int numOfThreads; int numOfThreads;
void * shandle; void * shandle;
SThreadObj *pThreadObj; SThreadObj *pThreadObj;
...@@ -71,6 +74,13 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread ...@@ -71,6 +74,13 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
SThreadObj *pThreadObj; SThreadObj *pThreadObj;
pServerObj = (SServerObj *)calloc(sizeof(SServerObj), 1); pServerObj = (SServerObj *)calloc(sizeof(SServerObj), 1);
if (pServerObj == NULL) {
tError("TCP:%s no enough memory", label);
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
}
pServerObj->thread = 0;
pServerObj->ip = ip; pServerObj->ip = ip;
pServerObj->port = port; pServerObj->port = port;
tstrncpy(pServerObj->label, label, sizeof(pServerObj->label)); tstrncpy(pServerObj->label, label, sizeof(pServerObj->label));
...@@ -79,13 +89,20 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread ...@@ -79,13 +89,20 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
pServerObj->pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), numOfThreads); pServerObj->pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), numOfThreads);
if (pServerObj->pThreadObj == NULL) { if (pServerObj->pThreadObj == NULL) {
tError("TCP:%s no enough memory", label); tError("TCP:%s no enough memory", label);
terrno = TAOS_SYSTEM_ERROR(errno);
free(pServerObj); free(pServerObj);
return NULL; return NULL;
} }
int code = 0; int code = 0;
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
pThreadObj = pServerObj->pThreadObj; pThreadObj = pServerObj->pThreadObj;
for (int i = 0; i < numOfThreads; ++i) { for (int i = 0; i < numOfThreads; ++i) {
pThreadObj->pollFd = -1;
pThreadObj->thread = 0;
pThreadObj->processData = fp; pThreadObj->processData = fp;
tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label)); tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label));
pThreadObj->shandle = shandle; pThreadObj->shandle = shandle;
...@@ -93,23 +110,22 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread ...@@ -93,23 +110,22 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
code = pthread_mutex_init(&(pThreadObj->mutex), NULL); code = pthread_mutex_init(&(pThreadObj->mutex), NULL);
if (code < 0) { if (code < 0) {
tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno)); tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
break;; break;;
} }
pThreadObj->pollFd = epoll_create(10); // size does not matter pThreadObj->pollFd = epoll_create(10); // size does not matter
if (pThreadObj->pollFd < 0) { if (pThreadObj->pollFd < 0) {
tError("%s failed to create TCP epoll", label); tError("%s failed to create TCP epoll", label);
terrno = TAOS_SYSTEM_ERROR(errno);
code = -1; code = -1;
break; break;
} }
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj)); code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj));
pthread_attr_destroy(&thattr);
if (code != 0) { if (code != 0) {
tError("%s failed to create TCP process data thread(%s)", label, strerror(errno)); tError("%s failed to create TCP process data thread(%s)", label, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
break; break;
} }
...@@ -118,47 +134,47 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread ...@@ -118,47 +134,47 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
} }
if (code == 0) { if (code == 0) {
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
code = pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj)); code = pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj));
pthread_attr_destroy(&thattr);
if (code != 0) { if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
tError("%s failed to create TCP accept thread(%s)", label, strerror(errno)); tError("%s failed to create TCP accept thread(%s)", label, strerror(errno));
} }
} }
if (code != 0) { if (code != 0) {
free(pServerObj->pThreadObj); taosCleanUpTcpServer(pServerObj);
free(pServerObj);
pServerObj = NULL; pServerObj = NULL;
} else { } else {
tTrace("%s TCP server is initialized, ip:0x%x port:%hu numOfThreads:%d", label, ip, port, numOfThreads); tTrace("%s TCP server is initialized, ip:0x%x port:%hu numOfThreads:%d", label, ip, port, numOfThreads);
} }
pthread_attr_destroy(&thattr);
return (void *)pServerObj; return (void *)pServerObj;
} }
static void taosStopTcpThread(SThreadObj* pThreadObj) { static void taosStopTcpThread(SThreadObj* pThreadObj) {
pThreadObj->stop = true; pThreadObj->stop = true;
eventfd_t fd = -1;
// signal the thread to stop, try graceful method first,
// and use pthread_cancel when failed if (pThreadObj->thread && pThreadObj->pollFd >=0) {
struct epoll_event event = { .events = EPOLLIN }; // signal the thread to stop, try graceful method first,
eventfd_t fd = eventfd(1, 0); // and use pthread_cancel when failed
if (fd == -1) { struct epoll_event event = { .events = EPOLLIN };
tError("%s, failed to create eventfd, will call pthread_cancel instead, which may result in data corruption: %s", pThreadObj->label, strerror(errno)); fd = eventfd(1, 0);
pthread_cancel(pThreadObj->thread); if (fd == -1) {
} else if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) { // failed to create eventfd, call pthread_cancel instead, which may result in data corruption:
tError("%s, failed to call epoll_ctl, will call pthread_cancel instead, which may result in data corruption: %s", pThreadObj->label, strerror(errno)); tError("%s, failed to create eventfd(%s)", pThreadObj->label, strerror(errno));
pthread_cancel(pThreadObj->thread); pthread_cancel(pThreadObj->thread);
} else if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
// failed to call epoll_ctl, call pthread_cancel instead, which may result in data corruption:
tError("%s, failed to call epoll_ctl(%s)", pThreadObj->label, strerror(errno));
pthread_cancel(pThreadObj->thread);
}
} }
pthread_join(pThreadObj->thread, NULL); if (pThreadObj->thread) pthread_join(pThreadObj->thread, NULL);
close(pThreadObj->pollFd); if (pThreadObj->pollFd >=0) close(pThreadObj->pollFd);
if (fd != -1) { if (fd != -1) close(fd);
close(fd);
}
while (pThreadObj->pHead) { while (pThreadObj->pHead) {
SFdObj *pFdObj = pThreadObj->pHead; SFdObj *pFdObj = pThreadObj->pHead;
...@@ -173,9 +189,8 @@ void taosCleanUpTcpServer(void *handle) { ...@@ -173,9 +189,8 @@ void taosCleanUpTcpServer(void *handle) {
SThreadObj *pThreadObj; SThreadObj *pThreadObj;
if (pServerObj == NULL) return; if (pServerObj == NULL) return;
if(pServerObj->fd >=0) shutdown(pServerObj->fd, SHUT_RD);
shutdown(pServerObj->fd, SHUT_RD); if(pServerObj->thread) pthread_join(pServerObj->thread, NULL);
pthread_join(pServerObj->thread, NULL);
for (int i = 0; i < pServerObj->numOfThreads; ++i) { for (int i = 0; i < pServerObj->numOfThreads; ++i) {
pThreadObj = pServerObj->pThreadObj + i; pThreadObj = pServerObj->pThreadObj + i;
...@@ -211,6 +226,7 @@ static void* taosAcceptTcpConnection(void *arg) { ...@@ -211,6 +226,7 @@ static void* taosAcceptTcpConnection(void *arg) {
tTrace("%s TCP server socket was shutdown, exiting...", pServerObj->label); tTrace("%s TCP server socket was shutdown, exiting...", pServerObj->label);
break; break;
} }
tError("%s TCP accept failure(%s)", pServerObj->label, strerror(errno)); tError("%s TCP accept failure(%s)", pServerObj->label, strerror(errno));
continue; continue;
} }
...@@ -254,6 +270,7 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void * ...@@ -254,6 +270,7 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *
if (pthread_mutex_init(&(pThreadObj->mutex), NULL) < 0) { if (pthread_mutex_init(&(pThreadObj->mutex), NULL) < 0) {
tError("%s failed to init TCP client mutex(%s)", label, strerror(errno)); tError("%s failed to init TCP client mutex(%s)", label, strerror(errno));
free(pThreadObj); free(pThreadObj);
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL; return NULL;
} }
...@@ -261,6 +278,7 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void * ...@@ -261,6 +278,7 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *
if (pThreadObj->pollFd < 0) { if (pThreadObj->pollFd < 0) {
tError("%s failed to create TCP client epoll", label); tError("%s failed to create TCP client epoll", label);
free(pThreadObj); free(pThreadObj);
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL; return NULL;
} }
...@@ -273,6 +291,7 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void * ...@@ -273,6 +291,7 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *
if (code != 0) { if (code != 0) {
close(pThreadObj->pollFd); close(pThreadObj->pollFd);
free(pThreadObj); free(pThreadObj);
terrno = TAOS_SYSTEM_ERROR(errno);
tError("%s failed to create TCP read data thread(%s)", label, strerror(errno)); tError("%s failed to create TCP read data thread(%s)", label, strerror(errno));
return NULL; return NULL;
} }
...@@ -287,7 +306,7 @@ void taosCleanUpTcpClient(void *chandle) { ...@@ -287,7 +306,7 @@ void taosCleanUpTcpClient(void *chandle) {
if (pThreadObj == NULL) return; if (pThreadObj == NULL) return;
taosStopTcpThread(pThreadObj); taosStopTcpThread(pThreadObj);
tTrace (":%s, all connections are cleaned up", pThreadObj->label); tTrace ("%s, all connections are cleaned up", pThreadObj->label);
tfree(pThreadObj); tfree(pThreadObj);
} }
...@@ -318,7 +337,9 @@ void taosCloseTcpConnection(void *chandle) { ...@@ -318,7 +337,9 @@ void taosCloseTcpConnection(void *chandle) {
SFdObj *pFdObj = chandle; SFdObj *pFdObj = chandle;
if (pFdObj == NULL) return; if (pFdObj == NULL) return;
taosFreeFdObj(pFdObj); pFdObj->thandle = NULL;
pFdObj->closedByApp = 1;
shutdown(pFdObj->fd, SHUT_WR);
} }
int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) { int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) {
...@@ -334,7 +355,9 @@ static void taosReportBrokenLink(SFdObj *pFdObj) { ...@@ -334,7 +355,9 @@ static void taosReportBrokenLink(SFdObj *pFdObj) {
SThreadObj *pThreadObj = pFdObj->pThreadObj; SThreadObj *pThreadObj = pFdObj->pThreadObj;
// notify the upper layer, so it will clean the associated context // notify the upper layer, so it will clean the associated context
if (pFdObj->thandle) { if (pFdObj->closedByApp == 0) {
shutdown(pFdObj->fd, SHUT_WR);
SRecvInfo recvInfo; SRecvInfo recvInfo;
recvInfo.msg = NULL; recvInfo.msg = NULL;
recvInfo.msgLen = 0; recvInfo.msgLen = 0;
...@@ -345,9 +368,59 @@ static void taosReportBrokenLink(SFdObj *pFdObj) { ...@@ -345,9 +368,59 @@ static void taosReportBrokenLink(SFdObj *pFdObj) {
recvInfo.chandle = NULL; recvInfo.chandle = NULL;
recvInfo.connType = RPC_CONN_TCP; recvInfo.connType = RPC_CONN_TCP;
(*(pThreadObj->processData))(&recvInfo); (*(pThreadObj->processData))(&recvInfo);
} else { }
taosFreeFdObj(pFdObj);
taosFreeFdObj(pFdObj);
}
static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
SRpcHead rpcHead;
int32_t msgLen, leftLen, retLen, headLen;
char *buffer, *msg;
SThreadObj *pThreadObj = pFdObj->pThreadObj;
headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead));
if (headLen != sizeof(SRpcHead)) {
tTrace("%s %p, read error, headLen:%d", pThreadObj->label, pFdObj->thandle, headLen);
return -1;
}
msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen);
buffer = malloc(msgLen + tsRpcOverhead);
if ( NULL == buffer) {
tError("%s %p, TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen);
return -1;
} }
msg = buffer + tsRpcOverhead;
leftLen = msgLen - headLen;
retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen);
if (leftLen != retLen) {
tError("%s %p, read error, leftLen:%d retLen:%d",
pThreadObj->label, pFdObj->thandle, leftLen, retLen);
free(buffer);
return -1;
}
memcpy(msg, &rpcHead, sizeof(SRpcHead));
pInfo->msg = msg;
pInfo->msgLen = msgLen;
pInfo->ip = pFdObj->ip;
pInfo->port = pFdObj->port;
pInfo->shandle = pThreadObj->shandle;
pInfo->thandle = pFdObj->thandle;;
pInfo->chandle = pFdObj;
pInfo->connType = RPC_CONN_TCP;
if (pFdObj->closedByApp) {
free(buffer);
return -1;
}
return 0;
} }
#define maxEvents 10 #define maxEvents 10
...@@ -357,7 +430,6 @@ static void *taosProcessTcpData(void *param) { ...@@ -357,7 +430,6 @@ static void *taosProcessTcpData(void *param) {
SFdObj *pFdObj; SFdObj *pFdObj;
struct epoll_event events[maxEvents]; struct epoll_event events[maxEvents];
SRecvInfo recvInfo; SRecvInfo recvInfo;
SRpcHead rpcHead;
while (1) { while (1) {
int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, -1); int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, -1);
...@@ -376,51 +448,23 @@ static void *taosProcessTcpData(void *param) { ...@@ -376,51 +448,23 @@ static void *taosProcessTcpData(void *param) {
continue; continue;
} }
if (events[i].events & EPOLLHUP) { if (events[i].events & EPOLLRDHUP) {
tTrace("%s %p, FD hang up", pThreadObj->label, pFdObj->thandle); tTrace("%s %p, FD RD hang up", pThreadObj->label, pFdObj->thandle);
taosReportBrokenLink(pFdObj);
continue;
}
int32_t headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead));
if (headLen != sizeof(SRpcHead)) {
tTrace("%s %p, read error, headLen:%d", pThreadObj->label, pFdObj->thandle, headLen);
taosReportBrokenLink(pFdObj); taosReportBrokenLink(pFdObj);
continue; continue;
} }
int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen); if (events[i].events & EPOLLHUP) {
char *buffer = malloc(msgLen + tsRpcOverhead); tTrace("%s %p, FD hang up", pThreadObj->label, pFdObj->thandle);
if ( NULL == buffer) {
tError("%s %p, TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen);
taosReportBrokenLink(pFdObj); taosReportBrokenLink(pFdObj);
continue; continue;
} }
char *msg = buffer + tsRpcOverhead; if (taosReadTcpData(pFdObj, &recvInfo) < 0) {
int32_t leftLen = msgLen - headLen; shutdown(pFdObj->fd, SHUT_WR);
int32_t retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen);
if (leftLen != retLen) {
tError("%s %p, read error, leftLen:%d retLen:%d",
pThreadObj->label, pFdObj->thandle, leftLen, retLen);
taosReportBrokenLink(pFdObj);
tfree(buffer);
continue; continue;
} }
// tTrace("%s TCP data is received, ip:0x%x:%u len:%d", pThreadObj->label, pFdObj->ip, pFdObj->port, msgLen);
memcpy(msg, &rpcHead, sizeof(SRpcHead));
recvInfo.msg = msg;
recvInfo.msgLen = msgLen;
recvInfo.ip = pFdObj->ip;
recvInfo.port = pFdObj->port;
recvInfo.shandle = pThreadObj->shandle;
recvInfo.thandle = pFdObj->thandle;;
recvInfo.chandle = pFdObj;
recvInfo.connType = RPC_CONN_TCP;
pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo); pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo);
if (pFdObj->thandle == NULL) taosFreeFdObj(pFdObj); if (pFdObj->thandle == NULL) taosFreeFdObj(pFdObj);
} }
...@@ -433,16 +477,20 @@ static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd) { ...@@ -433,16 +477,20 @@ static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd) {
struct epoll_event event; struct epoll_event event;
SFdObj *pFdObj = (SFdObj *)calloc(sizeof(SFdObj), 1); SFdObj *pFdObj = (SFdObj *)calloc(sizeof(SFdObj), 1);
if (pFdObj == NULL) return NULL; if (pFdObj == NULL) {
return NULL;
}
pFdObj->closedByApp = 0;
pFdObj->fd = fd; pFdObj->fd = fd;
pFdObj->pThreadObj = pThreadObj; pFdObj->pThreadObj = pThreadObj;
pFdObj->signature = pFdObj; pFdObj->signature = pFdObj;
event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP; event.events = EPOLLIN | EPOLLRDHUP;
event.data.ptr = pFdObj; event.data.ptr = pFdObj;
if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) { if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
tfree(pFdObj); tfree(pFdObj);
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL; return NULL;
} }
...@@ -475,13 +523,10 @@ static void taosFreeFdObj(SFdObj *pFdObj) { ...@@ -475,13 +523,10 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
taosCloseSocket(pFdObj->fd); taosCloseSocket(pFdObj->fd);
pThreadObj->numOfFds--; pThreadObj->numOfFds--;
if (pThreadObj->numOfFds < 0) if (pThreadObj->numOfFds < 0)
tError("%s %p, TCP thread:%d, number of FDs is negative!!!", tError("%s %p, TCP thread:%d, number of FDs is negative!!!",
pThreadObj->label, pFdObj->thandle, pThreadObj->threadId); pThreadObj->label, pFdObj->thandle, pThreadObj->threadId);
// remove from the FdObject list
if (pFdObj->prev) { if (pFdObj->prev) {
(pFdObj->prev)->next = pFdObj->next; (pFdObj->prev)->next = pFdObj->next;
} else { } else {
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "tsystem.h" #include "tsystem.h"
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h" #include "tutil.h"
#include "taosdef.h"
#include "rpcLog.h" #include "rpcLog.h"
#include "rpcUdp.h" #include "rpcUdp.h"
#include "rpcHead.h" #include "rpcHead.h"
...@@ -33,7 +34,7 @@ typedef struct { ...@@ -33,7 +34,7 @@ typedef struct {
int fd; int fd;
uint16_t port; // peer port uint16_t port; // peer port
uint16_t localPort; // local port uint16_t localPort; // local port
char label[12]; // copy from udpConnSet; char label[TSDB_LABEL_LEN]; // copy from udpConnSet;
pthread_t thread; pthread_t thread;
void *hash; void *hash;
void *shandle; // handle passed by upper layer during server initialization void *shandle; // handle passed by upper layer during server initialization
...@@ -49,7 +50,7 @@ typedef struct { ...@@ -49,7 +50,7 @@ typedef struct {
uint16_t port; // local Port uint16_t port; // local Port
void *shandle; // handle passed by upper layer during server initialization void *shandle; // handle passed by upper layer during server initialization
int threads; int threads;
char label[12]; char label[TSDB_LABEL_LEN];
void *(*fp)(SRecvInfo *pPacket); void *(*fp)(SRecvInfo *pPacket);
SUdpConn udpConn[]; SUdpConn udpConn[];
} SUdpConnSet; } SUdpConnSet;
...@@ -93,7 +94,7 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads ...@@ -93,7 +94,7 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
} }
struct sockaddr_in sin; struct sockaddr_in sin;
unsigned int addrlen = sizeof(sin); unsigned int addrlen = sizeof(sin);
if (getsockname(pConn->fd, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET && if (getsockname(pConn->fd, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET &&
addrlen == sizeof(sin)) { addrlen == sizeof(sin)) {
pConn->localPort = (uint16_t)ntohs(sin.sin_port); pConn->localPort = (uint16_t)ntohs(sin.sin_port);
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
*/ */
#include "os.h" #include "os.h"
#include "taosdef.h"
#include "tulog.h" #include "tulog.h"
#include "tsched.h" #include "tsched.h"
#include "ttimer.h" #include "ttimer.h"
...@@ -21,7 +22,7 @@ ...@@ -21,7 +22,7 @@
#define DUMP_SCHEDULER_TIME_WINDOW 30000 //every 30sec, take a snap shot of task queue. #define DUMP_SCHEDULER_TIME_WINDOW 30000 //every 30sec, take a snap shot of task queue.
typedef struct { typedef struct {
char label[16]; char label[TSDB_LABEL_LEN];
tsem_t emptySem; tsem_t emptySem;
tsem_t fullSem; tsem_t fullSem;
pthread_mutex_t queueMutex; pthread_mutex_t queueMutex;
......
...@@ -40,9 +40,27 @@ int taosGetFqdn(char *fqdn) { ...@@ -40,9 +40,27 @@ int taosGetFqdn(char *fqdn) {
} }
uint32_t taosGetIpFromFqdn(const char *fqdn) { uint32_t taosGetIpFromFqdn(const char *fqdn) {
struct hostent * record = gethostbyname(fqdn); struct addrinfo hints, *servinfo, *p;
if(record == NULL) return -1; struct sockaddr_in *h;
return ((struct in_addr *)record->h_addr)->s_addr; uint32_t ip = -1;
memset(&hints, 0, sizeof hints);
hints.ai_family = AF_UNSPEC; // use AF_INET6 to force IPv6
hints.ai_socktype = SOCK_STREAM;
if (getaddrinfo(fqdn, "http", &hints, &servinfo) != 0) {
uError("failed to get IP from %s(%s)", fqdn, strerror(errno));
return -1;
}
// to do: loop through all the results and connect to the first we can
for(p = servinfo; p != NULL; p = p->ai_next) {
h = (struct sockaddr_in *) p->ai_addr;
ip = h->sin_addr.s_addr;
}
freeaddrinfo(servinfo); // all done with this structure
return ip;
} }
// Function converting an IP address string to an unsigned int. // Function converting an IP address string to an unsigned int.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册