未验证 提交 007997af 编写于 作者: S slguan 提交者: GitHub

Merge pull request #749 from taosdata/feature/lihui

[TBASE-1198]
...@@ -24,9 +24,9 @@ void *taosOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, ...@@ -24,9 +24,9 @@ void *taosOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl,
void taosCloseConnCache(void *handle); void taosCloseConnCache(void *handle);
void *taosAddConnIntoCache(void *handle, void *data, uint32_t ip, short port, char *user); void *taosAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, char *user);
void *taosGetConnFromCache(void *handle, uint32_t ip, short port, char *user); void *taosGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -226,7 +226,7 @@ void doAddGroupColumnForSubquery(SSqlCmd* pCmd, int32_t tagIndex); ...@@ -226,7 +226,7 @@ void doAddGroupColumnForSubquery(SSqlCmd* pCmd, int32_t tagIndex);
int16_t tscGetJoinTagColIndexByUid(SSqlCmd* pCmd, uint64_t uid); int16_t tscGetJoinTagColIndexByUid(SSqlCmd* pCmd, uint64_t uid);
TAOS* taos_connect_a(char* ip, char* user, char* pass, char* db, int port, void (*fp)(void*, TAOS_RES*, int), TAOS* taos_connect_a(char* ip, char* user, char* pass, char* db, uint16_t port, void (*fp)(void*, TAOS_RES*, int),
void* param, void** taos); void* param, void** taos);
void sortRemoveDuplicates(STableDataBlocks* dataBuf); void sortRemoveDuplicates(STableDataBlocks* dataBuf);
......
...@@ -342,7 +342,7 @@ typedef struct _tsc_obj { ...@@ -342,7 +342,7 @@ typedef struct _tsc_obj {
void * signature; void * signature;
void * pTimer; void * pTimer;
char mgmtIp[TSDB_USER_LEN]; char mgmtIp[TSDB_USER_LEN];
short mgmtPort; uint16_t mgmtPort;
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
char pass[TSDB_KEY_LEN]; char pass[TSDB_KEY_LEN];
char acctId[TSDB_DB_NAME_LEN]; char acctId[TSDB_DB_NAME_LEN];
......
...@@ -25,7 +25,7 @@ ...@@ -25,7 +25,7 @@
typedef struct _c_hash_t { typedef struct _c_hash_t {
uint32_t ip; uint32_t ip;
short port; uint16_t port;
struct _c_hash_t *prev; struct _c_hash_t *prev;
struct _c_hash_t *next; struct _c_hash_t *next;
void * data; void * data;
...@@ -45,14 +45,14 @@ typedef struct { ...@@ -45,14 +45,14 @@ typedef struct {
void *pTimer; void *pTimer;
} SConnCache; } SConnCache;
int taosHashConn(void *handle, uint32_t ip, short port, char *user) { int taosHashConn(void *handle, uint32_t ip, uint16_t port, char *user) {
SConnCache *pObj = (SConnCache *)handle; SConnCache *pObj = (SConnCache *)handle;
int hash = 0; int hash = 0;
// size_t user_len = strlen(user); // size_t user_len = strlen(user);
hash = ip >> 16; hash = ip >> 16;
hash += (unsigned short)(ip & 0xFFFF); hash += (unsigned short)(ip & 0xFFFF);
hash += (unsigned short)port; hash += port;
while (*user != '\0') { while (*user != '\0') {
hash += *user; hash += *user;
user++; user++;
...@@ -74,7 +74,7 @@ void taosRemoveExpiredNodes(SConnCache *pObj, SConnHash *pNode, int hash, uint64 ...@@ -74,7 +74,7 @@ void taosRemoveExpiredNodes(SConnCache *pObj, SConnHash *pNode, int hash, uint64
pNext = pNode->next; pNext = pNode->next;
pObj->total--; pObj->total--;
pObj->count[hash]--; pObj->count[hash]--;
tscTrace("%p ip:0x%x:%d:%d:%p removed, connections in cache:%d", pNode->data, pNode->ip, pNode->port, hash, pNode, tscTrace("%p ip:0x%x:%hu:%d:%p removed, connections in cache:%d", pNode->data, pNode->ip, pNode->port, hash, pNode,
pObj->count[hash]); pObj->count[hash]);
taosMemPoolFree(pObj->connHashMemPool, (char *)pNode); taosMemPoolFree(pObj->connHashMemPool, (char *)pNode);
pNode = pNext; pNode = pNext;
...@@ -86,7 +86,7 @@ void taosRemoveExpiredNodes(SConnCache *pObj, SConnHash *pNode, int hash, uint64 ...@@ -86,7 +86,7 @@ void taosRemoveExpiredNodes(SConnCache *pObj, SConnHash *pNode, int hash, uint64
pObj->connHashList[hash] = NULL; pObj->connHashList[hash] = NULL;
} }
void *taosAddConnIntoCache(void *handle, void *data, uint32_t ip, short port, char *user) { void *taosAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, char *user) {
int hash; int hash;
SConnHash * pNode; SConnHash * pNode;
SConnCache *pObj; SConnCache *pObj;
...@@ -125,7 +125,7 @@ void *taosAddConnIntoCache(void *handle, void *data, uint32_t ip, short port, ch ...@@ -125,7 +125,7 @@ void *taosAddConnIntoCache(void *handle, void *data, uint32_t ip, short port, ch
pthread_mutex_unlock(&pObj->mutex); pthread_mutex_unlock(&pObj->mutex);
tscTrace("%p ip:0x%x:%d:%d:%p added, connections in cache:%d", data, ip, port, hash, pNode, pObj->count[hash]); tscTrace("%p ip:0x%x:%hu:%d:%p added, connections in cache:%d", data, ip, port, hash, pNode, pObj->count[hash]);
return pObj; return pObj;
} }
...@@ -152,7 +152,7 @@ void taosCleanConnCache(void *handle, void *tmrId) { ...@@ -152,7 +152,7 @@ void taosCleanConnCache(void *handle, void *tmrId) {
taosTmrReset(taosCleanConnCache, pObj->keepTimer * 2, pObj, pObj->tmrCtrl, &pObj->pTimer); taosTmrReset(taosCleanConnCache, pObj->keepTimer * 2, pObj, pObj->tmrCtrl, &pObj->pTimer);
} }
void *taosGetConnFromCache(void *handle, uint32_t ip, short port, char *user) { void *taosGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user) {
int hash; int hash;
SConnHash * pNode; SConnHash * pNode;
SConnCache *pObj; SConnCache *pObj;
...@@ -201,7 +201,7 @@ void *taosGetConnFromCache(void *handle, uint32_t ip, short port, char *user) { ...@@ -201,7 +201,7 @@ void *taosGetConnFromCache(void *handle, uint32_t ip, short port, char *user) {
pthread_mutex_unlock(&pObj->mutex); pthread_mutex_unlock(&pObj->mutex);
if (pData) { if (pData) {
tscTrace("%p ip:0x%x:%d:%d:%p retrieved, connections in cache:%d", pData, ip, port, hash, pNode, pObj->count[hash]); tscTrace("%p ip:0x%x:%hu:%d:%p retrieved, connections in cache:%d", pData, ip, port, hash, pNode, pObj->count[hash]);
} }
return pData; return pData;
......
...@@ -23,7 +23,7 @@ ...@@ -23,7 +23,7 @@
void tscSaveSlowQueryFp(void *handle, void *tmrId); void tscSaveSlowQueryFp(void *handle, void *tmrId);
void *tscSlowQueryConn = NULL; void *tscSlowQueryConn = NULL;
bool tscSlowQueryConnInitialized = false; bool tscSlowQueryConnInitialized = false;
TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, int port, void (*fp)(void *, TAOS_RES *, int), TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
void *param, void **taos); void *param, void **taos);
void tscInitConnCb(void *param, TAOS_RES *result, int code) { void tscInitConnCb(void *param, TAOS_RES *result, int code) {
......
...@@ -2669,7 +2669,7 @@ int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -2669,7 +2669,7 @@ int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
return TSDB_CODE_INVALID_SQL; return TSDB_CODE_INVALID_SQL;
} }
int32_t port = strtol(portStr, NULL, 10); uint16_t port = (uint16_t)strtol(portStr, NULL, 10);
if (port <= 0 || port > 65535) { if (port <= 0 || port > 65535) {
memset(pCmd->payload, 0, tListLen(pCmd->payload)); memset(pCmd->payload, 0, tListLen(pCmd->payload));
......
...@@ -28,7 +28,7 @@ ...@@ -28,7 +28,7 @@
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h" #include "tutil.h"
TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const char *db, int port, void (*fp)(void *, TAOS_RES *, int), TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
void *param, void **taos) { void *param, void **taos) {
STscObj *pObj; STscObj *pObj;
...@@ -153,7 +153,7 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const ...@@ -153,7 +153,7 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const
return pObj; return pObj;
} }
TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, int port) { TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port) {
if (ip == NULL || (ip != NULL && (strcmp("127.0.0.1", ip) == 0 || strcasecmp("localhost", ip) == 0))) { if (ip == NULL || (ip != NULL && (strcmp("127.0.0.1", ip) == 0 || strcasecmp("localhost", ip) == 0))) {
#ifdef CLUSTER #ifdef CLUSTER
ip = tsPrivateIp; ip = tsPrivateIp;
...@@ -205,7 +205,7 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha ...@@ -205,7 +205,7 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha
return taos; return taos;
} }
TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, int port, void (*fp)(void *, TAOS_RES *, int), TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
void *param, void **taos) { void *param, void **taos) {
#ifndef CLUSTER #ifndef CLUSTER
if (ip == NULL) { if (ip == NULL) {
......
...@@ -23,8 +23,8 @@ extern "C" { ...@@ -23,8 +23,8 @@ extern "C" {
#include "taosmsg.h" #include "taosmsg.h"
#include "tsdb.h" #include "tsdb.h"
extern short tsMgmtMgmtPort; extern uint16_t tsMgmtMgmtPort;
extern short tsMgmtSyncPort; extern uint16_t tsMgmtSyncPort;
extern int sdbMaxNodes; extern int sdbMaxNodes;
extern int tsMgmtPeerHBTimer; // seconds extern int tsMgmtPeerHBTimer; // seconds
extern char sdbZone[]; extern char sdbZone[];
......
...@@ -59,7 +59,7 @@ typedef struct taosField { ...@@ -59,7 +59,7 @@ typedef struct taosField {
void taos_init(); void taos_init();
int taos_options(TSDB_OPTION option, const void *arg, ...); int taos_options(TSDB_OPTION option, const void *arg, ...);
TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, int port); TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port);
void taos_close(TAOS *taos); void taos_close(TAOS *taos);
typedef struct TAOS_BIND { typedef struct TAOS_BIND {
......
...@@ -222,7 +222,7 @@ typedef struct { ...@@ -222,7 +222,7 @@ typedef struct {
// internal part // internal part
uint32_t destId; uint32_t destId;
char meterId[TSDB_UNI_LEN]; char meterId[TSDB_UNI_LEN];
short port; // for UDP only uint16_t port; // for UDP only
char empty[1]; char empty[1];
char msgType; char msgType;
int32_t msgLen; int32_t msgLen;
......
...@@ -57,12 +57,12 @@ extern char scriptDir[]; ...@@ -57,12 +57,12 @@ extern char scriptDir[];
extern char tsMasterIp[]; extern char tsMasterIp[];
extern char tsSecondIp[]; extern char tsSecondIp[];
extern short tsMgmtVnodePort; extern uint16_t tsMgmtVnodePort;
extern short tsMgmtShellPort; extern uint16_t tsMgmtShellPort;
extern short tsVnodeShellPort; extern uint16_t tsVnodeShellPort;
extern short tsVnodeVnodePort; extern uint16_t tsVnodeVnodePort;
extern short tsMgmtMgmtPort; extern uint16_t tsMgmtMgmtPort;
extern short tsMgmtSyncPort; extern uint16_t tsMgmtSyncPort;
extern int tsStatusInterval; extern int tsStatusInterval;
extern int tsShellActivityTimer; extern int tsShellActivityTimer;
...@@ -141,7 +141,7 @@ extern int tsProjectExecInterval; ...@@ -141,7 +141,7 @@ extern int tsProjectExecInterval;
extern int64_t tsMaxRetentWindow; extern int64_t tsMaxRetentWindow;
extern char tsHttpIp[]; extern char tsHttpIp[];
extern short tsHttpPort; extern uint16_t tsHttpPort;
extern int tsHttpCacheSessions; extern int tsHttpCacheSessions;
extern int tsHttpSessionExpire; extern int tsHttpSessionExpire;
extern int tsHttpMaxThreads; extern int tsHttpMaxThreads;
......
...@@ -47,7 +47,7 @@ extern "C" { ...@@ -47,7 +47,7 @@ extern "C" {
typedef struct { typedef struct {
char *localIp; // local IP used char *localIp; // local IP used
short localPort; // local port uint16_t localPort; // local port
char *label; // for debug purpose char *label; // for debug purpose
int numOfThreads; // number of threads to handle connections int numOfThreads; // number of threads to handle connections
void *(*fp)(char *, void *, void *); // function to process the incoming msg void *(*fp)(char *, void *, void *); // function to process the incoming msg
...@@ -72,7 +72,7 @@ typedef struct { ...@@ -72,7 +72,7 @@ typedef struct {
void * shandle; // pointer returned by taosOpenRpc void * shandle; // pointer returned by taosOpenRpc
void * ahandle; // handle provided by app void * ahandle; // handle provided by app
char * peerIp; // peer IP string char * peerIp; // peer IP string
short peerPort; // peer port uint16_t peerPort; // peer port
char spi; // security parameter index char spi; // security parameter index
char encrypt; // encrypt algorithm char encrypt; // encrypt algorithm
char * secret; // key for authentication char * secret; // key for authentication
...@@ -107,7 +107,7 @@ int taosSendSimpleRsp(void *thandle, char rsptype, char code); ...@@ -107,7 +107,7 @@ int taosSendSimpleRsp(void *thandle, char rsptype, char code);
int taosSetSecurityInfo(int cid, int sid, char *id, int spi, int encrypt, char *secret, char *ckey); int taosSetSecurityInfo(int cid, int sid, char *id, int spi, int encrypt, char *secret, char *ckey);
void taosGetRpcConnInfo(void *thandle, uint32_t *peerId, uint32_t *peerIp, short *peerPort, int *cid, int *sid); void taosGetRpcConnInfo(void *thandle, uint32_t *peerId, uint32_t *peerIp, uint16_t *peerPort, int *cid, int *sid);
int taosGetOutType(void *thandle); int taosGetOutType(void *thandle);
......
...@@ -33,19 +33,19 @@ int taosWriteMsg(int fd, void *ptr, int nbytes); ...@@ -33,19 +33,19 @@ int taosWriteMsg(int fd, void *ptr, int nbytes);
int taosReadMsg(int fd, void *ptr, int nbytes); int taosReadMsg(int fd, void *ptr, int nbytes);
int taosOpenUdpSocket(char *ip, short port); int taosOpenUdpSocket(char *ip, uint16_t port);
int taosOpenTcpClientSocket(char *ip, short port, char *localIp); int taosOpenTcpClientSocket(char *ip, uint16_t port, char *localIp);
int taosOpenTcpServerSocket(char *ip, short port); int taosOpenTcpServerSocket(char *ip, uint16_t port);
int taosKeepTcpAlive(int sockFd); int taosKeepTcpAlive(int sockFd);
void taosCloseTcpSocket(int sockFd); void taosCloseTcpSocket(int sockFd);
int taosOpenUDServerSocket(char *ip, short port); int taosOpenUDServerSocket(char *ip, uint16_t port);
int taosOpenUDClientSocket(char *ip, short port); int taosOpenUDClientSocket(char *ip, uint16_t port);
int taosOpenRawSocket(char *ip); int taosOpenRawSocket(char *ip);
......
...@@ -67,7 +67,7 @@ static struct argp_option options[] = { ...@@ -67,7 +67,7 @@ static struct argp_option options[] = {
/* Used by main to communicate with parse_opt. */ /* Used by main to communicate with parse_opt. */
struct arguments { struct arguments {
char *host; char *host;
int port; uint16_t port;
char *user; char *user;
char *password; char *password;
char *database; char *database;
...@@ -310,7 +310,7 @@ int main(int argc, char *argv[]) { ...@@ -310,7 +310,7 @@ int main(int argc, char *argv[]) {
enum MODE query_mode = arguments.mode; enum MODE query_mode = arguments.mode;
char *ip_addr = arguments.host; char *ip_addr = arguments.host;
int port = arguments.port; uint16_t port = arguments.port;
char *user = arguments.user; char *user = arguments.user;
char *pass = arguments.password; char *pass = arguments.password;
char *db_name = arguments.database; char *db_name = arguments.database;
...@@ -343,7 +343,7 @@ int main(int argc, char *argv[]) { ...@@ -343,7 +343,7 @@ int main(int argc, char *argv[]) {
struct tm tm = *localtime(&tTime); struct tm tm = *localtime(&tTime);
fprintf(fp, "###################################################################\n"); fprintf(fp, "###################################################################\n");
fprintf(fp, "# Server IP: %s:%d\n", ip_addr == NULL ? "localhost" : ip_addr, port); fprintf(fp, "# Server IP: %s:%hu\n", ip_addr == NULL ? "localhost" : ip_addr, port);
fprintf(fp, "# User: %s\n", user); fprintf(fp, "# User: %s\n", user);
fprintf(fp, "# Password: %s\n", pass); fprintf(fp, "# Password: %s\n", pass);
fprintf(fp, "# Use metric: %s\n", use_metric ? "true" : "false"); fprintf(fp, "# Use metric: %s\n", use_metric ? "true" : "false");
......
...@@ -172,7 +172,7 @@ struct arguments { ...@@ -172,7 +172,7 @@ struct arguments {
char *host; char *host;
char *user; char *user;
char *password; char *password;
int port; uint16_t port;
// output file // output file
char output[TSDB_FILENAME_LEN + 1]; char output[TSDB_FILENAME_LEN + 1];
char input[TSDB_FILENAME_LEN + 1]; char input[TSDB_FILENAME_LEN + 1];
......
...@@ -210,7 +210,7 @@ typedef struct HttpThread { ...@@ -210,7 +210,7 @@ typedef struct HttpThread {
typedef struct _http_server_obj_ { typedef struct _http_server_obj_ {
char label[HTTP_LABEL_SIZE]; char label[HTTP_LABEL_SIZE];
char serverIp[16]; char serverIp[16];
short serverPort; uint16_t serverPort;
int cacheContext; int cacheContext;
int sessionExpire; int sessionExpire;
int numOfThreads; int numOfThreads;
...@@ -233,7 +233,7 @@ bool httpCheckUsedbSql(char *sql); ...@@ -233,7 +233,7 @@ bool httpCheckUsedbSql(char *sql);
void httpTimeToString(time_t t, char *buf, int buflen); void httpTimeToString(time_t t, char *buf, int buflen);
// http init method // http init method
void *httpInitServer(char *ip, short port, char *label, int numOfThreads, void *fp, void *shandle); void *httpInitServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle);
void httpCleanUpServer(HttpServer *pServer); void httpCleanUpServer(HttpServer *pServer);
// http server connection // http server connection
......
...@@ -25,7 +25,7 @@ ...@@ -25,7 +25,7 @@
#include "taos.h" #include "taos.h"
#include "tsclient.h" #include "tsclient.h"
void *taos_connect_a(char *ip, char *user, char *pass, char *db, int port, void (*fp)(void *, TAOS_RES *, int), void *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
void *param, void **taos); void *param, void **taos);
void httpProcessMultiSql(HttpContext *pContext); void httpProcessMultiSql(HttpContext *pContext);
void taosNotePrint(const char * const format, ...); void taosNotePrint(const char * const format, ...);
......
...@@ -61,7 +61,7 @@ typedef struct { ...@@ -61,7 +61,7 @@ typedef struct {
MonitorConn *monitor = NULL; MonitorConn *monitor = NULL;
TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, int port, void (*fp)(void *, TAOS_RES *, int), TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
void *param, void **taos); void *param, void **taos);
void monitorInitConn(void *para, void *unused); void monitorInitConn(void *para, void *unused);
void monitorInitConnCb(void *param, TAOS_RES *result, int code); void monitorInitConnCb(void *param, TAOS_RES *result, int code);
......
...@@ -170,12 +170,12 @@ int taosSetSockOpt(int socketfd, int level, int optname, void *optval, int optle ...@@ -170,12 +170,12 @@ int taosSetSockOpt(int socketfd, int level, int optname, void *optval, int optle
return setsockopt(socketfd, level, optname, optval, (socklen_t)optlen); return setsockopt(socketfd, level, optname, optval, (socklen_t)optlen);
} }
int taosOpenUDClientSocket(char *ip, short port) { int taosOpenUDClientSocket(char *ip, uint16_t port) {
int sockFd = 0; int sockFd = 0;
struct sockaddr_un serverAddr; struct sockaddr_un serverAddr;
int ret; int ret;
char name[128]; char name[128];
sprintf(name, "%s.%d", ip, port); sprintf(name, "%s.%hu", ip, port);
sockFd = socket(AF_UNIX, SOCK_STREAM, 0); sockFd = socket(AF_UNIX, SOCK_STREAM, 0);
...@@ -198,13 +198,13 @@ int taosOpenUDClientSocket(char *ip, short port) { ...@@ -198,13 +198,13 @@ int taosOpenUDClientSocket(char *ip, short port) {
return sockFd; return sockFd;
} }
int taosOpenUDServerSocket(char *ip, short port) { int taosOpenUDServerSocket(char *ip, uint16_t port) {
struct sockaddr_un serverAdd; struct sockaddr_un serverAdd;
int sockFd; int sockFd;
char name[128]; char name[128];
pTrace("open ud socket:%s", name); pTrace("open ud socket:%s", name);
sprintf(name, "%s.%d", ip, port); sprintf(name, "%s.%hu", ip, port);
bzero((char *)&serverAdd, sizeof(serverAdd)); bzero((char *)&serverAdd, sizeof(serverAdd));
serverAdd.sun_family = AF_UNIX; serverAdd.sun_family = AF_UNIX;
...@@ -295,7 +295,7 @@ void taosGetSystemInfo() { ...@@ -295,7 +295,7 @@ void taosGetSystemInfo() {
taosGetSystemLocale(); taosGetSystemLocale();
} }
void *taosInitTcpClient(char *ip, short port, char *flabel, int num, void *fp, void *shandle) { void *taosInitTcpClient(char *ip, uint16_t port, char *flabel, int num, void *fp, void *shandle) {
tError("function taosInitTcpClient is not implemented in darwin system, exit!"); tError("function taosInitTcpClient is not implemented in darwin system, exit!");
exit(0); exit(0);
} }
...@@ -305,12 +305,12 @@ void taosCloseTcpClientConnection(void *chandle) { ...@@ -305,12 +305,12 @@ void taosCloseTcpClientConnection(void *chandle) {
exit(0); exit(0);
} }
void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, short port) { void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16_t port) {
tError("function taosOpenTcpClientConnection is not implemented in darwin system, exit!"); tError("function taosOpenTcpClientConnection is not implemented in darwin system, exit!");
exit(0); exit(0);
} }
int taosSendTcpClientData(unsigned int ip, short port, char *data, int len, void *chandle) { int taosSendTcpClientData(unsigned int ip, uint16_t port, char *data, int len, void *chandle) {
tError("function taosSendTcpClientData is not implemented in darwin system, exit!"); tError("function taosSendTcpClientData is not implemented in darwin system, exit!");
exit(0); exit(0);
} }
...@@ -330,12 +330,12 @@ void taosCleanUpTcpServer(void *handle) { ...@@ -330,12 +330,12 @@ void taosCleanUpTcpServer(void *handle) {
exit(0); exit(0);
} }
void *taosInitTcpServer(char *ip, short port, char *label, int numOfThreads, void *fp, void *shandle) { void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) {
tError("function taosInitTcpServer is not implemented in darwin system, exit!"); tError("function taosInitTcpServer is not implemented in darwin system, exit!");
exit(0); exit(0);
} }
int taosSendTcpServerData(unsigned int ip, short port, char *data, int len, void *chandle) { int taosSendTcpServerData(unsigned int ip, uint16_t port, char *data, int len, void *chandle) {
tError("function taosSendTcpServerData is not implemented in darwin system, exit!"); tError("function taosSendTcpServerData is not implemented in darwin system, exit!");
exit(0); exit(0);
} }
......
...@@ -163,12 +163,12 @@ int taosSetSockOpt(int socketfd, int level, int optname, void *optval, int optle ...@@ -163,12 +163,12 @@ int taosSetSockOpt(int socketfd, int level, int optname, void *optval, int optle
return setsockopt(socketfd, level, optname, optval, (socklen_t)optlen); return setsockopt(socketfd, level, optname, optval, (socklen_t)optlen);
} }
int taosOpenUDClientSocket(char *ip, short port) { int taosOpenUDClientSocket(char *ip, uint16_t port) {
int sockFd = 0; int sockFd = 0;
struct sockaddr_un serverAddr; struct sockaddr_un serverAddr;
int ret; int ret;
char name[128]; char name[128];
sprintf(name, "%s.%d", ip, port); sprintf(name, "%s.%hu", ip, port);
sockFd = socket(AF_UNIX, SOCK_STREAM, 0); sockFd = socket(AF_UNIX, SOCK_STREAM, 0);
...@@ -191,13 +191,13 @@ int taosOpenUDClientSocket(char *ip, short port) { ...@@ -191,13 +191,13 @@ int taosOpenUDClientSocket(char *ip, short port) {
return sockFd; return sockFd;
} }
int taosOpenUDServerSocket(char *ip, short port) { int taosOpenUDServerSocket(char *ip, uint16_t port) {
struct sockaddr_un serverAdd; struct sockaddr_un serverAdd;
int sockFd; int sockFd;
char name[128]; char name[128];
pTrace("open ud socket:%s", name); pTrace("open ud socket:%s", name);
sprintf(name, "%s.%d", ip, port); sprintf(name, "%s.%hu", ip, port);
bzero((char *)&serverAdd, sizeof(serverAdd)); bzero((char *)&serverAdd, sizeof(serverAdd));
serverAdd.sun_family = AF_UNIX; serverAdd.sun_family = AF_UNIX;
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#include "tlog.h" #include "tlog.h"
void *taosInitTcpClient(char *ip, short port, char *label, int num, void *fp, void *shandle) { void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, void *shandle) {
tError("InitTcpClient not support in windows"); tError("InitTcpClient not support in windows");
return 0; return 0;
} }
...@@ -24,12 +24,12 @@ void taosCloseTcpClientConnection(void *chandle) { ...@@ -24,12 +24,12 @@ void taosCloseTcpClientConnection(void *chandle) {
tError("CloseTcpClientConnection not support in windows"); tError("CloseTcpClientConnection not support in windows");
} }
void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, short port) { void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16_t port) {
tError("OpenTcpClientConnection not support in windows"); tError("OpenTcpClientConnection not support in windows");
return 0; return 0;
} }
int taosSendTcpClientData(unsigned int ip, short port, char *data, int len, void *chandle) { int taosSendTcpClientData(unsigned int ip, uint16_t port, char *data, int len, void *chandle) {
tError("SendTcpClientData not support in windows"); tError("SendTcpClientData not support in windows");
return 0; return 0;
} }
......
...@@ -23,12 +23,12 @@ void taosCleanUpTcpServer(void *handle) { ...@@ -23,12 +23,12 @@ void taosCleanUpTcpServer(void *handle) {
tError("CleanUpTcpServer not support in windows"); tError("CleanUpTcpServer not support in windows");
} }
void *taosInitTcpServer(char *ip, short port, char *label, int numOfThreads, void *fp, void *shandle) { void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) {
tError("InitTcpServer not support in windows"); tError("InitTcpServer not support in windows");
return 0; return 0;
} }
int taosSendTcpServerData(unsigned int ip, short port, char *data, int len, void *chandle) { int taosSendTcpServerData(unsigned int ip, uint16_t port, char *data, int len, void *chandle) {
tError("SendTcpServerData not support in windows"); tError("SendTcpServerData not support in windows");
return 0; return 0;
} }
...@@ -18,8 +18,8 @@ ...@@ -18,8 +18,8 @@
void *taosOpenIpHash(int maxSessions); void *taosOpenIpHash(int maxSessions);
void taosCloseIpHash(void *handle); void taosCloseIpHash(void *handle);
void *taosAddIpHash(void *handle, void *pData, uint32_t ip, short port); void *taosAddIpHash(void *handle, void *pData, uint32_t ip, uint16_t port);
void taosDeleteIpHash(void *handle, uint32_t ip, short port); void taosDeleteIpHash(void *handle, uint32_t ip, uint16_t port);
void *taosGetIpHash(void *handle, uint32_t ip, short port); void *taosGetIpHash(void *handle, uint32_t ip, uint16_t port);
#endif #endif
...@@ -18,10 +18,10 @@ ...@@ -18,10 +18,10 @@
#include "tsdb.h" #include "tsdb.h"
void *taosInitTcpClient(char *ip, short port, char *label, int num, void *fp, void *shandle); void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, void *shandle);
void taosCleanUpTcpClient(void *chandle); void taosCleanUpTcpClient(void *chandle);
void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, short port); void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16_t port);
void taosCloseTcpClientConnection(void *chandle); void taosCloseTcpClientConnection(void *chandle);
int taosSendTcpClientData(uint32_t ip, short port, char *data, int len, void *chandle); int taosSendTcpClientData(uint32_t ip, uint16_t port, char *data, int len, void *chandle);
#endif #endif
...@@ -18,9 +18,9 @@ ...@@ -18,9 +18,9 @@
#include "tsdb.h" #include "tsdb.h"
void *taosInitTcpServer(char *ip, short port, char *label, int numOfThreads, void *fp, void *shandle); void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle);
void taosCleanUpTcpServer(void *param); void taosCleanUpTcpServer(void *param);
void taosCloseTcpServerConnection(void *param); void taosCloseTcpServerConnection(void *param);
int taosSendTcpServerData(uint32_t ip, short port, char *data, int len, void *chandle); int taosSendTcpServerData(uint32_t ip, uint16_t port, char *data, int len, void *chandle);
#endif #endif
...@@ -18,11 +18,11 @@ ...@@ -18,11 +18,11 @@
#include "tsdb.h" #include "tsdb.h"
void *taosInitUdpServer(char *ip, short port, char *label, int, void *fp, void *shandle); void *taosInitUdpServer(char *ip, uint16_t port, char *label, int, void *fp, void *shandle);
void *taosInitUdpClient(char *ip, short port, char *label, int, void *fp, void *shandle); void *taosInitUdpClient(char *ip, uint16_t port, char *label, int, void *fp, void *shandle);
void taosCleanUpUdpConnection(void *handle); void taosCleanUpUdpConnection(void *handle);
int taosSendUdpData(uint32_t ip, short port, char *data, int dataLen, void *chandle); int taosSendUdpData(uint32_t ip, uint16_t port, char *data, int dataLen, void *chandle);
void *taosOpenUdpConnection(void *shandle, void *thandle, char *ip, short port); void *taosOpenUdpConnection(void *shandle, void *thandle, char *ip, uint16_t port);
void taosFreeMsgHdr(void *hdr); void taosFreeMsgHdr(void *hdr);
int taosMsgHdrSize(void *hdr); int taosMsgHdrSize(void *hdr);
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
typedef struct _ip_hash_t { typedef struct _ip_hash_t {
uint32_t ip; uint32_t ip;
short port; uint16_t port;
int hash; int hash;
struct _ip_hash_t *prev; struct _ip_hash_t *prev;
struct _ip_hash_t *next; struct _ip_hash_t *next;
...@@ -32,20 +32,20 @@ typedef struct { ...@@ -32,20 +32,20 @@ typedef struct {
int maxSessions; int maxSessions;
} SHashObj; } SHashObj;
int taosHashIp(void *handle, uint32_t ip, short port) { int taosHashIp(void *handle, uint32_t ip, uint16_t port) {
SHashObj *pObj = (SHashObj *)handle; SHashObj *pObj = (SHashObj *)handle;
int hash = 0; int hash = 0;
hash = (int)(ip >> 16); hash = (int)(ip >> 16);
hash += (unsigned short)(ip & 0xFFFF); hash += (unsigned short)(ip & 0xFFFF);
hash += (unsigned short)port; hash += port;
hash = hash % pObj->maxSessions; hash = hash % pObj->maxSessions;
return hash; return hash;
} }
void *taosAddIpHash(void *handle, void *data, uint32_t ip, short port) { void *taosAddIpHash(void *handle, void *data, uint32_t ip, uint16_t port) {
int hash; int hash;
SIpHash * pNode; SIpHash * pNode;
SHashObj *pObj; SHashObj *pObj;
...@@ -68,7 +68,7 @@ void *taosAddIpHash(void *handle, void *data, uint32_t ip, short port) { ...@@ -68,7 +68,7 @@ void *taosAddIpHash(void *handle, void *data, uint32_t ip, short port) {
return pObj; return pObj;
} }
void taosDeleteIpHash(void *handle, uint32_t ip, short port) { void taosDeleteIpHash(void *handle, uint32_t ip, uint16_t port) {
int hash; int hash;
SIpHash * pNode; SIpHash * pNode;
SHashObj *pObj; SHashObj *pObj;
...@@ -100,7 +100,7 @@ void taosDeleteIpHash(void *handle, uint32_t ip, short port) { ...@@ -100,7 +100,7 @@ void taosDeleteIpHash(void *handle, uint32_t ip, short port) {
} }
} }
void *taosGetIpHash(void *handle, uint32_t ip, short port) { void *taosGetIpHash(void *handle, uint32_t ip, uint16_t port) {
int hash; int hash;
SIpHash * pNode; SIpHash * pNode;
SHashObj *pObj; SHashObj *pObj;
......
...@@ -51,10 +51,10 @@ typedef struct { ...@@ -51,10 +51,10 @@ typedef struct {
uint8_t secret[TSDB_KEY_LEN]; uint8_t secret[TSDB_KEY_LEN];
uint8_t ckey[TSDB_KEY_LEN]; uint8_t ckey[TSDB_KEY_LEN];
short localPort; // for UDP only uint16_t localPort; // for UDP only
uint32_t peerUid; uint32_t peerUid;
uint32_t peerIp; // peer IP uint32_t peerIp; // peer IP
short peerPort; // peer port uint16_t peerPort; // peer port
char peerIpstr[20]; // peer IP string char peerIpstr[20]; // peer IP string
uint16_t tranId; // outgoing transcation ID, for build message uint16_t tranId; // outgoing transcation ID, for build message
uint16_t outTranId; // outgoing transcation ID uint16_t outTranId; // outgoing transcation ID
...@@ -99,7 +99,7 @@ typedef struct rpc_server { ...@@ -99,7 +99,7 @@ typedef struct rpc_server {
int idleTime; // milliseconds; int idleTime; // milliseconds;
int noFree; // do not free the request msg when rsp is received int noFree; // do not free the request msg when rsp is received
int index; // for UDP server, next thread for new connection int index; // for UDP server, next thread for new connection
short localPort; uint16_t localPort;
char label[12]; char label[12];
void *(*fp)(char *, void *ahandle, void *thandle); void *(*fp)(char *, void *ahandle, void *thandle);
void (*efp)(int); // FP to report error void (*efp)(int); // FP to report error
...@@ -114,16 +114,16 @@ int tsRpcProgressTime = 10; // milliseocnds ...@@ -114,16 +114,16 @@ int tsRpcProgressTime = 10; // milliseocnds
int tsRpcMaxRetry; int tsRpcMaxRetry;
int tsRpcHeadSize; int tsRpcHeadSize;
void *(*taosInitConn[])(char *ip, short port, char *label, int threads, void *fp, void *shandle) = { void *(*taosInitConn[])(char *ip, uint16_t port, char *label, int threads, void *fp, void *shandle) = {
taosInitUdpServer, taosInitUdpClient, taosInitTcpServer, taosInitTcpClient}; taosInitUdpServer, taosInitUdpClient, taosInitTcpServer, taosInitTcpClient};
void (*taosCleanUpConn[])(void *thandle) = {taosCleanUpUdpConnection, taosCleanUpUdpConnection, taosCleanUpTcpServer, void (*taosCleanUpConn[])(void *thandle) = {taosCleanUpUdpConnection, taosCleanUpUdpConnection, taosCleanUpTcpServer,
taosCleanUpTcpClient}; taosCleanUpTcpClient};
int (*taosSendData[])(uint32_t ip, short port, char *data, int len, void *chandle) = { int (*taosSendData[])(uint32_t ip, uint16_t port, char *data, int len, void *chandle) = {
taosSendUdpData, taosSendUdpData, taosSendTcpServerData, taosSendTcpClientData}; taosSendUdpData, taosSendUdpData, taosSendTcpServerData, taosSendTcpClientData};
void *(*taosOpenConn[])(void *shandle, void *thandle, char *ip, short port) = { void *(*taosOpenConn[])(void *shandle, void *thandle, char *ip, uint16_t port) = {
taosOpenUdpConnection, taosOpenUdpConnection,
taosOpenUdpConnection, taosOpenUdpConnection,
NULL, NULL,
...@@ -134,7 +134,7 @@ void (*taosCloseConn[])(void *chandle) = {NULL, NULL, taosCloseTcpServerConnecti ...@@ -134,7 +134,7 @@ void (*taosCloseConn[])(void *chandle) = {NULL, NULL, taosCloseTcpServerConnecti
int taosReSendRspToPeer(SRpcConn *pConn); int taosReSendRspToPeer(SRpcConn *pConn);
void taosProcessTaosTimer(void *, void *); void taosProcessTaosTimer(void *, void *);
void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, short port, void *shandle, void *thandle, void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *thandle,
void *chandle); void *chandle);
int taosSendDataToPeer(SRpcConn *pConn, char *data, int dataLen); int taosSendDataToPeer(SRpcConn *pConn, char *data, int dataLen);
void taosProcessSchedMsg(SSchedMsg *pMsg); void taosProcessSchedMsg(SSchedMsg *pMsg);
...@@ -720,7 +720,7 @@ void taosProcessResponse(SRpcConn *pConn) { ...@@ -720,7 +720,7 @@ void taosProcessResponse(SRpcConn *pConn) {
} }
int taosProcessMsgHeader(STaosHeader *pHeader, SRpcConn **ppConn, STaosRpc *pServer, int dataLen, uint32_t ip, int taosProcessMsgHeader(STaosHeader *pHeader, SRpcConn **ppConn, STaosRpc *pServer, int dataLen, uint32_t ip,
short port, void *chandle) { uint16_t port, void *chandle) {
int chann, sid, code = 0; int chann, sid, code = 0;
SRpcConn * pConn = NULL; SRpcConn * pConn = NULL;
SRpcChann *pChann; SRpcChann *pChann;
...@@ -1009,7 +1009,7 @@ void taosProcessIdleTimer(void *param, void *tmrId) { ...@@ -1009,7 +1009,7 @@ void taosProcessIdleTimer(void *param, void *tmrId) {
pthread_mutex_unlock(&pChann->mutex); pthread_mutex_unlock(&pChann->mutex);
} }
void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, short port, void *shandle, void *thandle, void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *thandle,
void *chandle) { void *chandle) {
STaosHeader *pHeader; STaosHeader *pHeader;
uint8_t code; uint8_t code;
...@@ -1075,6 +1075,7 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, short port, ...@@ -1075,6 +1075,7 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, short port,
// parsing error // parsing error
if (pHeader->msgType & 1) { if (pHeader->msgType & 1) {
memset(pReply, 0, sizeof(pReply));
msgLen = taosBuildErrorMsgToPeer(data, code, pReply); msgLen = taosBuildErrorMsgToPeer(data, code, pReply);
(*taosSendData[pServer->type])(ip, port, pReply, msgLen, chandle); (*taosSendData[pServer->type])(ip, port, pReply, msgLen, chandle);
tTrace("%s cid:%d sid:%d id:%s, %s is sent with error code:%u pConn:%p", pServer->label, chann, sid, tTrace("%s cid:%d sid:%d id:%s, %s is sent with error code:%u pConn:%p", pServer->label, chann, sid,
...@@ -1311,7 +1312,7 @@ void taosProcessTaosTimer(void *param, void *tmrId) { ...@@ -1311,7 +1312,7 @@ void taosProcessTaosTimer(void *param, void *tmrId) {
} }
void taosGetRpcConnInfo(void *thandle, uint32_t *peerId, uint32_t *peerIp, short *peerPort, int *cid, int *sid) { void taosGetRpcConnInfo(void *thandle, uint32_t *peerId, uint32_t *peerIp, uint16_t *peerPort, int *cid, int *sid) {
SRpcConn *pConn = (SRpcConn *)thandle; SRpcConn *pConn = (SRpcConn *)thandle;
*peerId = pConn->peerId; *peerId = pConn->peerId;
......
...@@ -30,7 +30,7 @@ typedef struct _tcp_fd { ...@@ -30,7 +30,7 @@ typedef struct _tcp_fd {
void * thandle; void * thandle;
uint32_t ip; uint32_t ip;
char ipstr[20]; char ipstr[20];
short port; uint16_t port;
struct _tcp_client *pTcp; struct _tcp_client *pTcp;
struct _tcp_fd * prev, *next; struct _tcp_fd * prev, *next;
} STcpFd; } STcpFd;
...@@ -45,7 +45,7 @@ typedef struct _tcp_client { ...@@ -45,7 +45,7 @@ typedef struct _tcp_client {
char label[12]; char label[12];
char ipstr[20]; char ipstr[20];
void * shandle; // handle passed by upper layer during server initialization void * shandle; // handle passed by upper layer during server initialization
void *(*processData)(char *data, int dataLen, unsigned int ip, short port, void *shandle, void *thandle, void *(*processData)(char *data, int dataLen, unsigned int ip, uint16_t port, void *shandle, void *thandle,
void *chandle); void *chandle);
// char buffer[128000]; // char buffer[128000];
} STcpClient; } STcpClient;
...@@ -194,7 +194,7 @@ static void *taosReadTcpData(void *param) { ...@@ -194,7 +194,7 @@ static void *taosReadTcpData(void *param) {
return NULL; return NULL;
} }
void *taosInitTcpClient(char *ip, short port, char *label, int num, void *fp, void *shandle) { void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, void *shandle) {
STcpClient * pTcp; STcpClient * pTcp;
pthread_attr_t thattr; pthread_attr_t thattr;
...@@ -229,7 +229,7 @@ void *taosInitTcpClient(char *ip, short port, char *label, int num, void *fp, vo ...@@ -229,7 +229,7 @@ void *taosInitTcpClient(char *ip, short port, char *label, int num, void *fp, vo
return NULL; return NULL;
} }
tTrace("%s TCP client is initialized, ip:%s port:%u", label, ip, port); tTrace("%s TCP client is initialized, ip:%s port:%hu", label, ip, port);
return pTcp; return pTcp;
} }
...@@ -242,7 +242,7 @@ void taosCloseTcpClientConnection(void *chandle) { ...@@ -242,7 +242,7 @@ void taosCloseTcpClientConnection(void *chandle) {
taosCleanUpTcpFdObj(pFdObj); taosCleanUpTcpFdObj(pFdObj);
} }
void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, short port) { void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16_t port) {
STcpClient * pTcp = (STcpClient *)shandle; STcpClient * pTcp = (STcpClient *)shandle;
STcpFd * pFdObj; STcpFd * pFdObj;
struct epoll_event event; struct epoll_event event;
...@@ -301,12 +301,12 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, short ...@@ -301,12 +301,12 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, short
pthread_mutex_unlock(&(pTcp->mutex)); pthread_mutex_unlock(&(pTcp->mutex));
tTrace("%s TCP connection to ip:%s port:%u is created, numOfFds:%d", pTcp->label, ip, port, pTcp->numOfFds); tTrace("%s TCP connection to ip:%s port:%hu is created, numOfFds:%d", pTcp->label, ip, port, pTcp->numOfFds);
return pFdObj; return pFdObj;
} }
int taosSendTcpClientData(uint32_t ip, short port, char *data, int len, void *chandle) { int taosSendTcpClientData(uint32_t ip, uint16_t port, char *data, int len, void *chandle) {
STcpFd *pFdObj = (STcpFd *)chandle; STcpFd *pFdObj = (STcpFd *)chandle;
if (chandle == NULL) return -1; if (chandle == NULL) return -1;
......
...@@ -32,7 +32,7 @@ typedef struct _fd_obj { ...@@ -32,7 +32,7 @@ typedef struct _fd_obj {
void * thandle; // handle from upper layer, like TAOS void * thandle; // handle from upper layer, like TAOS
char ipstr[TAOS_IPv4ADDR_LEN]; char ipstr[TAOS_IPv4ADDR_LEN];
unsigned int ip; unsigned int ip;
unsigned short port; uint16_t port;
struct _thread_obj *pThreadObj; struct _thread_obj *pThreadObj;
struct _fd_obj * prev, *next; struct _fd_obj * prev, *next;
} SFdObj; } SFdObj;
...@@ -48,13 +48,13 @@ typedef struct _thread_obj { ...@@ -48,13 +48,13 @@ typedef struct _thread_obj {
char label[12]; char label[12];
// char buffer[128000]; // buffer to receive data // char buffer[128000]; // buffer to receive data
void *shandle; // handle passed by upper layer during server initialization void *shandle; // handle passed by upper layer during server initialization
void *(*processData)(char *data, int dataLen, unsigned int ip, short port, void *shandle, void *thandle, void *(*processData)(char *data, int dataLen, unsigned int ip, uint16_t port, void *shandle, void *thandle,
void *chandle); void *chandle);
} SThreadObj; } SThreadObj;
typedef struct { typedef struct {
char ip[40]; char ip[40];
short port; uint16_t port;
char label[12]; char label[12];
int numOfThreads; int numOfThreads;
void * shandle; void * shandle;
...@@ -209,7 +209,7 @@ static void taosProcessTcpData(void *param) { ...@@ -209,7 +209,7 @@ static void taosProcessTcpData(void *param) {
continue; continue;
} }
pFdObj->thandle = (*(pThreadObj->processData))(buffer, dataLen, pFdObj->ip, (int16_t)pFdObj->port, pFdObj->thandle = (*(pThreadObj->processData))(buffer, dataLen, pFdObj->ip, pFdObj->port,
pThreadObj->shandle, pFdObj->thandle, pFdObj); pThreadObj->shandle, pFdObj->thandle, pFdObj);
if (pFdObj->thandle == NULL) taosCleanUpFdObj(pFdObj); if (pFdObj->thandle == NULL) taosCleanUpFdObj(pFdObj);
...@@ -232,10 +232,10 @@ void taosAcceptTcpConnection(void *arg) { ...@@ -232,10 +232,10 @@ void taosAcceptTcpConnection(void *arg) {
sockFd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port); sockFd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
if (sockFd < 0) { if (sockFd < 0) {
tError("%s failed to open TCP socket, ip:%s, port:%u", pServerObj->label, pServerObj->ip, pServerObj->port); tError("%s failed to open TCP socket, ip:%s, port:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
return; return;
} else { } else {
tTrace("%s TCP server is ready, ip:%s, port:%u", pServerObj->label, pServerObj->ip, pServerObj->port); tTrace("%s TCP server is ready, ip:%s, port:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
} }
while (1) { while (1) {
...@@ -247,7 +247,7 @@ void taosAcceptTcpConnection(void *arg) { ...@@ -247,7 +247,7 @@ void taosAcceptTcpConnection(void *arg) {
continue; continue;
} }
tTrace("%s TCP connection from ip:%s port:%u", pServerObj->label, inet_ntoa(clientAddr.sin_addr), tTrace("%s TCP connection from ip:%s port:%hu", pServerObj->label, inet_ntoa(clientAddr.sin_addr),
htons(clientAddr.sin_port)); htons(clientAddr.sin_port));
taosKeepTcpAlive(connFd); taosKeepTcpAlive(connFd);
...@@ -292,7 +292,7 @@ void taosAcceptTcpConnection(void *arg) { ...@@ -292,7 +292,7 @@ void taosAcceptTcpConnection(void *arg) {
pthread_mutex_unlock(&(pThreadObj->threadMutex)); pthread_mutex_unlock(&(pThreadObj->threadMutex));
tTrace("%s TCP thread:%d, a new connection, ip:%s port:%u, numOfFds:%d", pServerObj->label, pThreadObj->threadId, tTrace("%s TCP thread:%d, a new connection, ip:%s port:%hu, numOfFds:%d", pServerObj->label, pThreadObj->threadId,
pFdObj->ipstr, pFdObj->port, pThreadObj->numOfFds); pFdObj->ipstr, pFdObj->port, pThreadObj->numOfFds);
// pick up next thread for next connection // pick up next thread for next connection
...@@ -314,10 +314,10 @@ void taosAcceptUDConnection(void *arg) { ...@@ -314,10 +314,10 @@ void taosAcceptUDConnection(void *arg) {
sockFd = taosOpenUDServerSocket(pServerObj->ip, pServerObj->port); sockFd = taosOpenUDServerSocket(pServerObj->ip, pServerObj->port);
if (sockFd < 0) { if (sockFd < 0) {
tError("%s failed to open UD socket, ip:%s, port:%u", pServerObj->label, pServerObj->ip, pServerObj->port); tError("%s failed to open UD socket, ip:%s, port:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
return; return;
} else { } else {
tTrace("%s UD server is ready, ip:%s, port:%u", pServerObj->label, pServerObj->ip, pServerObj->port); tTrace("%s UD server is ready, ip:%s, port:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
} }
while (1) { while (1) {
...@@ -374,7 +374,7 @@ void taosAcceptUDConnection(void *arg) { ...@@ -374,7 +374,7 @@ void taosAcceptUDConnection(void *arg) {
} }
} }
void *taosInitTcpServer(char *ip, short port, char *label, int numOfThreads, void *fp, void *shandle) { void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) {
int i; int i;
SServerObj * pServerObj; SServerObj * pServerObj;
pthread_attr_t thattr; pthread_attr_t thattr;
...@@ -442,7 +442,7 @@ void *taosInitTcpServer(char *ip, short port, char *label, int numOfThreads, voi ...@@ -442,7 +442,7 @@ void *taosInitTcpServer(char *ip, short port, char *label, int numOfThreads, voi
} }
*/ */
pthread_attr_destroy(&thattr); pthread_attr_destroy(&thattr);
tTrace("%s TCP server is initialized, ip:%s port:%u numOfThreads:%d", label, ip, port, numOfThreads); tTrace("%s TCP server is initialized, ip:%s port:%hu numOfThreads:%d", label, ip, port, numOfThreads);
return (void *)pServerObj; return (void *)pServerObj;
} }
...@@ -468,7 +468,7 @@ void taosListTcpConnection(void *handle, char *buffer) { ...@@ -468,7 +468,7 @@ void taosListTcpConnection(void *handle, char *buffer) {
msg = msg + strlen(msg); msg = msg + strlen(msg);
pFdObj = pThreadObj->pHead; pFdObj = pThreadObj->pHead;
while (pFdObj) { while (pFdObj) {
sprintf(" ip:%s port:%u\n", pFdObj->ipstr, pFdObj->port); sprintf(" ip:%s port:%hu\n", pFdObj->ipstr, pFdObj->port);
msg = msg + strlen(msg); msg = msg + strlen(msg);
numOfFds++; numOfFds++;
numOfConns++; numOfConns++;
...@@ -487,7 +487,7 @@ void taosListTcpConnection(void *handle, char *buffer) { ...@@ -487,7 +487,7 @@ void taosListTcpConnection(void *handle, char *buffer) {
return; return;
} }
int taosSendTcpServerData(uint32_t ip, short port, char *data, int len, void *chandle) { int taosSendTcpServerData(uint32_t ip, uint16_t port, char *data, int len, void *chandle) {
SFdObj *pFdObj = (SFdObj *)chandle; SFdObj *pFdObj = (SFdObj *)chandle;
if (chandle == NULL) return -1; if (chandle == NULL) return -1;
......
...@@ -35,8 +35,8 @@ typedef struct { ...@@ -35,8 +35,8 @@ typedef struct {
void * signature; void * signature;
int index; int index;
int fd; int fd;
short port; // peer port uint16_t port; // peer port
short localPort; // local port uint16_t localPort; // local port
char label[12]; // copy from udpConnSet; char label[12]; // copy from udpConnSet;
pthread_t thread; pthread_t thread;
pthread_mutex_t mutex; pthread_mutex_t mutex;
...@@ -44,7 +44,7 @@ typedef struct { ...@@ -44,7 +44,7 @@ typedef struct {
void * hash; void * hash;
void * shandle; // handle passed by upper layer during server initialization void * shandle; // handle passed by upper layer during server initialization
void * pSet; void * pSet;
void *(*processData)(char *data, int dataLen, unsigned int ip, short port, void *shandle, void *thandle, void *(*processData)(char *data, int dataLen, unsigned int ip, uint16_t port, void *shandle, void *thandle,
void *chandle); void *chandle);
char buffer[RPC_MAX_UDP_SIZE]; // buffer to receive data char buffer[RPC_MAX_UDP_SIZE]; // buffer to receive data
} SUdpConn; } SUdpConn;
...@@ -53,21 +53,21 @@ typedef struct { ...@@ -53,21 +53,21 @@ typedef struct {
int index; int index;
int server; int server;
char ip[16]; // local IP char ip[16]; // local IP
short port; // local Port uint16_t port; // local Port
void * shandle; // handle passed by upper layer during server initialization void * shandle; // handle passed by upper layer during server initialization
int threads; int threads;
char label[12]; char label[12];
void * tmrCtrl; void * tmrCtrl;
pthread_t tcpThread; pthread_t tcpThread;
int tcpFd; int tcpFd;
void *(*fp)(char *data, int dataLen, uint32_t ip, short port, void *shandle, void *thandle, void *chandle); void *(*fp)(char *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *thandle, void *chandle);
SUdpConn udpConn[]; SUdpConn udpConn[];
} SUdpConnSet; } SUdpConnSet;
typedef struct { typedef struct {
void * signature; void * signature;
uint32_t ip; // dest IP uint32_t ip; // dest IP
short port; // dest Port uint16_t port; // dest Port
SUdpConn * pConn; SUdpConn * pConn;
struct sockaddr_in destAdd; struct sockaddr_in destAdd;
void * msgHdr; void * msgHdr;
...@@ -144,12 +144,12 @@ void *taosReadTcpData(void *argv) { ...@@ -144,12 +144,12 @@ void *taosReadTcpData(void *argv) {
pInfo->msgLen = (int32_t)htonl((uint32_t)pInfo->msgLen); pInfo->msgLen = (int32_t)htonl((uint32_t)pInfo->msgLen);
tinet_ntoa(ipstr, pMonitor->ip); tinet_ntoa(ipstr, pMonitor->ip);
tTrace("%s receive packet via TCP:%s:%d, msgLen:%d, handle:0x%x, source:0x%08x dest:0x%08x tranId:%d", pSet->label, tTrace("%s receive packet via TCP:%s:%hu, msgLen:%d, handle:0x%x, source:0x%08x dest:0x%08x tranId:%d", pSet->label,
ipstr, pInfo->port, pInfo->msgLen, pInfo->handle, pHead->sourceId, pHead->destId, pHead->tranId); ipstr, pInfo->port, pInfo->msgLen, pInfo->handle, pHead->sourceId, pHead->destId, pHead->tranId);
fd = taosOpenTcpClientSocket(ipstr, (int16_t)pInfo->port, tsLocalIp); fd = taosOpenTcpClientSocket(ipstr, (int16_t)pInfo->port, tsLocalIp);
if (fd < 0) { if (fd < 0) {
tError("%s failed to open TCP client socket ip:%s:%d", pSet->label, ipstr, pInfo->port); tError("%s failed to open TCP client socket ip:%s:%hu", pSet->label, ipstr, pInfo->port);
pMonitor->pSet = NULL; pMonitor->pSet = NULL;
return NULL; return NULL;
} }
...@@ -180,7 +180,7 @@ void *taosReadTcpData(void *argv) { ...@@ -180,7 +180,7 @@ void *taosReadTcpData(void *argv) {
tError("%s failed to read data from server, msgLen:%d retLen:%d", pSet->label, pInfo->msgLen, retLen); tError("%s failed to read data from server, msgLen:%d retLen:%d", pSet->label, pInfo->msgLen, retLen);
tfree(buffer); tfree(buffer);
} else { } else {
(*pSet->fp)(buffer, pInfo->msgLen, pMonitor->ip, (int16_t)pInfo->port, pSet->shandle, NULL, pMonitor->pConn); (*pSet->fp)(buffer, pInfo->msgLen, pMonitor->ip, pInfo->port, pSet->shandle, NULL, pMonitor->pConn);
} }
} }
...@@ -224,7 +224,7 @@ void *taosRecvUdpData(void *param) { ...@@ -224,7 +224,7 @@ void *taosRecvUdpData(void *param) {
struct sockaddr_in sourceAdd; struct sockaddr_in sourceAdd;
unsigned int addLen, dataLen; unsigned int addLen, dataLen;
SUdpConn * pConn = (SUdpConn *)param; SUdpConn * pConn = (SUdpConn *)param;
short port; uint16_t port;
int minSize = sizeof(STaosHeader); int minSize = sizeof(STaosHeader);
memset(&sourceAdd, 0, sizeof(sourceAdd)); memset(&sourceAdd, 0, sizeof(sourceAdd));
...@@ -242,7 +242,7 @@ void *taosRecvUdpData(void *param) { ...@@ -242,7 +242,7 @@ void *taosRecvUdpData(void *param) {
continue; continue;
} }
port = (int16_t)ntohs(sourceAdd.sin_port); port = ntohs(sourceAdd.sin_port);
int processedLen = 0, leftLen = 0; int processedLen = 0, leftLen = 0;
int msgLen = 0; int msgLen = 0;
...@@ -307,7 +307,7 @@ void *taosTransferDataViaTcp(void *argv) { ...@@ -307,7 +307,7 @@ void *taosTransferDataViaTcp(void *argv) {
if (handle == 0) { if (handle == 0) {
// receive a packet from client // receive a packet from client
tTrace("%s data will be received via TCP from 0x%x:%d", pSet->label, pTransfer->ip, pTransfer->port); tTrace("%s data will be received via TCP from 0x%x:%hu", pSet->label, pTransfer->ip, pTransfer->port);
retLen = taosReadMsg(connFd, &head, sizeof(STaosHeader)); retLen = taosReadMsg(connFd, &head, sizeof(STaosHeader));
if (retLen != (int)sizeof(STaosHeader)) { if (retLen != (int)sizeof(STaosHeader)) {
tError("%s failed to read msg header, retLen:%d", pSet->label, retLen); tError("%s failed to read msg header, retLen:%d", pSet->label, retLen);
...@@ -345,7 +345,7 @@ void *taosTransferDataViaTcp(void *argv) { ...@@ -345,7 +345,7 @@ void *taosTransferDataViaTcp(void *argv) {
tError("%s failed to read data from client, leftLen:%d retLen:%d, error:%s", pSet->label, leftLen, retLen, tError("%s failed to read data from client, leftLen:%d retLen:%d, error:%s", pSet->label, leftLen, retLen,
strerror(errno)); strerror(errno));
} else { } else {
tTrace("%s data is received from client via TCP from 0x%x:%d, msgLen:%d", pSet->label, pTransfer->ip, tTrace("%s data is received from client via TCP from 0x%x:%hu, msgLen:%d", pSet->label, pTransfer->ip,
pTransfer->port, msgLen); pTransfer->port, msgLen);
pSet->index = (pSet->index + 1) % pSet->threads; pSet->index = (pSet->index + 1) % pSet->threads;
SUdpConn *pConn = pSet->udpConn + pSet->index; SUdpConn *pConn = pSet->udpConn + pSet->index;
...@@ -388,7 +388,7 @@ void *taosTransferDataViaTcp(void *argv) { ...@@ -388,7 +388,7 @@ void *taosTransferDataViaTcp(void *argv) {
if (retLen != msgLen) { if (retLen != msgLen) {
tError("%s failed to send data to client, msgLen:%d retLen:%d", pSet->label, msgLen, retLen); tError("%s failed to send data to client, msgLen:%d retLen:%d", pSet->label, msgLen, retLen);
} else { } else {
tTrace("%s data is sent to client successfully via TCP to 0x%x:%d, size:%d", pSet->label, pTransfer->ip, tTrace("%s data is sent to client successfully via TCP to 0x%x:%hu, size:%d", pSet->label, pTransfer->ip,
pTransfer->port, msgLen); pTransfer->port, msgLen);
} }
} }
...@@ -413,13 +413,13 @@ void *taosUdpTcpConnection(void *argv) { ...@@ -413,13 +413,13 @@ void *taosUdpTcpConnection(void *argv) {
pSet->tcpFd = taosOpenTcpServerSocket(pSet->ip, pSet->port); pSet->tcpFd = taosOpenTcpServerSocket(pSet->ip, pSet->port);
if (pSet->tcpFd < 0) { if (pSet->tcpFd < 0) {
tPrint("%s failed to create TCP socket %s:%d for UDP server, reason:%s", pSet->label, pSet->ip, pSet->port, tPrint("%s failed to create TCP socket %s:%hu for UDP server, reason:%s", pSet->label, pSet->ip, pSet->port,
strerror(errno)); strerror(errno));
taosKillSystem(); taosKillSystem();
return NULL; return NULL;
} }
tTrace("%s UDP server is created, ip:%s:%d", pSet->label, pSet->ip, pSet->port); tTrace("%s UDP server is created, ip:%s:%hu", pSet->label, pSet->ip, pSet->port);
pthread_attr_init(&thattr); pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED);
...@@ -455,7 +455,7 @@ void *taosUdpTcpConnection(void *argv) { ...@@ -455,7 +455,7 @@ void *taosUdpTcpConnection(void *argv) {
return NULL; return NULL;
} }
void *taosInitUdpConnection(char *ip, short port, char *label, int threads, void *fp, void *shandle) { void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, void *fp, void *shandle) {
pthread_attr_t thAttr; pthread_attr_t thAttr;
SUdpConn * pConn; SUdpConn * pConn;
SUdpConnSet * pSet; SUdpConnSet * pSet;
...@@ -488,13 +488,13 @@ void *taosInitUdpConnection(char *ip, short port, char *label, int threads, void ...@@ -488,13 +488,13 @@ void *taosInitUdpConnection(char *ip, short port, char *label, int threads, void
pthread_attr_init(&thAttr); pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
short ownPort; uint16_t ownPort;
for (int i = 0; i < threads; ++i) { for (int i = 0; i < threads; ++i) {
pConn = pSet->udpConn + i; pConn = pSet->udpConn + i;
ownPort = (int16_t)(port ? port + i : 0); ownPort = (port ? port + i : 0);
pConn->fd = taosOpenUdpSocket(ip, ownPort); pConn->fd = taosOpenUdpSocket(ip, ownPort);
if (pConn->fd < 0) { if (pConn->fd < 0) {
tError("%s failed to open UDP socket %s:%d", label, ip, port); tError("%s failed to open UDP socket %s:%hu", label, ip, port);
taosCleanUpUdpConnection(pSet); taosCleanUpUdpConnection(pSet);
return NULL; return NULL;
} }
...@@ -528,12 +528,12 @@ void *taosInitUdpConnection(char *ip, short port, char *label, int threads, void ...@@ -528,12 +528,12 @@ void *taosInitUdpConnection(char *ip, short port, char *label, int threads, void
} }
pthread_attr_destroy(&thAttr); pthread_attr_destroy(&thAttr);
tTrace("%s UDP connection is initialized, ip:%s port:%u threads:%d", label, ip, port, threads); tTrace("%s UDP connection is initialized, ip:%s port:%hu threads:%d", label, ip, port, threads);
return pSet; return pSet;
} }
void *taosInitUdpServer(char *ip, short port, char *label, int threads, void *fp, void *shandle) { void *taosInitUdpServer(char *ip, uint16_t port, char *label, int threads, void *fp, void *shandle) {
SUdpConnSet *pSet; SUdpConnSet *pSet;
pSet = taosInitUdpConnection(ip, port, label, threads, fp, shandle); pSet = taosInitUdpConnection(ip, port, label, threads, fp, shandle);
if (pSet == NULL) return NULL; if (pSet == NULL) return NULL;
...@@ -554,7 +554,7 @@ void *taosInitUdpServer(char *ip, short port, char *label, int threads, void *fp ...@@ -554,7 +554,7 @@ void *taosInitUdpServer(char *ip, short port, char *label, int threads, void *fp
return pSet; return pSet;
} }
void *taosInitUdpClient(char *ip, short port, char *label, int threads, void *fp, void *shandle) { void *taosInitUdpClient(char *ip, uint16_t port, char *label, int threads, void *fp, void *shandle) {
return taosInitUdpConnection(ip, port, label, threads, fp, shandle); return taosInitUdpConnection(ip, port, label, threads, fp, shandle);
} }
...@@ -590,7 +590,7 @@ void taosCleanUpUdpConnection(void *handle) { ...@@ -590,7 +590,7 @@ void taosCleanUpUdpConnection(void *handle) {
tfree(pSet); tfree(pSet);
} }
void *taosOpenUdpConnection(void *shandle, void *thandle, char *ip, short port) { void *taosOpenUdpConnection(void *shandle, void *thandle, char *ip, uint16_t port) {
SUdpConnSet *pSet = (SUdpConnSet *)shandle; SUdpConnSet *pSet = (SUdpConnSet *)shandle;
pSet->index = (pSet->index + 1) % pSet->threads; pSet->index = (pSet->index + 1) % pSet->threads;
...@@ -598,7 +598,7 @@ void *taosOpenUdpConnection(void *shandle, void *thandle, char *ip, short port) ...@@ -598,7 +598,7 @@ void *taosOpenUdpConnection(void *shandle, void *thandle, char *ip, short port)
SUdpConn *pConn = pSet->udpConn + pSet->index; SUdpConn *pConn = pSet->udpConn + pSet->index;
pConn->port = port; pConn->port = port;
tTrace("%s UDP connection is setup, ip: %s:%d, local: %s:%d", pConn->label, ip, port, pSet->ip, tTrace("%s UDP connection is setup, ip: %s:%hu, local: %s:%d", pConn->label, ip, port, pSet->ip,
ntohs((uint16_t)pConn->localPort)); ntohs((uint16_t)pConn->localPort));
return pConn; return pConn;
...@@ -642,7 +642,7 @@ void taosProcessUdpBufTimer(void *param, void *tmrId) { ...@@ -642,7 +642,7 @@ void taosProcessUdpBufTimer(void *param, void *tmrId) {
if (pBuf) taosTmrReset(taosProcessUdpBufTimer, RPC_UDP_BUF_TIME, pBuf, pConn->tmrCtrl, &pBuf->timer); if (pBuf) taosTmrReset(taosProcessUdpBufTimer, RPC_UDP_BUF_TIME, pBuf, pConn->tmrCtrl, &pBuf->timer);
} }
SUdpBuf *taosCreateUdpBuf(SUdpConn *pConn, uint32_t ip, short port) { SUdpBuf *taosCreateUdpBuf(SUdpConn *pConn, uint32_t ip, uint16_t port) {
SUdpBuf *pBuf = (SUdpBuf *)malloc(sizeof(SUdpBuf)); SUdpBuf *pBuf = (SUdpBuf *)malloc(sizeof(SUdpBuf));
memset(pBuf, 0, sizeof(SUdpBuf)); memset(pBuf, 0, sizeof(SUdpBuf));
...@@ -652,7 +652,7 @@ SUdpBuf *taosCreateUdpBuf(SUdpConn *pConn, uint32_t ip, short port) { ...@@ -652,7 +652,7 @@ SUdpBuf *taosCreateUdpBuf(SUdpConn *pConn, uint32_t ip, short port) {
pBuf->destAdd.sin_family = AF_INET; pBuf->destAdd.sin_family = AF_INET;
pBuf->destAdd.sin_addr.s_addr = ip; pBuf->destAdd.sin_addr.s_addr = ip;
pBuf->destAdd.sin_port = (uint16_t)htons((uint16_t)port); pBuf->destAdd.sin_port = (uint16_t)htons(port);
taosInitMsgHdr(&(pBuf->msgHdr), &(pBuf->destAdd), RPC_MAX_UDP_PKTS); taosInitMsgHdr(&(pBuf->msgHdr), &(pBuf->destAdd), RPC_MAX_UDP_PKTS);
pBuf->signature = pBuf; pBuf->signature = pBuf;
taosTmrReset(taosProcessUdpBufTimer, RPC_UDP_BUF_TIME, pBuf, pConn->tmrCtrl, &pBuf->timer); taosTmrReset(taosProcessUdpBufTimer, RPC_UDP_BUF_TIME, pBuf, pConn->tmrCtrl, &pBuf->timer);
...@@ -663,7 +663,7 @@ SUdpBuf *taosCreateUdpBuf(SUdpConn *pConn, uint32_t ip, short port) { ...@@ -663,7 +663,7 @@ SUdpBuf *taosCreateUdpBuf(SUdpConn *pConn, uint32_t ip, short port) {
return pBuf; return pBuf;
} }
int taosSendPacketViaTcp(uint32_t ip, short port, char *data, int dataLen, void *chandle) { int taosSendPacketViaTcp(uint32_t ip, uint16_t port, char *data, int dataLen, void *chandle) {
SUdpConn * pConn = (SUdpConn *)chandle; SUdpConn * pConn = (SUdpConn *)chandle;
SUdpConnSet *pSet = (SUdpConnSet *)pConn->pSet; SUdpConnSet *pSet = (SUdpConnSet *)pConn->pSet;
int code = -1, retLen, msgLen; int code = -1, retLen, msgLen;
...@@ -680,13 +680,13 @@ int taosSendPacketViaTcp(uint32_t ip, short port, char *data, int dataLen, void ...@@ -680,13 +680,13 @@ int taosSendPacketViaTcp(uint32_t ip, short port, char *data, int dataLen, void
SPacketInfo *pInfo = (SPacketInfo *)pHead->content; SPacketInfo *pInfo = (SPacketInfo *)pHead->content;
pInfo->handle = (uint64_t)data; pInfo->handle = (uint64_t)data;
pInfo->port = (uint16_t)pSet->port; pInfo->port = pSet->port;
pInfo->msgLen = pHead->msgLen; pInfo->msgLen = pHead->msgLen;
msgLen = sizeof(STaosHeader) + sizeof(SPacketInfo); msgLen = sizeof(STaosHeader) + sizeof(SPacketInfo);
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
code = taosSendUdpData(ip, port, buffer, msgLen, chandle); code = taosSendUdpData(ip, port, buffer, msgLen, chandle);
tTrace("%s data from server will be sent via TCP:%d, msgType:%d, length:%d, handle:0x%x", pSet->label, pInfo->port, tTrace("%s data from server will be sent via TCP:%hu, msgType:%d, length:%d, handle:0x%x", pSet->label, pInfo->port,
pHead->msgType, htonl((uint32_t)pInfo->msgLen), pInfo->handle); pHead->msgType, htonl((uint32_t)pInfo->msgLen), pInfo->handle);
if (code > 0) code = dataLen; if (code > 0) code = dataLen;
} else { } else {
...@@ -706,7 +706,7 @@ int taosSendPacketViaTcp(uint32_t ip, short port, char *data, int dataLen, void ...@@ -706,7 +706,7 @@ int taosSendPacketViaTcp(uint32_t ip, short port, char *data, int dataLen, void
tinet_ntoa(ipstr, ip); tinet_ntoa(ipstr, ip);
int fd = taosOpenTcpClientSocket(ipstr, pConn->port, tsLocalIp); int fd = taosOpenTcpClientSocket(ipstr, pConn->port, tsLocalIp);
if (fd < 0) { if (fd < 0) {
tError("%s failed to open TCP socket to:%s:%u to send packet", pSet->label, ipstr, pConn->port); tError("%s failed to open TCP socket to:%s:%hu to send packet", pSet->label, ipstr, pConn->port);
} else { } else {
SHandleViaTcp handleViaTcp; SHandleViaTcp handleViaTcp;
taosInitHandleViaTcp(&handleViaTcp, 0); taosInitHandleViaTcp(&handleViaTcp, 0);
...@@ -734,7 +734,7 @@ int taosSendPacketViaTcp(uint32_t ip, short port, char *data, int dataLen, void ...@@ -734,7 +734,7 @@ int taosSendPacketViaTcp(uint32_t ip, short port, char *data, int dataLen, void
return code; return code;
} }
int taosSendUdpData(uint32_t ip, short port, char *data, int dataLen, void *chandle) { int taosSendUdpData(uint32_t ip, uint16_t port, char *data, int dataLen, void *chandle) {
SUdpConn *pConn = (SUdpConn *)chandle; SUdpConn *pConn = (SUdpConn *)chandle;
SUdpBuf * pBuf; SUdpBuf * pBuf;
...@@ -747,7 +747,7 @@ int taosSendUdpData(uint32_t ip, short port, char *data, int dataLen, void *chan ...@@ -747,7 +747,7 @@ int taosSendUdpData(uint32_t ip, short port, char *data, int dataLen, void *chan
memset(&destAdd, 0, sizeof(destAdd)); memset(&destAdd, 0, sizeof(destAdd));
destAdd.sin_family = AF_INET; destAdd.sin_family = AF_INET;
destAdd.sin_addr.s_addr = ip; destAdd.sin_addr.s_addr = ip;
destAdd.sin_port = htons((uint16_t)port); destAdd.sin_port = htons(port);
int ret = (int)sendto(pConn->fd, data, (size_t)dataLen, 0, (struct sockaddr *)&destAdd, sizeof(destAdd)); int ret = (int)sendto(pConn->fd, data, (size_t)dataLen, 0, (struct sockaddr *)&destAdd, sizeof(destAdd));
tTrace("%s msg is sent to 0x%x:%hu len:%d ret:%d localPort:%hu chandle:0x%x", pConn->label, destAdd.sin_addr.s_addr, tTrace("%s msg is sent to 0x%x:%hu len:%d ret:%d localPort:%hu chandle:0x%x", pConn->label, destAdd.sin_addr.s_addr,
......
...@@ -233,7 +233,7 @@ typedef struct _connObj { ...@@ -233,7 +233,7 @@ typedef struct _connObj {
uint32_t queryId; // query ID to be killed uint32_t queryId; // query ID to be killed
uint32_t streamId; // stream ID to be killed uint32_t streamId; // stream ID to be killed
uint32_t ip; // shell IP uint32_t ip; // shell IP
short port; // shell port uint16_t port; // shell port
void * thandle; void * thandle;
SQList * pQList; // query list SQList * pQList; // query list
SSList * pSList; // stream list SSList * pSList; // stream list
......
...@@ -26,7 +26,7 @@ typedef struct { ...@@ -26,7 +26,7 @@ typedef struct {
int sid; int sid;
int vnode; int vnode;
uint32_t ip; uint32_t ip;
short port; uint16_t port;
int count; // track the number of imports int count; // track the number of imports
int code; // track the code of imports int code; // track the code of imports
int numOfTotalPoints; // track the total number of points imported int numOfTotalPoints; // track the total number of points imported
......
...@@ -25,7 +25,7 @@ typedef struct { ...@@ -25,7 +25,7 @@ typedef struct {
char user[TSDB_METER_ID_LEN]; char user[TSDB_METER_ID_LEN];
uint64_t stime; uint64_t stime;
uint32_t ip; uint32_t ip;
short port; uint16_t port;
} SConnInfo; } SConnInfo;
typedef struct { typedef struct {
......
...@@ -23,7 +23,7 @@ ...@@ -23,7 +23,7 @@
typedef struct { typedef struct {
uint32_t ip; uint32_t ip;
short port; uint16_t port;
char user[TSDB_METER_ID_LEN]; char user[TSDB_METER_ID_LEN];
} SCDesc; } SCDesc;
...@@ -180,7 +180,7 @@ int mgmtKillQuery(char *qidstr, SConnObj *pConn) { ...@@ -180,7 +180,7 @@ int mgmtKillQuery(char *qidstr, SConnObj *pConn) {
chr = strchr(temp, ':'); chr = strchr(temp, ':');
if (chr == NULL) goto _error; if (chr == NULL) goto _error;
*chr = 0; *chr = 0;
short port = htons(atoi(temp)); uint16_t port = htons(atoi(temp));
temp = chr + 1; temp = chr + 1;
uint32_t queryId = atoi(temp); uint32_t queryId = atoi(temp);
...@@ -448,7 +448,7 @@ int mgmtKillStream(char *qidstr, SConnObj *pConn) { ...@@ -448,7 +448,7 @@ int mgmtKillStream(char *qidstr, SConnObj *pConn) {
chr = strchr(temp, ':'); chr = strchr(temp, ':');
if (chr == NULL) goto _error; if (chr == NULL) goto _error;
*chr = 0; *chr = 0;
short port = htons(atoi(temp)); uint16_t port = htons(atoi(temp));
temp = chr + 1; temp = chr + 1;
uint32_t streamId = atoi(temp); uint32_t streamId = atoi(temp);
......
...@@ -1276,7 +1276,7 @@ void *mgmtProcessMsgFromShell(char *msg, void *ahandle, void *thandle) { ...@@ -1276,7 +1276,7 @@ void *mgmtProcessMsgFromShell(char *msg, void *ahandle, void *thandle) {
if (pConn->pUser) { if (pConn->pUser) {
pConn->pAcct = mgmtGetAcct(pConn->pUser->acct); pConn->pAcct = mgmtGetAcct(pConn->pUser->acct);
mgmtEstablishConn(pConn); mgmtEstablishConn(pConn);
mTrace("login from:%x:%d", pConn->ip, htons(pConn->port)); mTrace("login from:%x:%hu", pConn->ip, htons(pConn->port));
} }
} }
......
...@@ -47,7 +47,7 @@ void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) { ...@@ -47,7 +47,7 @@ void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) {
SShellObj *pObj = (SShellObj *)ahandle; SShellObj *pObj = (SShellObj *)ahandle;
SIntMsg * pMsg = (SIntMsg *)msg; SIntMsg * pMsg = (SIntMsg *)msg;
uint32_t peerId, peerIp; uint32_t peerId, peerIp;
short peerPort; uint16_t peerPort;
char ipstr[20]; char ipstr[20];
if (msg == NULL) { if (msg == NULL) {
......
...@@ -58,12 +58,12 @@ int64_t tsMsPerDay[] = {86400000L, 86400000000L}; ...@@ -58,12 +58,12 @@ int64_t tsMsPerDay[] = {86400000L, 86400000000L};
char tsMasterIp[TSDB_IPv4ADDR_LEN] = {0}; char tsMasterIp[TSDB_IPv4ADDR_LEN] = {0};
char tsSecondIp[TSDB_IPv4ADDR_LEN] = {0}; char tsSecondIp[TSDB_IPv4ADDR_LEN] = {0};
short tsMgmtShellPort = 6030; // udp[6030-6034] tcp[6030] uint16_t tsMgmtShellPort = 6030; // udp[6030-6034] tcp[6030]
short tsVnodeShellPort = 6035; // udp[6035-6039] tcp[6035] uint16_t tsVnodeShellPort = 6035; // udp[6035-6039] tcp[6035]
short tsMgmtVnodePort = 6040; // udp[6040-6044] tcp[6040] uint16_t tsMgmtVnodePort = 6040; // udp[6040-6044] tcp[6040]
short tsVnodeVnodePort = 6045; // tcp[6045] uint16_t tsVnodeVnodePort = 6045; // tcp[6045]
short tsMgmtMgmtPort = 6050; // udp, numOfVnodes fixed to 1, range udp[6050] uint16_t tsMgmtMgmtPort = 6050; // udp, numOfVnodes fixed to 1, range udp[6050]
short tsMgmtSyncPort = 6050; // tcp, range tcp[6050] uint16_t tsMgmtSyncPort = 6050; // tcp, range tcp[6050]
int tsStatusInterval = 1; // second int tsStatusInterval = 1; // second
int tsShellActivityTimer = 3; // second int tsShellActivityTimer = 3; // second
...@@ -152,8 +152,8 @@ int tsProjectExecInterval = 10000; // every 10sec, the projection will be ...@@ -152,8 +152,8 @@ int tsProjectExecInterval = 10000; // every 10sec, the projection will be
int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance
char tsHttpIp[TSDB_IPv4ADDR_LEN] = "0.0.0.0"; char tsHttpIp[TSDB_IPv4ADDR_LEN] = "0.0.0.0";
short tsHttpPort = 6020; // only tcp, range tcp[6020] uint16_t tsHttpPort = 6020; // only tcp, range tcp[6020]
// short tsNginxPort = 6060; //only tcp, range tcp[6060] // uint16_t tsNginxPort = 6060; //only tcp, range tcp[6060]
int tsHttpCacheSessions = 100; int tsHttpCacheSessions = 100;
int tsHttpSessionExpire = 36000; int tsHttpSessionExpire = 36000;
int tsHttpMaxThreads = 2; int tsHttpMaxThreads = 2;
......
...@@ -261,19 +261,19 @@ int taosReadn(int fd, char *ptr, int nbytes) { ...@@ -261,19 +261,19 @@ int taosReadn(int fd, char *ptr, int nbytes) {
return (nbytes - nleft); return (nbytes - nleft);
} }
int taosOpenUdpSocket(char *ip, short port) { int taosOpenUdpSocket(char *ip, uint16_t port) {
struct sockaddr_in localAddr; struct sockaddr_in localAddr;
int sockFd; int sockFd;
int ttl = 128; int ttl = 128;
int reuse, nocheck; int reuse, nocheck;
int bufSize = 8192000; int bufSize = 8192000;
pTrace("open udp socket:%s:%d", ip, port); pTrace("open udp socket:%s:%hu", ip, port);
memset((char *)&localAddr, 0, sizeof(localAddr)); memset((char *)&localAddr, 0, sizeof(localAddr));
localAddr.sin_family = AF_INET; localAddr.sin_family = AF_INET;
localAddr.sin_addr.s_addr = inet_addr(ip); localAddr.sin_addr.s_addr = inet_addr(ip);
localAddr.sin_port = (uint16_t)htons((uint16_t)port); localAddr.sin_port = (uint16_t)htons(port);
if ((sockFd = (int)socket(AF_INET, SOCK_DGRAM, 0)) < 0) { if ((sockFd = (int)socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
pError("failed to open udp socket: %d (%s)", errno, strerror(errno)); pError("failed to open udp socket: %d (%s)", errno, strerror(errno));
...@@ -319,7 +319,7 @@ int taosOpenUdpSocket(char *ip, short port) { ...@@ -319,7 +319,7 @@ int taosOpenUdpSocket(char *ip, short port) {
/* bind socket to local address */ /* bind socket to local address */
if (bind(sockFd, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) { if (bind(sockFd, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) {
pError("failed to bind udp socket: %d (%s), %s:%d", errno, strerror(errno), ip, port); pError("failed to bind udp socket: %d (%s), %s:%hu", errno, strerror(errno), ip, port);
taosCloseSocket(sockFd); taosCloseSocket(sockFd);
return -1; return -1;
} }
...@@ -327,7 +327,7 @@ int taosOpenUdpSocket(char *ip, short port) { ...@@ -327,7 +327,7 @@ int taosOpenUdpSocket(char *ip, short port) {
return sockFd; return sockFd;
} }
int taosOpenTcpClientSocket(char *destIp, short destPort, char *clientIp) { int taosOpenTcpClientSocket(char *destIp, uint16_t destPort, char *clientIp) {
int sockFd = 0; int sockFd = 0;
struct sockaddr_in serverAddr, clientAddr; struct sockaddr_in serverAddr, clientAddr;
int ret; int ret;
...@@ -364,7 +364,7 @@ int taosOpenTcpClientSocket(char *destIp, short destPort, char *clientIp) { ...@@ -364,7 +364,7 @@ int taosOpenTcpClientSocket(char *destIp, short destPort, char *clientIp) {
ret = connect(sockFd, (struct sockaddr *)&serverAddr, sizeof(serverAddr)); ret = connect(sockFd, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
if (ret != 0) { if (ret != 0) {
pError("failed to connect socket, ip:%s, port:%d, reason: %s", destIp, destPort, strerror(errno)); pError("failed to connect socket, ip:%s, port:%hu, reason: %s", destIp, destPort, strerror(errno));
taosCloseSocket(sockFd); taosCloseSocket(sockFd);
sockFd = -1; sockFd = -1;
} }
...@@ -422,17 +422,17 @@ int taosKeepTcpAlive(int sockFd) { ...@@ -422,17 +422,17 @@ int taosKeepTcpAlive(int sockFd) {
return 0; return 0;
} }
int taosOpenTcpServerSocket(char *ip, short port) { int taosOpenTcpServerSocket(char *ip, uint16_t port) {
struct sockaddr_in serverAdd; struct sockaddr_in serverAdd;
int sockFd; int sockFd;
int reuse; int reuse;
pTrace("open tcp server socket:%s:%d", ip, port); pTrace("open tcp server socket:%s:%hu", ip, port);
bzero((char *)&serverAdd, sizeof(serverAdd)); bzero((char *)&serverAdd, sizeof(serverAdd));
serverAdd.sin_family = AF_INET; serverAdd.sin_family = AF_INET;
serverAdd.sin_addr.s_addr = inet_addr(ip); serverAdd.sin_addr.s_addr = inet_addr(ip);
serverAdd.sin_port = (uint16_t)htons((uint16_t)port); serverAdd.sin_port = (uint16_t)htons(port);
if ((sockFd = (int)socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { if ((sockFd = (int)socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
pError("failed to open TCP socket: %d (%s)", errno, strerror(errno)); pError("failed to open TCP socket: %d (%s)", errno, strerror(errno));
...@@ -449,7 +449,7 @@ int taosOpenTcpServerSocket(char *ip, short port) { ...@@ -449,7 +449,7 @@ int taosOpenTcpServerSocket(char *ip, short port) {
/* bind socket to server address */ /* bind socket to server address */
if (bind(sockFd, (struct sockaddr *)&serverAdd, sizeof(serverAdd)) < 0) { if (bind(sockFd, (struct sockaddr *)&serverAdd, sizeof(serverAdd)) < 0) {
pError("bind tcp server socket failed, %s:%d, reason:%d(%s)", ip, port, errno, strerror(errno)); pError("bind tcp server socket failed, %s:%hu, reason:%d(%s)", ip, port, errno, strerror(errno));
close(sockFd); close(sockFd);
return -1; return -1;
} }
...@@ -457,7 +457,7 @@ int taosOpenTcpServerSocket(char *ip, short port) { ...@@ -457,7 +457,7 @@ int taosOpenTcpServerSocket(char *ip, short port) {
if (taosKeepTcpAlive(sockFd) < 0) return -1; if (taosKeepTcpAlive(sockFd) < 0) return -1;
if (listen(sockFd, 10) < 0) { if (listen(sockFd, 10) < 0) {
pError("listen tcp server socket failed, %s:%d, reason:%d(%s)", ip, port, errno, strerror(errno)); pError("listen tcp server socket failed, %s:%hu, reason:%d(%s)", ip, port, errno, strerror(errno));
return -1; return -1;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册