提交 a368b7b4 编写于 作者: S Shengliang Guan

TD-1207 change int32_t to socket

上级 df96fd1b
...@@ -27,23 +27,23 @@ ...@@ -27,23 +27,23 @@
extern "C" { extern "C" {
#endif #endif
#define STR_TO_VARSTR(x, str) \ #define STR_TO_VARSTR(x, str) \
do { \ do { \
VarDataLenT __len = (int32_t)strlen(str); \ VarDataLenT __len = (int32_t)strlen(str); \
*(VarDataLenT *)(x) = __len; \ *(VarDataLenT *)(x) = __len; \
memcpy(varDataVal(x), (str), __len); \ memcpy(varDataVal(x), (str), __len); \
} while (0); } while (0);
#define STR_WITH_MAXSIZE_TO_VARSTR(x, str, _maxs) \ #define STR_WITH_MAXSIZE_TO_VARSTR(x, str, _maxs) \
do { \ do { \
char *_e = stpncpy(varDataVal(x), (str), (_maxs)-VARSTR_HEADER_SIZE); \ char *_e = stpncpy(varDataVal(x), (str), (_maxs)-VARSTR_HEADER_SIZE); \
varDataSetLen(x, (_e - (x)-VARSTR_HEADER_SIZE)); \ varDataSetLen(x, (_e - (x)-VARSTR_HEADER_SIZE)); \
} while (0) } while (0)
#define STR_WITH_SIZE_TO_VARSTR(x, str, _size) \ #define STR_WITH_SIZE_TO_VARSTR(x, str, _size) \
do { \ do { \
*(VarDataLenT *)(x) = (int32_t)(_size); \ *(VarDataLenT *)(x) = (int32_t)(_size); \
memcpy(varDataVal(x), (str), (_size)); \ memcpy(varDataVal(x), (str), (_size)); \
} while (0); } while (0);
// ----------------- TSDB COLUMN DEFINITION // ----------------- TSDB COLUMN DEFINITION
...@@ -156,7 +156,7 @@ static FORCE_INLINE int tkeyComparFn(const void *tkey1, const void *tkey2) { ...@@ -156,7 +156,7 @@ static FORCE_INLINE int tkeyComparFn(const void *tkey1, const void *tkey2) {
* +----------+----------+---------------------------------+---------------------------------+ * +----------+----------+---------------------------------+---------------------------------+
* | len | sversion | First part | Second part | * | len | sversion | First part | Second part |
* +----------+----------+---------------------------------+---------------------------------+ * +----------+----------+---------------------------------+---------------------------------+
* *
* NOTE: timestamp in this row structure is TKEY instead of TSKEY * NOTE: timestamp in this row structure is TKEY instead of TSKEY
*/ */
typedef void *SDataRow; typedef void *SDataRow;
......
...@@ -29,11 +29,11 @@ typedef struct { ...@@ -29,11 +29,11 @@ typedef struct {
static SCheckItem tsCheckItem[TSDB_CHECK_ITEM_MAX] = {{0}}; static SCheckItem tsCheckItem[TSDB_CHECK_ITEM_MAX] = {{0}};
int64_t tsMinFreeMemSizeForStart = 0; int64_t tsMinFreeMemSizeForStart = 0;
static int32_t bindTcpPort(int32_t port) { static int32_t bindTcpPort(int16_t port) {
int32_t serverSocket; SOCKET serverSocket;
struct sockaddr_in server_addr; struct sockaddr_in server_addr;
if ((serverSocket = ( int32_t)socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { if ((serverSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
dError("socket() fail: %s", strerror(errno)); dError("socket() fail: %s", strerror(errno));
return -1; return -1;
} }
...@@ -59,11 +59,11 @@ static int32_t bindTcpPort(int32_t port) { ...@@ -59,11 +59,11 @@ static int32_t bindTcpPort(int32_t port) {
return 0; return 0;
} }
static int32_t bindUdpPort(int32_t port) { static int32_t bindUdpPort(int16_t port) {
int32_t serverSocket; SOCKET serverSocket;
struct sockaddr_in server_addr; struct sockaddr_in server_addr;
if ((serverSocket = (int32_t)socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { if ((serverSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
dError("socket() fail: %s", strerror(errno)); dError("socket() fail: %s", strerror(errno));
return -1; return -1;
} }
...@@ -85,9 +85,9 @@ static int32_t bindUdpPort(int32_t port) { ...@@ -85,9 +85,9 @@ static int32_t bindUdpPort(int32_t port) {
static int32_t dnodeCheckNetwork() { static int32_t dnodeCheckNetwork() {
int32_t ret; int32_t ret;
int32_t startPort = tsServerPort; int16_t startPort = tsServerPort;
for (int32_t port = startPort; port < startPort + 12; port++) { for (int16_t port = startPort; port < startPort + 12; port++) {
ret = bindTcpPort(port); ret = bindTcpPort(port);
if (0 != ret) { if (0 != ret) {
dError("failed to tcp bind port %d, quit", port); dError("failed to tcp bind port %d, quit", port);
......
...@@ -200,7 +200,7 @@ static void sendTelemetryReport() { ...@@ -200,7 +200,7 @@ static void sendTelemetryReport() {
dTrace("failed to get IP address of " TELEMETRY_SERVER ", reason:%s", strerror(errno)); dTrace("failed to get IP address of " TELEMETRY_SERVER ", reason:%s", strerror(errno));
return; return;
} }
int32_t fd = taosOpenTcpClientSocket(ip, TELEMETRY_PORT, 0); SOCKET fd = taosOpenTcpClientSocket(ip, TELEMETRY_PORT, 0);
if (fd < 0) { if (fd < 0) {
dTrace("failed to create socket for telemetry, reason:%s", strerror(errno)); dTrace("failed to create socket for telemetry, reason:%s", strerror(errno));
return; return;
......
...@@ -39,7 +39,7 @@ int32_t taosRenameFile(char *fullPath, char *suffix, char delimiter, char **dstP ...@@ -39,7 +39,7 @@ int32_t taosRenameFile(char *fullPath, char *suffix, char delimiter, char **dstP
} }
// TAOS_OS_FUNC_FILE_SENDIFLE // TAOS_OS_FUNC_FILE_SENDIFLE
int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t *offset, int64_t size); int64_t taosSendFile(SOCKET dfd, int32_t sfd, int64_t *offset, int64_t size);
int64_t taosFSendFile(FILE *outfile, FILE *infile, int64_t *offset, int64_t size); int64_t taosFSendFile(FILE *outfile, FILE *infile, int64_t *offset, int64_t size);
#ifdef TAOS_RANDOM_FILE_FAIL #ifdef TAOS_RANDOM_FILE_FAIL
......
...@@ -37,6 +37,9 @@ extern "C" { ...@@ -37,6 +37,9 @@ extern "C" {
#ifndef TAOS_OS_DEF_EPOLL #ifndef TAOS_OS_DEF_EPOLL
#define TAOS_EPOLL_WAIT_TIME 500 #define TAOS_EPOLL_WAIT_TIME 500
typedef int32_t SOCKET;
typedef SOCKET EpollFd;
#define EpollClose(pollFd) taosCloseSocket(pollFd)
#endif #endif
#ifdef TAOS_RANDOM_NETWORK_FAIL #ifdef TAOS_RANDOM_NETWORK_FAIL
...@@ -57,13 +60,13 @@ extern "C" { ...@@ -57,13 +60,13 @@ extern "C" {
#endif #endif
// TAOS_OS_FUNC_SOCKET // TAOS_OS_FUNC_SOCKET
int32_t taosSetNonblocking(int32_t sock, int32_t on); int32_t taosSetNonblocking(SOCKET sock, int32_t on);
void taosIgnSIGPIPE(); void taosIgnSIGPIPE();
void taosBlockSIGPIPE(); void taosBlockSIGPIPE();
void taosSetMaskSIGPIPE(); void taosSetMaskSIGPIPE();
// TAOS_OS_FUNC_SOCKET_SETSOCKETOPT // TAOS_OS_FUNC_SOCKET_SETSOCKETOPT
int32_t taosSetSockOpt(int32_t socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen); int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen);
// TAOS_OS_FUNC_SOCKET_INET // TAOS_OS_FUNC_SOCKET_INET
uint32_t taosInetAddr(char *ipAddr); uint32_t taosInetAddr(char *ipAddr);
......
...@@ -93,6 +93,12 @@ typedef SOCKET eventfd_t; ...@@ -93,6 +93,12 @@ typedef SOCKET eventfd_t;
#define TAOS_OS_DEF_EPOLL #define TAOS_OS_DEF_EPOLL
#define TAOS_EPOLL_WAIT_TIME 100 #define TAOS_EPOLL_WAIT_TIME 100
typedef SOCKET EpollFd;
#define EpollClose(pollFd) epoll_close(pollFd)
#ifndef EPOLLWAKEUP
#define EPOLLWAKEUP (1u << 29)
#endif
#define TAOS_OS_DEF_ZU #define TAOS_OS_DEF_ZU
#define PRIzu "ld" #define PRIzu "ld"
......
...@@ -51,7 +51,7 @@ int64_t taosFSendFile(FILE *out_file, FILE *in_file, int64_t *offset, int64_t co ...@@ -51,7 +51,7 @@ int64_t taosFSendFile(FILE *out_file, FILE *in_file, int64_t *offset, int64_t co
return writeLen; return writeLen;
} }
int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t* offset, int64_t count) { int64_t taosSendFile(SOCKET dfd, int32_t sfd, int64_t* offset, int64_t count) {
lseek(sfd, (int32_t)(*offset), 0); lseek(sfd, (int32_t)(*offset), 0);
int64_t writeLen = 0; int64_t writeLen = 0;
uint8_t buffer[_SEND_FILE_STEP_] = { 0 }; uint8_t buffer[_SEND_FILE_STEP_] = { 0 };
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
int taosSetSockOpt(int32_t socketfd, int level, int optname, void *optval, int optlen) { int taosSetSockOpt(SOCKET socketfd, int level, int optname, void *optval, int optlen) {
if (level == SOL_SOCKET && optname == SO_SNDBUF) { if (level == SOL_SOCKET && optname == SO_SNDBUF) {
return 0; return 0;
} }
......
...@@ -121,7 +121,7 @@ int64_t taosLSeekImp(int32_t fd, int64_t offset, int32_t whence) { ...@@ -121,7 +121,7 @@ int64_t taosLSeekImp(int32_t fd, int64_t offset, int32_t whence) {
#ifndef TAOS_OS_FUNC_FILE_SENDIFLE #ifndef TAOS_OS_FUNC_FILE_SENDIFLE
int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t *offset, int64_t size) { int64_t taosSendFile(SOCKET dfd, int32_t sfd, int64_t *offset, int64_t size) {
int64_t leftbytes = size; int64_t leftbytes = size;
int64_t sentbytes; int64_t sentbytes;
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
#ifndef TAOS_OS_FUNC_SOCKET #ifndef TAOS_OS_FUNC_SOCKET
int32_t taosSetNonblocking(int32_t sock, int32_t on) { int32_t taosSetNonblocking(SOCKET sock, int32_t on) {
int32_t flags = 0; int32_t flags = 0;
if ((flags = fcntl(sock, F_GETFL, 0)) < 0) { if ((flags = fcntl(sock, F_GETFL, 0)) < 0) {
uError("fcntl(F_GETFL) error: %d (%s)\n", errno, strerror(errno)); uError("fcntl(F_GETFL) error: %d (%s)\n", errno, strerror(errno));
...@@ -67,7 +67,7 @@ void taosSetMaskSIGPIPE() { ...@@ -67,7 +67,7 @@ void taosSetMaskSIGPIPE() {
#ifndef TAOS_OS_FUNC_SOCKET_SETSOCKETOPT #ifndef TAOS_OS_FUNC_SOCKET_SETSOCKETOPT
int32_t taosSetSockOpt(int32_t socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) { int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) {
return setsockopt(socketfd, level, optname, optval, (socklen_t)optlen); return setsockopt(socketfd, level, optname, optval, (socklen_t)optlen);
} }
......
...@@ -43,7 +43,6 @@ void osInit() { ...@@ -43,7 +43,6 @@ void osInit() {
char cmdline[1024]; char cmdline[1024];
char* taosGetCmdlineByPID(int pid) { char* taosGetCmdlineByPID(int pid) {
#if 0
sprintf(cmdline, "/proc/%d/cmdline", pid); sprintf(cmdline, "/proc/%d/cmdline", pid);
FILE* f = fopen(cmdline, "r"); FILE* f = fopen(cmdline, "r");
if (f) { if (f) {
...@@ -55,7 +54,4 @@ char* taosGetCmdlineByPID(int pid) { ...@@ -55,7 +54,4 @@ char* taosGetCmdlineByPID(int pid) {
fclose(f); fclose(f);
} }
return cmdline; return cmdline;
#else
return "";
#endif
} }
...@@ -76,7 +76,7 @@ int64_t taosFSendFile(FILE *out_file, FILE *in_file, int64_t *offset, int64_t co ...@@ -76,7 +76,7 @@ int64_t taosFSendFile(FILE *out_file, FILE *in_file, int64_t *offset, int64_t co
return writeLen; return writeLen;
} }
int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t *offset, int64_t count) { int64_t taosSendFile(SOCKET dfd, int32_t sfd, int64_t *offset, int64_t count) {
if (offset != NULL) lseek(sfd, (int32_t)(*offset), 0); if (offset != NULL) lseek(sfd, (int32_t)(*offset), 0);
int64_t writeLen = 0; int64_t writeLen = 0;
......
...@@ -34,7 +34,7 @@ void taosWinSocketInit() { ...@@ -34,7 +34,7 @@ void taosWinSocketInit() {
} }
} }
int32_t taosSetNonblocking(int32_t sock, int32_t on) { int32_t taosSetNonblocking(SOCKET sock, int32_t on) {
u_long mode; u_long mode;
if (on) { if (on) {
mode = 1; mode = 1;
...@@ -50,7 +50,7 @@ void taosIgnSIGPIPE() {} ...@@ -50,7 +50,7 @@ void taosIgnSIGPIPE() {}
void taosBlockSIGPIPE() {} void taosBlockSIGPIPE() {}
void taosSetMaskSIGPIPE() {} void taosSetMaskSIGPIPE() {}
int32_t taosSetSockOpt(int32_t socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) { int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) {
if (level == SOL_SOCKET && optname == TCP_KEEPCNT) { if (level == SOL_SOCKET && optname == TCP_KEEPCNT) {
return 0; return 0;
} }
......
...@@ -66,7 +66,7 @@ bool taosGetSysMemory(float *memoryUsedMB) { ...@@ -66,7 +66,7 @@ bool taosGetSysMemory(float *memoryUsedMB) {
bool taosGetProcMemory(float *memoryUsedMB) { bool taosGetProcMemory(float *memoryUsedMB) {
unsigned bytes_used = 0; unsigned bytes_used = 0;
#if defined(_WIN32) && defined(_MSC_VER) #if defined(_WIN64) && defined(_MSC_VER)
PROCESS_MEMORY_COUNTERS pmc; PROCESS_MEMORY_COUNTERS pmc;
HANDLE cur_proc = GetCurrentProcess(); HANDLE cur_proc = GetCurrentProcess();
......
...@@ -22,7 +22,7 @@ bool httpInitContexts(); ...@@ -22,7 +22,7 @@ bool httpInitContexts();
void httpCleanupContexts(); void httpCleanupContexts();
const char *httpContextStateStr(HttpContextState state); const char *httpContextStateStr(HttpContextState state);
HttpContext *httpCreateContext(int32_t fd); HttpContext *httpCreateContext(SOCKET fd);
bool httpInitContext(HttpContext *pContext); bool httpInitContext(HttpContext *pContext);
HttpContext *httpGetContext(void * pContext); HttpContext *httpGetContext(void * pContext);
void httpReleaseContext(HttpContext *pContext, bool clearRes); void httpReleaseContext(HttpContext *pContext, bool clearRes);
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#ifndef TDENGINE_HTTP_INT_H #ifndef TDENGINE_HTTP_INT_H
#define TDENGINE_HTTP_INT_H #define TDENGINE_HTTP_INT_H
#include "os.h"
#include <stdbool.h> #include <stdbool.h>
#include "pthread.h" #include "pthread.h"
#include "semaphore.h" #include "semaphore.h"
...@@ -140,7 +141,7 @@ typedef enum { ...@@ -140,7 +141,7 @@ typedef enum {
typedef struct HttpContext { typedef struct HttpContext {
int32_t refCount; int32_t refCount;
int32_t fd; SOCKET fd;
uint32_t accessTimes; uint32_t accessTimes;
uint32_t lastAccessTime; uint32_t lastAccessTime;
int32_t state; int32_t state;
...@@ -167,7 +168,7 @@ typedef struct HttpThread { ...@@ -167,7 +168,7 @@ typedef struct HttpThread {
HttpContext * pHead; HttpContext * pHead;
pthread_mutex_t threadMutex; pthread_mutex_t threadMutex;
bool stop; bool stop;
int32_t pollFd; EpollFd pollFd;
int32_t numOfContexts; int32_t numOfContexts;
int32_t threadId; int32_t threadId;
char label[HTTP_LABEL_SIZE]; char label[HTTP_LABEL_SIZE];
...@@ -180,7 +181,7 @@ typedef struct HttpServer { ...@@ -180,7 +181,7 @@ typedef struct HttpServer {
uint16_t serverPort; uint16_t serverPort;
int8_t stop; int8_t stop;
int8_t reserve; int8_t reserve;
int32_t fd; SOCKET fd;
int32_t numOfThreads; int32_t numOfThreads;
int32_t methodScannerLen; int32_t methodScannerLen;
int32_t requestNum; int32_t requestNum;
......
...@@ -35,14 +35,18 @@ static void httpRemoveContextFromEpoll(HttpContext *pContext) { ...@@ -35,14 +35,18 @@ static void httpRemoveContextFromEpoll(HttpContext *pContext) {
HttpThread *pThread = pContext->pThread; HttpThread *pThread = pContext->pThread;
if (pContext->fd >= 0) { if (pContext->fd >= 0) {
epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pContext->fd, NULL); epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pContext->fd, NULL);
int32_t fd = atomic_val_compare_exchange_32(&pContext->fd, pContext->fd, -1); #ifdef WINDOWS
SOCKET fd = atomic_val_compare_exchange_32(&pContext->fd, pContext->fd, -1);
#else
SOCKET fd = atomic_val_compare_exchange_64(&pContext->fd, pContext->fd, -1);
#endif
taosCloseSocket(fd); taosCloseSocket(fd);
} }
} }
static void httpDestroyContext(void *data) { static void httpDestroyContext(void *data) {
HttpContext *pContext = *(HttpContext **)data; HttpContext *pContext = *(HttpContext **)data;
if (pContext->fd > 0) taosClose(pContext->fd); if (pContext->fd > 0) taosCloseSocket(pContext->fd);
HttpThread *pThread = pContext->pThread; HttpThread *pThread = pContext->pThread;
httpRemoveContextFromEpoll(pContext); httpRemoveContextFromEpoll(pContext);
...@@ -106,7 +110,7 @@ bool httpAlterContextState(HttpContext *pContext, HttpContextState srcState, Htt ...@@ -106,7 +110,7 @@ bool httpAlterContextState(HttpContext *pContext, HttpContextState srcState, Htt
return (atomic_val_compare_exchange_32(&pContext->state, srcState, destState) == srcState); return (atomic_val_compare_exchange_32(&pContext->state, srcState, destState) == srcState);
} }
HttpContext *httpCreateContext(int32_t fd) { HttpContext *httpCreateContext(SOCKET fd) {
HttpContext *pContext = calloc(1, sizeof(HttpContext)); HttpContext *pContext = calloc(1, sizeof(HttpContext));
if (pContext == NULL) return NULL; if (pContext == NULL) return NULL;
......
...@@ -50,7 +50,7 @@ static void httpStopThread(HttpThread *pThread) { ...@@ -50,7 +50,7 @@ static void httpStopThread(HttpThread *pThread) {
taosCloseSocket(fd); taosCloseSocket(fd);
} }
taosCloseSocket(pThread->pollFd); EpollClose(pThread->pollFd);
pthread_mutex_destroy(&(pThread->threadMutex)); pthread_mutex_destroy(&(pThread->threadMutex));
} }
...@@ -152,7 +152,7 @@ static void httpProcessHttpData(void *param) { ...@@ -152,7 +152,7 @@ static void httpProcessHttpData(void *param) {
} }
static void *httpAcceptHttpConnection(void *arg) { static void *httpAcceptHttpConnection(void *arg) {
int32_t connFd = -1; SOCKET connFd = -1;
struct sockaddr_in clientAddr; struct sockaddr_in clientAddr;
int32_t threadId = 0; int32_t threadId = 0;
HttpServer * pServer = &tsHttpServer; HttpServer * pServer = &tsHttpServer;
...@@ -175,7 +175,7 @@ static void *httpAcceptHttpConnection(void *arg) { ...@@ -175,7 +175,7 @@ static void *httpAcceptHttpConnection(void *arg) {
while (1) { while (1) {
socklen_t addrlen = sizeof(clientAddr); socklen_t addrlen = sizeof(clientAddr);
connFd = (int32_t)accept(pServer->fd, (struct sockaddr *)&clientAddr, &addrlen); connFd = accept(pServer->fd, (struct sockaddr *)&clientAddr, &addrlen);
if (pServer->stop) { if (pServer->stop) {
httpDebug("http server:%s socket stop, exiting...", pServer->label); httpDebug("http server:%s socket stop, exiting...", pServer->label);
break; break;
...@@ -227,7 +227,7 @@ static void *httpAcceptHttpConnection(void *arg) { ...@@ -227,7 +227,7 @@ static void *httpAcceptHttpConnection(void *arg) {
if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) { if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) {
httpError("context:%p, fd:%d, ip:%s, thread:%s, failed to add http fd for epoll, error:%s", pContext, connFd, httpError("context:%p, fd:%d, ip:%s, thread:%s, failed to add http fd for epoll, error:%s", pContext, connFd,
pContext->ipstr, pThread->label, strerror(errno)); pContext->ipstr, pThread->label, strerror(errno));
taosClose(pContext->fd); taosCloseSocket(pContext->fd);
httpReleaseContext(pContext, true); httpReleaseContext(pContext, true);
continue; continue;
} }
...@@ -265,8 +265,8 @@ bool httpInitConnect() { ...@@ -265,8 +265,8 @@ bool httpInitConnect() {
return false; return false;
} }
pThread->pollFd = (int32_t)epoll_create(HTTP_MAX_EVENTS); // size does not matter pThread->pollFd = (EpollFd)epoll_create(HTTP_MAX_EVENTS); // size does not matter
if (pThread->pollFd < 0) { if (pThread->pollFd <= 0) {
httpError("http thread:%s, failed to create HTTP epoll", pThread->label); httpError("http thread:%s, failed to create HTTP epoll", pThread->label);
pthread_mutex_destroy(&(pThread->threadMutex)); pthread_mutex_destroy(&(pThread->threadMutex));
return false; return false;
......
...@@ -21,21 +21,14 @@ ...@@ -21,21 +21,14 @@
#include "rpcLog.h" #include "rpcLog.h"
#include "rpcHead.h" #include "rpcHead.h"
#include "rpcTcp.h" #include "rpcTcp.h"
#ifdef WINDOWS
#include "wepoll.h"
#endif
#ifndef EPOLLWAKEUP
#define EPOLLWAKEUP (1u << 29)
#endif
typedef struct SFdObj { typedef struct SFdObj {
void *signature; void *signature;
int32_t fd; // TCP socket FD SOCKET fd; // TCP socket FD
int closedByApp; // 1: already closed by App
void *thandle; // handle from upper layer, like TAOS void *thandle; // handle from upper layer, like TAOS
uint32_t ip; uint32_t ip;
uint16_t port; uint16_t port;
int16_t closedByApp; // 1: already closed by App
struct SThreadObj *pThreadObj; struct SThreadObj *pThreadObj;
struct SFdObj *prev; struct SFdObj *prev;
struct SFdObj *next; struct SFdObj *next;
...@@ -47,7 +40,7 @@ typedef struct SThreadObj { ...@@ -47,7 +40,7 @@ typedef struct SThreadObj {
pthread_mutex_t mutex; pthread_mutex_t mutex;
uint32_t ip; uint32_t ip;
bool stop; bool stop;
int32_t pollFd; EpollFd pollFd;
int numOfFds; int numOfFds;
int threadId; int threadId;
char label[TSDB_LABEL_LEN]; char label[TSDB_LABEL_LEN];
...@@ -56,7 +49,7 @@ typedef struct SThreadObj { ...@@ -56,7 +49,7 @@ typedef struct SThreadObj {
} SThreadObj; } SThreadObj;
typedef struct { typedef struct {
int32_t fd; SOCKET fd;
uint32_t ip; uint32_t ip;
uint16_t port; uint16_t port;
int8_t stop; int8_t stop;
...@@ -69,7 +62,7 @@ typedef struct { ...@@ -69,7 +62,7 @@ typedef struct {
} SServerObj; } SServerObj;
static void *taosProcessTcpData(void *param); static void *taosProcessTcpData(void *param);
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int32_t fd); static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd);
static void taosFreeFdObj(SFdObj *pFdObj); static void taosFreeFdObj(SFdObj *pFdObj);
static void taosReportBrokenLink(SFdObj *pFdObj); static void taosReportBrokenLink(SFdObj *pFdObj);
static void *taosAcceptTcpConnection(void *arg); static void *taosAcceptTcpConnection(void *arg);
...@@ -134,7 +127,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread ...@@ -134,7 +127,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
break; break;
} }
pThreadObj->pollFd = (int32_t)epoll_create(10); // size does not matter pThreadObj->pollFd = (EpollFd)epoll_create(10); // size does not matter
if (pThreadObj->pollFd < 0) { if (pThreadObj->pollFd < 0) {
tError("%s failed to create TCP epoll", label); tError("%s failed to create TCP epoll", label);
code = -1; code = -1;
...@@ -227,7 +220,7 @@ void taosCleanUpTcpServer(void *handle) { ...@@ -227,7 +220,7 @@ void taosCleanUpTcpServer(void *handle) {
} }
static void *taosAcceptTcpConnection(void *arg) { static void *taosAcceptTcpConnection(void *arg) {
int32_t connFd = -1; SOCKET connFd = -1;
struct sockaddr_in caddr; struct sockaddr_in caddr;
int threadId = 0; int threadId = 0;
SThreadObj *pThreadObj; SThreadObj *pThreadObj;
...@@ -238,7 +231,7 @@ static void *taosAcceptTcpConnection(void *arg) { ...@@ -238,7 +231,7 @@ static void *taosAcceptTcpConnection(void *arg) {
while (1) { while (1) {
socklen_t addrlen = sizeof(caddr); socklen_t addrlen = sizeof(caddr);
connFd = (int32_t)accept(pServerObj->fd, (struct sockaddr *)&caddr, &addrlen); connFd = accept(pServerObj->fd, (struct sockaddr *)&caddr, &addrlen);
if (pServerObj->stop) { if (pServerObj->stop) {
tDebug("%s TCP server stop accepting new connections", pServerObj->label); tDebug("%s TCP server stop accepting new connections", pServerObj->label);
break; break;
...@@ -306,7 +299,7 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void * ...@@ -306,7 +299,7 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *
return NULL; return NULL;
} }
pThreadObj->pollFd = (int32_t)epoll_create(10); // size does not matter pThreadObj->pollFd = (EpollFd)epoll_create(10); // size does not matter
if (pThreadObj->pollFd < 0) { if (pThreadObj->pollFd < 0) {
tError("%s failed to create TCP client epoll", label); tError("%s failed to create TCP client epoll", label);
free(pThreadObj); free(pThreadObj);
...@@ -321,7 +314,7 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void * ...@@ -321,7 +314,7 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *
int code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj)); int code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj));
pthread_attr_destroy(&thattr); pthread_attr_destroy(&thattr);
if (code != 0) { if (code != 0) {
taosCloseSocket(pThreadObj->pollFd); EpollClose(pThreadObj->pollFd);
free(pThreadObj); free(pThreadObj);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
tError("%s failed to create TCP read data thread(%s)", label, strerror(errno)); tError("%s failed to create TCP read data thread(%s)", label, strerror(errno));
...@@ -351,8 +344,8 @@ void taosCleanUpTcpClient(void *chandle) { ...@@ -351,8 +344,8 @@ void taosCleanUpTcpClient(void *chandle) {
void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) { void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) {
SThreadObj * pThreadObj = shandle; SThreadObj * pThreadObj = shandle;
int32_t fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ip); SOCKET fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ip);
if (fd < 0) return NULL; if (fd <= 0) return NULL;
struct sockaddr_in sin; struct sockaddr_in sin;
uint16_t localPort = 0; uint16_t localPort = 0;
...@@ -526,7 +519,7 @@ static void *taosProcessTcpData(void *param) { ...@@ -526,7 +519,7 @@ static void *taosProcessTcpData(void *param) {
if (pThreadObj->stop) break; if (pThreadObj->stop) break;
} }
if (pThreadObj->pollFd >=0) taosCloseSocket(pThreadObj->pollFd); if (pThreadObj->pollFd >=0) EpollClose(pThreadObj->pollFd);
while (pThreadObj->pHead) { while (pThreadObj->pHead) {
SFdObj *pFdObj = pThreadObj->pHead; SFdObj *pFdObj = pThreadObj->pHead;
...@@ -541,7 +534,7 @@ static void *taosProcessTcpData(void *param) { ...@@ -541,7 +534,7 @@ static void *taosProcessTcpData(void *param) {
return NULL; return NULL;
} }
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int32_t fd) { static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd) {
struct epoll_event event; struct epoll_event event;
SFdObj *pFdObj = (SFdObj *)calloc(sizeof(SFdObj), 1); SFdObj *pFdObj = (SFdObj *)calloc(sizeof(SFdObj), 1);
......
...@@ -31,7 +31,7 @@ ...@@ -31,7 +31,7 @@
typedef struct { typedef struct {
int index; int index;
int32_t fd; SOCKET fd;
uint16_t port; // peer port uint16_t port; // peer port
uint16_t localPort; // local port uint16_t localPort; // local port
char label[TSDB_LABEL_LEN]; // copy from udpConnSet; char label[TSDB_LABEL_LEN]; // copy from udpConnSet;
......
...@@ -82,8 +82,8 @@ typedef struct SsyncPeer { ...@@ -82,8 +82,8 @@ typedef struct SsyncPeer {
uint64_t sversion; // track the peer version in retrieve process uint64_t sversion; // track the peer version in retrieve process
uint64_t lastFileVer; // track the file version while retrieve uint64_t lastFileVer; // track the file version while retrieve
uint64_t lastWalVer; // track the wal version while retrieve uint64_t lastWalVer; // track the wal version while retrieve
int32_t syncFd; SOCKET syncFd;
int32_t peerFd; // forward FD SOCKET peerFd; // forward FD
int32_t numOfRetrieves; // number of retrieves tried int32_t numOfRetrieves; // number of retrieves tried
int32_t fileChanged; // a flag to indicate file is changed during retrieving process int32_t fileChanged; // a flag to indicate file is changed during retrieving process
int32_t refCount; int32_t refCount;
......
...@@ -27,12 +27,12 @@ typedef struct { ...@@ -27,12 +27,12 @@ typedef struct {
int32_t bufferSize; int32_t bufferSize;
void (*processBrokenLink)(int64_t handleId); void (*processBrokenLink)(int64_t handleId);
int32_t (*processIncomingMsg)(int64_t handleId, void *buffer); int32_t (*processIncomingMsg)(int64_t handleId, void *buffer);
void (*processIncomingConn)(int32_t fd, uint32_t ip); void (*processIncomingConn)(SOCKET fd, uint32_t ip);
} SPoolInfo; } SPoolInfo;
void *syncOpenTcpThreadPool(SPoolInfo *pInfo); void *syncOpenTcpThreadPool(SPoolInfo *pInfo);
void syncCloseTcpThreadPool(void *); void syncCloseTcpThreadPool(void *);
void *syncAllocateTcpConn(void *, int64_t rid, int32_t connFd); void *syncAllocateTcpConn(void *, int64_t rid, SOCKET connFd);
void syncFreeTcpConn(void *); void syncFreeTcpConn(void *);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -28,7 +28,7 @@ ...@@ -28,7 +28,7 @@
#include "syncTcp.h" #include "syncTcp.h"
static void arbSignalHandler(int32_t signum, void *sigInfo, void *context); static void arbSignalHandler(int32_t signum, void *sigInfo, void *context);
static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp); static void arbProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp);
static void arbProcessBrokenLink(int64_t rid); static void arbProcessBrokenLink(int64_t rid);
static int32_t arbProcessPeerMsg(int64_t rid, void *buffer); static int32_t arbProcessPeerMsg(int64_t rid, void *buffer);
static tsem_t tsArbSem; static tsem_t tsArbSem;
...@@ -36,7 +36,7 @@ static void * tsArbTcpPool; ...@@ -36,7 +36,7 @@ static void * tsArbTcpPool;
typedef struct { typedef struct {
char id[TSDB_EP_LEN + 24]; char id[TSDB_EP_LEN + 24];
int32_t nodeFd; SOCKET nodeFd;
void * pConn; void * pConn;
} SNodeConn; } SNodeConn;
...@@ -106,7 +106,7 @@ int32_t main(int32_t argc, char *argv[]) { ...@@ -106,7 +106,7 @@ int32_t main(int32_t argc, char *argv[]) {
return 0; return 0;
} }
static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { static void arbProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) {
char ipstr[24]; char ipstr[24];
tinet_ntoa(ipstr, sourceIp); tinet_ntoa(ipstr, sourceIp);
sDebug("peer TCP connection from ip:%s", ipstr); sDebug("peer TCP connection from ip:%s", ipstr);
......
...@@ -45,7 +45,7 @@ static void syncCheckPeerConnection(void *param, void *tmrId); ...@@ -45,7 +45,7 @@ static void syncCheckPeerConnection(void *param, void *tmrId);
static int32_t syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type, uint16_t tranId); static int32_t syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type, uint16_t tranId);
static void syncProcessBrokenLink(int64_t rid); static void syncProcessBrokenLink(int64_t rid);
static int32_t syncProcessPeerMsg(int64_t rid, void *buffer); static int32_t syncProcessPeerMsg(int64_t rid, void *buffer);
static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp); static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp);
static void syncRemovePeer(SSyncPeer *pPeer); static void syncRemovePeer(SSyncPeer *pPeer);
static void syncAddArbitrator(SSyncNode *pNode); static void syncAddArbitrator(SSyncNode *pNode);
static void syncFreeNode(void *); static void syncFreeNode(void *);
...@@ -1114,8 +1114,8 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { ...@@ -1114,8 +1114,8 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
return; return;
} }
int32_t connFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0); SOCKET connFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0);
if ((int32_t)connFd < 0) { if (connFd <= 0) {
sDebug("%s, failed to open tcp socket since %s", pPeer->id, strerror(errno)); sDebug("%s, failed to open tcp socket since %s", pPeer->id, strerror(errno));
taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, (void *)pPeer->rid, tsSyncTmrCtrl, &pPeer->timer); taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, (void *)pPeer->rid, tsSyncTmrCtrl, &pPeer->timer);
return; return;
...@@ -1179,7 +1179,7 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer) { ...@@ -1179,7 +1179,7 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer) {
} }
} }
static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) {
char ipstr[24]; char ipstr[24];
int32_t i; int32_t i;
......
...@@ -24,18 +24,10 @@ ...@@ -24,18 +24,10 @@
#include "syncInt.h" #include "syncInt.h"
#include "syncTcp.h" #include "syncTcp.h"
#ifdef WINDOWS
#include "wepoll.h"
#endif
#ifndef EPOLLWAKEUP
#define EPOLLWAKEUP (1u << 29)
#endif
typedef struct SThreadObj { typedef struct SThreadObj {
pthread_t thread; pthread_t thread;
bool stop; bool stop;
int32_t pollFd; SOCKET pollFd;
int32_t numOfFds; int32_t numOfFds;
struct SPoolObj *pPool; struct SPoolObj *pPool;
} SThreadObj; } SThreadObj;
...@@ -45,14 +37,14 @@ typedef struct SPoolObj { ...@@ -45,14 +37,14 @@ typedef struct SPoolObj {
SThreadObj **pThread; SThreadObj **pThread;
pthread_t thread; pthread_t thread;
int32_t nextId; int32_t nextId;
int32_t acceptFd; // FD for accept new connection SOCKET acceptFd; // FD for accept new connection
int8_t stop; int8_t stop;
} SPoolObj; } SPoolObj;
typedef struct { typedef struct {
SThreadObj *pThread; SThreadObj *pThread;
int64_t handleId; int64_t handleId;
int32_t fd; SOCKET fd;
int32_t closedByApp; int32_t closedByApp;
} SConnObj; } SConnObj;
...@@ -128,7 +120,7 @@ void syncCloseTcpThreadPool(void *param) { ...@@ -128,7 +120,7 @@ void syncCloseTcpThreadPool(void *param) {
tfree(pPool); tfree(pPool);
} }
void *syncAllocateTcpConn(void *param, int64_t rid, int32_t connFd) { void *syncAllocateTcpConn(void *param, int64_t rid, SOCKET connFd) {
struct epoll_event event; struct epoll_event event;
SPoolObj *pPool = param; SPoolObj *pPool = param;
...@@ -249,7 +241,7 @@ static void *syncProcessTcpData(void *param) { ...@@ -249,7 +241,7 @@ static void *syncProcessTcpData(void *param) {
sDebug("%p TCP epoll thread exits", pThread); sDebug("%p TCP epoll thread exits", pThread);
taosCloseSocket(pThread->pollFd); EpollClose(pThread->pollFd);
tfree(pThread); tfree(pThread);
tfree(buffer); tfree(buffer);
return NULL; return NULL;
...@@ -264,13 +256,13 @@ static void *syncAcceptPeerTcpConnection(void *argv) { ...@@ -264,13 +256,13 @@ static void *syncAcceptPeerTcpConnection(void *argv) {
while (1) { while (1) {
struct sockaddr_in clientAddr; struct sockaddr_in clientAddr;
socklen_t addrlen = sizeof(clientAddr); socklen_t addrlen = sizeof(clientAddr);
int32_t connFd = (int32_t)accept(pPool->acceptFd, (struct sockaddr *)&clientAddr, &addrlen); SOCKET connFd = accept(pPool->acceptFd, (struct sockaddr *)&clientAddr, &addrlen);
if (pPool->stop) { if (pPool->stop) {
sDebug("%p TCP server accept is stopped", pPool); sDebug("%p TCP server accept is stopped", pPool);
break; break;
} }
if ((int32_t)connFd < 0) { if (connFd < 0) {
if (errno == EINVAL) { if (errno == EINVAL) {
sDebug("%p TCP server accept is exiting...", pPool); sDebug("%p TCP server accept is exiting...", pPool);
break; break;
...@@ -298,7 +290,7 @@ static SThreadObj *syncGetTcpThread(SPoolObj *pPool) { ...@@ -298,7 +290,7 @@ static SThreadObj *syncGetTcpThread(SPoolObj *pPool) {
if (pThread == NULL) return NULL; if (pThread == NULL) return NULL;
pThread->pPool = pPool; pThread->pPool = pPool;
pThread->pollFd = (int32_t)epoll_create(10); // size does not matter pThread->pollFd = (EpollFd)epoll_create(10); // size does not matter
if (pThread->pollFd < 0) { if (pThread->pollFd < 0) {
tfree(pThread); tfree(pThread);
return NULL; return NULL;
......
...@@ -24,21 +24,17 @@ extern "C" { ...@@ -24,21 +24,17 @@ extern "C" {
#include "wepoll.h" #include "wepoll.h"
#endif #endif
#ifndef EPOLLWAKEUP int32_t taosReadn(SOCKET sock, char *buffer, int32_t len);
#define EPOLLWAKEUP (1u << 29) int32_t taosWriteMsg(SOCKET fd, void *ptr, int32_t nbytes);
#endif int32_t taosReadMsg(SOCKET fd, void *ptr, int32_t nbytes);
int32_t taosNonblockwrite(SOCKET fd, char *ptr, int32_t nbytes);
int32_t taosReadn(int32_t sock, char *buffer, int32_t len); int32_t taosCopyFds(SOCKET sfd, int32_t dfd, int64_t len);
int32_t taosWriteMsg(int32_t fd, void *ptr, int32_t nbytes); int32_t taosSetNonblocking(SOCKET sock, int32_t on);
int32_t taosReadMsg(int32_t fd, void *ptr, int32_t nbytes);
int32_t taosNonblockwrite(int32_t fd, char *ptr, int32_t nbytes); SOCKET taosOpenUdpSocket(uint32_t localIp, uint16_t localPort);
int32_t taosCopyFds(int32_t sfd, int32_t dfd, int64_t len); SOCKET taosOpenTcpClientSocket(uint32_t ip, uint16_t port, uint32_t localIp);
int32_t taosSetNonblocking(int32_t sock, int32_t on); SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port);
int32_t taosKeepTcpAlive(SOCKET sockFd);
int32_t taosOpenUdpSocket(uint32_t localIp, uint16_t localPort);
int32_t taosOpenTcpClientSocket(uint32_t ip, uint16_t port, uint32_t localIp);
int32_t taosOpenTcpServerSocket(uint32_t ip, uint16_t port);
int32_t taosKeepTcpAlive(int32_t sockFd);
int32_t taosGetFqdn(char *); int32_t taosGetFqdn(char *);
uint32_t taosGetIpv4FromFqdn(const char *); uint32_t taosGetIpv4FromFqdn(const char *);
......
...@@ -39,7 +39,7 @@ typedef struct { ...@@ -39,7 +39,7 @@ typedef struct {
static void *taosNetBindUdpPort(void *sarg) { static void *taosNetBindUdpPort(void *sarg) {
STestInfo *pinfo = (STestInfo *)sarg; STestInfo *pinfo = (STestInfo *)sarg;
int32_t port = pinfo->port; int32_t port = pinfo->port;
int32_t serverSocket; SOCKET serverSocket;
char buffer[BUFFER_SIZE]; char buffer[BUFFER_SIZE];
int32_t iDataNum; int32_t iDataNum;
socklen_t sin_size; socklen_t sin_size;
...@@ -48,7 +48,7 @@ static void *taosNetBindUdpPort(void *sarg) { ...@@ -48,7 +48,7 @@ static void *taosNetBindUdpPort(void *sarg) {
struct sockaddr_in server_addr; struct sockaddr_in server_addr;
struct sockaddr_in clientAddr; struct sockaddr_in clientAddr;
if ((serverSocket = (int32_t)socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { if ((serverSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
uError("failed to create UDP socket since %s", strerror(errno)); uError("failed to create UDP socket since %s", strerror(errno));
return NULL; return NULL;
} }
...@@ -106,12 +106,12 @@ static void *taosNetBindTcpPort(void *sarg) { ...@@ -106,12 +106,12 @@ static void *taosNetBindTcpPort(void *sarg) {
STestInfo *pinfo = sarg; STestInfo *pinfo = sarg;
int32_t port = pinfo->port; int32_t port = pinfo->port;
int32_t serverSocket; SOCKET serverSocket;
int32_t addr_len = sizeof(clientAddr); int32_t addr_len = sizeof(clientAddr);
int32_t client; SOCKET client;
char buffer[BUFFER_SIZE]; char buffer[BUFFER_SIZE];
if ((serverSocket = (int32_t)socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { if ((serverSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
uError("failed to create TCP socket since %s", strerror(errno)); uError("failed to create TCP socket since %s", strerror(errno));
return NULL; return NULL;
} }
...@@ -133,7 +133,6 @@ static void *taosNetBindTcpPort(void *sarg) { ...@@ -133,7 +133,6 @@ static void *taosNetBindTcpPort(void *sarg) {
return NULL; return NULL;
} }
if (taosKeepTcpAlive(serverSocket) < 0) { if (taosKeepTcpAlive(serverSocket) < 0) {
uError("failed to set tcp server keep-alive option since %s", strerror(errno)); uError("failed to set tcp server keep-alive option since %s", strerror(errno));
taosCloseSocket(serverSocket); taosCloseSocket(serverSocket);
...@@ -148,7 +147,7 @@ static void *taosNetBindTcpPort(void *sarg) { ...@@ -148,7 +147,7 @@ static void *taosNetBindTcpPort(void *sarg) {
uInfo("TCP server at port:%d is listening", port); uInfo("TCP server at port:%d is listening", port);
while (1) { while (1) {
client = (int32_t)accept(serverSocket, (struct sockaddr *)&clientAddr, (socklen_t *)&addr_len); client = accept(serverSocket, (struct sockaddr *)&clientAddr, (socklen_t *)&addr_len);
if (client < 0) { if (client < 0) {
uDebug("TCP: failed to accept at port:%d since %s", port, strerror(errno)); uDebug("TCP: failed to accept at port:%d since %s", port, strerror(errno));
continue; continue;
...@@ -178,10 +177,10 @@ static void *taosNetBindTcpPort(void *sarg) { ...@@ -178,10 +177,10 @@ static void *taosNetBindTcpPort(void *sarg) {
} }
static int32_t taosNetCheckTcpPort(STestInfo *info) { static int32_t taosNetCheckTcpPort(STestInfo *info) {
int32_t clientSocket; SOCKET clientSocket;
char buffer[BUFFER_SIZE] = {0}; char buffer[BUFFER_SIZE] = {0};
if ((clientSocket = (int32_t)socket(AF_INET, SOCK_STREAM, 0)) < 0) { if ((clientSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
uError("failed to create TCP client socket since %s", strerror(errno)); uError("failed to create TCP client socket since %s", strerror(errno));
return -1; return -1;
} }
...@@ -226,14 +225,14 @@ static int32_t taosNetCheckTcpPort(STestInfo *info) { ...@@ -226,14 +225,14 @@ static int32_t taosNetCheckTcpPort(STestInfo *info) {
} }
static int32_t taosNetCheckUdpPort(STestInfo *info) { static int32_t taosNetCheckUdpPort(STestInfo *info) {
int32_t clientSocket; SOCKET clientSocket;
char buffer[BUFFER_SIZE] = {0}; char buffer[BUFFER_SIZE] = {0};
int32_t iDataNum = 0; int32_t iDataNum = 0;
int32_t bufSize = 1024000; int32_t bufSize = 1024000;
struct sockaddr_in serverAddr; struct sockaddr_in serverAddr;
if ((clientSocket = (int32_t)socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { if ((clientSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
uError("failed to create udp client socket since %s", strerror(errno)); uError("failed to create udp client socket since %s", strerror(errno));
return -1; return -1;
} }
......
...@@ -102,7 +102,7 @@ uint32_t ip2uint(const char *const ip_addr) { ...@@ -102,7 +102,7 @@ uint32_t ip2uint(const char *const ip_addr) {
return *((uint32_t *)ip); return *((uint32_t *)ip);
} }
int32_t taosWriteMsg(int32_t fd, void *buf, int32_t nbytes) { int32_t taosWriteMsg(SOCKET fd, void *buf, int32_t nbytes) {
int32_t nleft, nwritten; int32_t nleft, nwritten;
char * ptr = (char *)buf; char * ptr = (char *)buf;
...@@ -128,7 +128,7 @@ int32_t taosWriteMsg(int32_t fd, void *buf, int32_t nbytes) { ...@@ -128,7 +128,7 @@ int32_t taosWriteMsg(int32_t fd, void *buf, int32_t nbytes) {
return (nbytes - nleft); return (nbytes - nleft);
} }
int32_t taosReadMsg(int32_t fd, void *buf, int32_t nbytes) { int32_t taosReadMsg(SOCKET fd, void *buf, int32_t nbytes) {
int32_t nleft, nread; int32_t nleft, nread;
char * ptr = (char *)buf; char * ptr = (char *)buf;
...@@ -159,7 +159,7 @@ int32_t taosReadMsg(int32_t fd, void *buf, int32_t nbytes) { ...@@ -159,7 +159,7 @@ int32_t taosReadMsg(int32_t fd, void *buf, int32_t nbytes) {
return (nbytes - nleft); return (nbytes - nleft);
} }
int32_t taosNonblockwrite(int32_t fd, char *ptr, int32_t nbytes) { int32_t taosNonblockwrite(SOCKET fd, char *ptr, int32_t nbytes) {
taosSetNonblocking(fd, 1); taosSetNonblocking(fd, 1);
int32_t nleft, nwritten, nready; int32_t nleft, nwritten, nready;
...@@ -201,7 +201,7 @@ int32_t taosNonblockwrite(int32_t fd, char *ptr, int32_t nbytes) { ...@@ -201,7 +201,7 @@ int32_t taosNonblockwrite(int32_t fd, char *ptr, int32_t nbytes) {
return (nbytes - nleft); return (nbytes - nleft);
} }
int32_t taosReadn(int32_t fd, char *ptr, int32_t nbytes) { int32_t taosReadn(SOCKET fd, char *ptr, int32_t nbytes) {
int32_t nread, nready, nleft = nbytes; int32_t nread, nready, nleft = nbytes;
fd_set fset; fd_set fset;
...@@ -239,9 +239,9 @@ int32_t taosReadn(int32_t fd, char *ptr, int32_t nbytes) { ...@@ -239,9 +239,9 @@ int32_t taosReadn(int32_t fd, char *ptr, int32_t nbytes) {
return (nbytes - nleft); return (nbytes - nleft);
} }
int32_t taosOpenUdpSocket(uint32_t ip, uint16_t port) { SOCKET taosOpenUdpSocket(uint32_t ip, uint16_t port) {
struct sockaddr_in localAddr; struct sockaddr_in localAddr;
int32_t sockFd; SOCKET sockFd;
int32_t bufSize = 1024000; int32_t bufSize = 1024000;
uDebug("open udp socket:0x%x:%hu", ip, port); uDebug("open udp socket:0x%x:%hu", ip, port);
...@@ -251,7 +251,7 @@ int32_t taosOpenUdpSocket(uint32_t ip, uint16_t port) { ...@@ -251,7 +251,7 @@ int32_t taosOpenUdpSocket(uint32_t ip, uint16_t port) {
localAddr.sin_addr.s_addr = ip; localAddr.sin_addr.s_addr = ip;
localAddr.sin_port = (uint16_t)htons(port); localAddr.sin_port = (uint16_t)htons(port);
if ((sockFd = (int32_t)socket(AF_INET, SOCK_DGRAM, 0)) <= 2) { if ((sockFd = socket(AF_INET, SOCK_DGRAM, 0)) <= 2) {
uError("failed to open udp socket: %d (%s)", errno, strerror(errno)); uError("failed to open udp socket: %d (%s)", errno, strerror(errno));
taosCloseSocketNoCheck(sockFd); taosCloseSocketNoCheck(sockFd);
return -1; return -1;
...@@ -279,13 +279,13 @@ int32_t taosOpenUdpSocket(uint32_t ip, uint16_t port) { ...@@ -279,13 +279,13 @@ int32_t taosOpenUdpSocket(uint32_t ip, uint16_t port) {
return sockFd; return sockFd;
} }
int32_t taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientIp) { SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientIp) {
int32_t sockFd = 0; SOCKET sockFd = 0;
int32_t ret; int32_t ret;
struct sockaddr_in serverAddr, clientAddr; struct sockaddr_in serverAddr, clientAddr;
int32_t bufSize = 1024 * 1024; int32_t bufSize = 1024 * 1024;
sockFd = (int32_t)socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); sockFd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
if (sockFd <= 2) { if (sockFd <= 2) {
uError("failed to open the socket: %d (%s)", errno, strerror(errno)); uError("failed to open the socket: %d (%s)", errno, strerror(errno));
...@@ -346,7 +346,7 @@ int32_t taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t cli ...@@ -346,7 +346,7 @@ int32_t taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t cli
return sockFd; return sockFd;
} }
int32_t taosKeepTcpAlive(int32_t sockFd) { int32_t taosKeepTcpAlive(SOCKET sockFd) {
int32_t alive = 1; int32_t alive = 1;
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_KEEPALIVE, (void *)&alive, sizeof(alive)) < 0) { if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_KEEPALIVE, (void *)&alive, sizeof(alive)) < 0) {
uError("fd:%d setsockopt SO_KEEPALIVE failed: %d (%s)", sockFd, errno, strerror(errno)); uError("fd:%d setsockopt SO_KEEPALIVE failed: %d (%s)", sockFd, errno, strerror(errno));
...@@ -394,9 +394,9 @@ int32_t taosKeepTcpAlive(int32_t sockFd) { ...@@ -394,9 +394,9 @@ int32_t taosKeepTcpAlive(int32_t sockFd) {
return 0; return 0;
} }
int32_t taosOpenTcpServerSocket(uint32_t ip, uint16_t port) { SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
struct sockaddr_in serverAdd; struct sockaddr_in serverAdd;
int32_t sockFd; SOCKET sockFd;
int32_t reuse; int32_t reuse;
uDebug("open tcp server socket:0x%x:%hu", ip, port); uDebug("open tcp server socket:0x%x:%hu", ip, port);
...@@ -406,7 +406,7 @@ int32_t taosOpenTcpServerSocket(uint32_t ip, uint16_t port) { ...@@ -406,7 +406,7 @@ int32_t taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
serverAdd.sin_addr.s_addr = ip; serverAdd.sin_addr.s_addr = ip;
serverAdd.sin_port = (uint16_t)htons(port); serverAdd.sin_port = (uint16_t)htons(port);
if ((sockFd = (int32_t)socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) <= 2) { if ((sockFd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) <= 2) {
uError("failed to open TCP socket: %d (%s)", errno, strerror(errno)); uError("failed to open TCP socket: %d (%s)", errno, strerror(errno));
taosCloseSocketNoCheck(sockFd); taosCloseSocketNoCheck(sockFd);
return -1; return -1;
...@@ -449,7 +449,7 @@ void tinet_ntoa(char *ipstr, uint32_t ip) { ...@@ -449,7 +449,7 @@ void tinet_ntoa(char *ipstr, uint32_t ip) {
#define COPY_SIZE 32768 #define COPY_SIZE 32768
// sendfile shall be used // sendfile shall be used
int32_t taosCopyFds(int32_t sfd, int32_t dfd, int64_t len) { int32_t taosCopyFds(SOCKET sfd, int32_t dfd, int64_t len) {
int64_t leftLen; int64_t leftLen;
int32_t readLen, writeLen; int32_t readLen, writeLen;
char temp[COPY_SIZE]; char temp[COPY_SIZE];
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册