提交 dad3b9f1 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

add a new definition foe label length

上级 6f29506e
...@@ -221,6 +221,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); ...@@ -221,6 +221,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_COUNTRY_LEN 20 #define TSDB_COUNTRY_LEN 20
#define TSDB_LOCALE_LEN 64 #define TSDB_LOCALE_LEN 64
#define TSDB_TIMEZONE_LEN 64 #define TSDB_TIMEZONE_LEN 64
#define TSDB_LABEL_LEN 8
#define TSDB_FQDN_LEN 128 #define TSDB_FQDN_LEN 128
#define TSDB_EP_LEN (TSDB_FQDN_LEN+6) #define TSDB_EP_LEN (TSDB_FQDN_LEN+6)
......
...@@ -47,7 +47,7 @@ typedef struct { ...@@ -47,7 +47,7 @@ typedef struct {
uint16_t localPort; uint16_t localPort;
int8_t connType; int8_t connType;
int index; // for UDP server only, round robin for multiple threads int index; // for UDP server only, round robin for multiple threads
char label[12]; char label[TSDB_LABEL_LEN];
char user[TSDB_UNI_LEN]; // meter ID char user[TSDB_UNI_LEN]; // meter ID
char spi; // security parameter index char spi; // security parameter index
...@@ -88,7 +88,7 @@ typedef struct { ...@@ -88,7 +88,7 @@ typedef struct {
} SRpcReqContext; } SRpcReqContext;
typedef struct SRpcConn { typedef struct SRpcConn {
char info[50];// debug info: label + pConn + ahandle char info[48];// debug info: label + pConn + ahandle
int sid; // session ID int sid; // session ID
uint32_t ownId; // own link ID uint32_t ownId; // own link ID
uint32_t peerId; // peer link ID uint32_t peerId; // peer link ID
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#include "os.h" #include "os.h"
#include "tsocket.h" #include "tsocket.h"
#include "tutil.h" #include "tutil.h"
#include "taosdef.h"
#include "taoserror.h" #include "taoserror.h"
#include "rpcLog.h" #include "rpcLog.h"
#include "rpcHead.h" #include "rpcHead.h"
...@@ -46,7 +47,7 @@ typedef struct SThreadObj { ...@@ -46,7 +47,7 @@ typedef struct SThreadObj {
int pollFd; int pollFd;
int numOfFds; int numOfFds;
int threadId; int threadId;
char label[12]; char label[TSDB_LABEL_LEN];
void *shandle; // handle passed by upper layer during server initialization void *shandle; // handle passed by upper layer during server initialization
void *(*processData)(SRecvInfo *pPacket); void *(*processData)(SRecvInfo *pPacket);
} SThreadObj; } SThreadObj;
...@@ -55,7 +56,7 @@ typedef struct { ...@@ -55,7 +56,7 @@ typedef struct {
int fd; int fd;
uint32_t ip; uint32_t ip;
uint16_t port; uint16_t port;
char label[12]; char label[TSDB_LABEL_LEN];
int numOfThreads; int numOfThreads;
void * shandle; void * shandle;
SThreadObj *pThreadObj; SThreadObj *pThreadObj;
...@@ -79,6 +80,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread ...@@ -79,6 +80,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
return NULL; return NULL;
} }
pServerObj->thread = 0;
pServerObj->ip = ip; pServerObj->ip = ip;
pServerObj->port = port; pServerObj->port = port;
tstrncpy(pServerObj->label, label, sizeof(pServerObj->label)); tstrncpy(pServerObj->label, label, sizeof(pServerObj->label));
...@@ -93,8 +95,14 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread ...@@ -93,8 +95,14 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
} }
int code = 0; int code = 0;
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
pThreadObj = pServerObj->pThreadObj; pThreadObj = pServerObj->pThreadObj;
for (int i = 0; i < numOfThreads; ++i) { for (int i = 0; i < numOfThreads; ++i) {
pThreadObj->pollFd = -1;
pThreadObj->thread = 0;
pThreadObj->processData = fp; pThreadObj->processData = fp;
tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label)); tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label));
pThreadObj->shandle = shandle; pThreadObj->shandle = shandle;
...@@ -114,11 +122,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread ...@@ -114,11 +122,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
break; break;
} }
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj)); code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj));
pthread_attr_destroy(&thattr);
if (code != 0) { if (code != 0) {
tError("%s failed to create TCP process data thread(%s)", label, strerror(errno)); tError("%s failed to create TCP process data thread(%s)", label, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
...@@ -130,11 +134,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread ...@@ -130,11 +134,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
} }
if (code == 0) { if (code == 0) {
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
code = pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj)); code = pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj));
pthread_attr_destroy(&thattr);
if (code != 0) { if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
tError("%s failed to create TCP accept thread(%s)", label, strerror(errno)); tError("%s failed to create TCP accept thread(%s)", label, strerror(errno));
...@@ -142,38 +142,39 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread ...@@ -142,38 +142,39 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
} }
if (code != 0) { if (code != 0) {
free(pServerObj->pThreadObj); taosCleanUpTcpServer(pServerObj);
free(pServerObj);
pServerObj = NULL; pServerObj = NULL;
} else { } else {
tTrace("%s TCP server is initialized, ip:0x%x port:%hu numOfThreads:%d", label, ip, port, numOfThreads); tTrace("%s TCP server is initialized, ip:0x%x port:%hu numOfThreads:%d", label, ip, port, numOfThreads);
} }
pthread_attr_destroy(&thattr);
return (void *)pServerObj; return (void *)pServerObj;
} }
static void taosStopTcpThread(SThreadObj* pThreadObj) { static void taosStopTcpThread(SThreadObj* pThreadObj) {
pThreadObj->stop = true; pThreadObj->stop = true;
eventfd_t fd = -1;
// signal the thread to stop, try graceful method first,
// and use pthread_cancel when failed if (pThreadObj->thread && pThreadObj->pollFd >=0) {
struct epoll_event event = { .events = EPOLLIN }; // signal the thread to stop, try graceful method first,
eventfd_t fd = eventfd(1, 0); // and use pthread_cancel when failed
if (fd == -1) { struct epoll_event event = { .events = EPOLLIN };
// failed to create eventfd, call pthread_cancel instead, which may result in data corruption: fd = eventfd(1, 0);
tError("%s, failed to create eventfd(%s)", pThreadObj->label, strerror(errno)); if (fd == -1) {
pthread_cancel(pThreadObj->thread); // failed to create eventfd, call pthread_cancel instead, which may result in data corruption:
} else if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) { tError("%s, failed to create eventfd(%s)", pThreadObj->label, strerror(errno));
// failed to call epoll_ctl, call pthread_cancel instead, which may result in data corruption: pthread_cancel(pThreadObj->thread);
tError("%s, failed to call epoll_ctl(%s)", pThreadObj->label, strerror(errno)); } else if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
pthread_cancel(pThreadObj->thread); // failed to call epoll_ctl, call pthread_cancel instead, which may result in data corruption:
tError("%s, failed to call epoll_ctl(%s)", pThreadObj->label, strerror(errno));
pthread_cancel(pThreadObj->thread);
}
} }
pthread_join(pThreadObj->thread, NULL); if (pThreadObj->thread) pthread_join(pThreadObj->thread, NULL);
close(pThreadObj->pollFd); if (pThreadObj->pollFd >=0) close(pThreadObj->pollFd);
if (fd != -1) { if (fd != -1) close(fd);
close(fd);
}
while (pThreadObj->pHead) { while (pThreadObj->pHead) {
SFdObj *pFdObj = pThreadObj->pHead; SFdObj *pFdObj = pThreadObj->pHead;
...@@ -188,9 +189,8 @@ void taosCleanUpTcpServer(void *handle) { ...@@ -188,9 +189,8 @@ void taosCleanUpTcpServer(void *handle) {
SThreadObj *pThreadObj; SThreadObj *pThreadObj;
if (pServerObj == NULL) return; if (pServerObj == NULL) return;
if(pServerObj->fd >=0) shutdown(pServerObj->fd, SHUT_RD);
shutdown(pServerObj->fd, SHUT_RD); if(pServerObj->thread) pthread_join(pServerObj->thread, NULL);
pthread_join(pServerObj->thread, NULL);
for (int i = 0; i < pServerObj->numOfThreads; ++i) { for (int i = 0; i < pServerObj->numOfThreads; ++i) {
pThreadObj = pServerObj->pThreadObj + i; pThreadObj = pServerObj->pThreadObj + i;
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "tsystem.h" #include "tsystem.h"
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h" #include "tutil.h"
#include "taosdef.h"
#include "rpcLog.h" #include "rpcLog.h"
#include "rpcUdp.h" #include "rpcUdp.h"
#include "rpcHead.h" #include "rpcHead.h"
...@@ -33,7 +34,7 @@ typedef struct { ...@@ -33,7 +34,7 @@ typedef struct {
int fd; int fd;
uint16_t port; // peer port uint16_t port; // peer port
uint16_t localPort; // local port uint16_t localPort; // local port
char label[12]; // copy from udpConnSet; char label[TSDB_LABEL_LEN]; // copy from udpConnSet;
pthread_t thread; pthread_t thread;
void *hash; void *hash;
void *shandle; // handle passed by upper layer during server initialization void *shandle; // handle passed by upper layer during server initialization
...@@ -49,7 +50,7 @@ typedef struct { ...@@ -49,7 +50,7 @@ typedef struct {
uint16_t port; // local Port uint16_t port; // local Port
void *shandle; // handle passed by upper layer during server initialization void *shandle; // handle passed by upper layer during server initialization
int threads; int threads;
char label[12]; char label[TSDB_LABEL_LEN];
void *(*fp)(SRecvInfo *pPacket); void *(*fp)(SRecvInfo *pPacket);
SUdpConn udpConn[]; SUdpConn udpConn[];
} SUdpConnSet; } SUdpConnSet;
...@@ -93,7 +94,7 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads ...@@ -93,7 +94,7 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
} }
struct sockaddr_in sin; struct sockaddr_in sin;
unsigned int addrlen = sizeof(sin); unsigned int addrlen = sizeof(sin);
if (getsockname(pConn->fd, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET && if (getsockname(pConn->fd, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET &&
addrlen == sizeof(sin)) { addrlen == sizeof(sin)) {
pConn->localPort = (uint16_t)ntohs(sin.sin_port); pConn->localPort = (uint16_t)ntohs(sin.sin_port);
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
*/ */
#include "os.h" #include "os.h"
#include "taosdef.h"
#include "tulog.h" #include "tulog.h"
#include "tsched.h" #include "tsched.h"
#include "ttimer.h" #include "ttimer.h"
...@@ -21,7 +22,7 @@ ...@@ -21,7 +22,7 @@
#define DUMP_SCHEDULER_TIME_WINDOW 30000 //every 30sec, take a snap shot of task queue. #define DUMP_SCHEDULER_TIME_WINDOW 30000 //every 30sec, take a snap shot of task queue.
typedef struct { typedef struct {
char label[16]; char label[TSDB_LABEL_LEN];
tsem_t emptySem; tsem_t emptySem;
tsem_t fullSem; tsem_t fullSem;
pthread_mutex_t queueMutex; pthread_mutex_t queueMutex;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册