提交 8aa61269 编写于 作者: S Shengliang Guan

Merge from develop into feature/os

...@@ -1999,8 +1999,7 @@ int tscProcessUseDbRsp(SSqlObj *pSql) { ...@@ -1999,8 +1999,7 @@ int tscProcessUseDbRsp(SSqlObj *pSql) {
return 0; return 0;
} }
int tscProcessDropDbRsp(SSqlObj *pSql) { int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) {
pSql->pTscObj->db[0] = 0;
taosCacheEmpty(tscCacheHandle); taosCacheEmpty(tscCacheHandle);
return 0; return 0;
} }
......
...@@ -140,7 +140,7 @@ char tsMqttBrokerAddress[128] = {0}; ...@@ -140,7 +140,7 @@ char tsMqttBrokerAddress[128] = {0};
char tsMqttBrokerClientId[128] = {0}; char tsMqttBrokerClientId[128] = {0};
// monitor // monitor
int32_t tsEnableMonitorModule = 0; int32_t tsEnableMonitorModule = 1;
char tsMonitorDbName[TSDB_DB_NAME_LEN] = "log"; char tsMonitorDbName[TSDB_DB_NAME_LEN] = "log";
char tsInternalPass[] = "secretkey"; char tsInternalPass[] = "secretkey";
int32_t tsMonitorInterval = 30; // seconds int32_t tsMonitorInterval = 30; // seconds
......
...@@ -42,7 +42,8 @@ public class AsyncSubscribeTest extends BaseTest { ...@@ -42,7 +42,8 @@ public class AsyncSubscribeTest extends BaseTest {
long ts = System.currentTimeMillis(); long ts = System.currentTimeMillis();
for (int i = 0; i < 2; i++) { for (int i = 0; i < 2; i++) {
ts += i; ts += i;
statement.executeUpdate("insert into \" + dbName + \".\" + tName + \" values (" + ts + ", " + (100 + i) + ", " + i + ")"); String sql = "insert into " + dbName + "." + tName + " values (" + ts + ", " + (100 + i) + ", " + i + ")";
statement.executeUpdate(sql);
} }
} }
......
...@@ -71,7 +71,7 @@ public class ResultSetTest extends BaseTest { ...@@ -71,7 +71,7 @@ public class ResultSetTest extends BaseTest {
} }
try { try {
statement.executeQuery("select * from " + dbName + "." + tName); statement.executeQuery("select * from " + dbName + "." + tName + " where ts = " + ts);
resSet = statement.getResultSet(); resSet = statement.getResultSet();
System.out.println(((TSDBResultSet) resSet).getRowData()); System.out.println(((TSDBResultSet) resSet).getRowData());
while (resSet.next()) { while (resSet.next()) {
...@@ -806,9 +806,9 @@ public class ResultSetTest extends BaseTest { ...@@ -806,9 +806,9 @@ public class ResultSetTest extends BaseTest {
@Test @Test
public void testBatch() throws SQLException { public void testBatch() throws SQLException {
String[] sqls = new String[]{"insert into test.t0 values (1496732686001,2147483600,1496732687000,3.1415925,3.1415926\n" + String[] sqls = new String[]{"insert into test.t0 values (1496732686001,2147483600,1496732687000,3.1415925,3.1415926535897," +
"535897,\"涛思数据,强~!\",12,12,\"TDengine is powerful\")", "insert into test.t0 values (1496732686002,2147483600,1496732687000,3.1415925,3.1415926\n" + "'涛思数据,强~',12,0,'TDengine is powerful')", "insert into test.t0 values (1496732686002,2147483600,1496732687000,3.1415925,3.1415926535897," +
"535897,\"涛思数据,强~!\",12,12,\"TDengine is powerful\")"}; "'涛思数据,强~',12,1,'TDengine is powerful')"};
for (String sql : sqls) { for (String sql : sqls) {
statement.addBatch(sql); statement.addBatch(sql);
} }
...@@ -816,7 +816,6 @@ public class ResultSetTest extends BaseTest { ...@@ -816,7 +816,6 @@ public class ResultSetTest extends BaseTest {
assertEquals(res.length, 2); assertEquals(res.length, 2);
statement.clearBatch(); statement.clearBatch();
} }
@AfterClass @AfterClass
public static void close() throws Exception { public static void close() throws Exception {
statement.executeUpdate("drop database " + dbName); statement.executeUpdate("drop database " + dbName);
......
...@@ -41,7 +41,8 @@ public class SubscribeTest extends BaseTest { ...@@ -41,7 +41,8 @@ public class SubscribeTest extends BaseTest {
long ts = System.currentTimeMillis(); long ts = System.currentTimeMillis();
for (int i = 0; i < 2; i++) { for (int i = 0; i < 2; i++) {
ts += i; ts += i;
statement.executeUpdate("insert into \" + dbName + \".\" + tName + \" values (" + ts + ", " + (100 + i) + ", " + i + ")"); String sql = "insert into " + dbName + "." + tName + " values (" + ts + ", " + (100 + i) + ", " + i + ")";
statement.executeUpdate(sql);
} }
} }
......
...@@ -2,4 +2,6 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8) ...@@ -2,4 +2,6 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine) PROJECT(TDengine)
ADD_SUBDIRECTORY(shell) ADD_SUBDIRECTORY(shell)
ADD_SUBDIRECTORY(taosdemo) ADD_SUBDIRECTORY(taosdemo)
\ No newline at end of file #ADD_SUBDIRECTORY(taosClusterTest)
ADD_SUBDIRECTORY(taosnetwork)
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
AUX_SOURCE_DIRECTORY(. SRC)
ADD_EXECUTABLE(taosClient client.c)
ADD_EXECUTABLE(taosServer server.c)
TARGET_LINK_LIBRARIES( taosServer -lpthread -lm -lrt )
TARGET_LINK_LIBRARIES( taosClient -lpthread -lm -lrt )
ENDIF ()
...@@ -28,23 +28,27 @@ ...@@ -28,23 +28,27 @@
#include <sys/types.h> #include <sys/types.h>
#include <unistd.h> #include <unistd.h>
#define BUFFER_SIZE 200 #define MAX_PKG_LEN (64*1000)
#define BUFFER_SIZE (MAX_PKG_LEN + 1024)
typedef struct { typedef struct {
int port; int port;
char *host[15]; char *host;
} info; uint16_t pktLen;
} info_s;
typedef struct Arguments { typedef struct Arguments {
char * host; char * host;
uint16_t port; uint16_t port;
uint16_t max_port; uint16_t max_port;
uint16_t pktLen;
} SArguments; } SArguments;
static struct argp_option options[] = { static struct argp_option options[] = {
{0, 'h', "host", 0, "The host to connect to TDEngine. Default is localhost.", 0}, {0, 'h', "host", 0, "The host to connect to TDEngine. Default is localhost.", 0},
{0, 'p', "port", 0, "The TCP or UDP port number to use for the connection. Default is 6041.", 1}, {0, 'p', "port", 0, "The TCP or UDP port number to use for the connection. Default is 6030.", 1},
{0, 'm', "max port", 0, "The max TCP or UDP port number to use for the connection. Default is 6050.", 2}}; {0, 'm', "max port", 0, "The max TCP or UDP port number to use for the connection. Default is 6060.", 2},
{0, 'l', "test pkg len", 0, "The len of pkg for test. Default is 1000 Bytes, max not greater than 64k Bytes.\nNotes: This parameter must be consistent between the client and the server.", 3}};
static error_t parse_opt(int key, char *arg, struct argp_state *state) { static error_t parse_opt(int key, char *arg, struct argp_state *state) {
...@@ -59,16 +63,21 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { ...@@ -59,16 +63,21 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
case 'm': case 'm':
arguments->max_port = atoi(arg); arguments->max_port = atoi(arg);
break; break;
case 'l':
arguments->pktLen = atoi(arg);
break;
default:
return ARGP_ERR_UNKNOWN;
} }
return 0; return 0;
} }
static struct argp argp = {options, parse_opt, 0, 0}; static struct argp argp = {options, parse_opt, 0, 0};
void *checkPort(void *sarg) { int checkTcpPort(info_s *info) {
info *pinfo = (info *)sarg; int port = info->port;
int port = pinfo->port; char *host = info->host;
char *host = *pinfo->host;
int clientSocket; int clientSocket;
struct sockaddr_in serverAddr; struct sockaddr_in serverAddr;
...@@ -76,39 +85,65 @@ void *checkPort(void *sarg) { ...@@ -76,39 +85,65 @@ void *checkPort(void *sarg) {
char recvbuf[BUFFER_SIZE]; char recvbuf[BUFFER_SIZE];
int iDataNum; int iDataNum;
if ((clientSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) { if ((clientSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
perror("socket"); printf("socket() fail: %s\n", strerror(errno));
return NULL; return -1;
} }
serverAddr.sin_family = AF_INET; serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(port); serverAddr.sin_port = htons(port);
serverAddr.sin_addr.s_addr = inet_addr(host); serverAddr.sin_addr.s_addr = inet_addr(host);
printf("=================================\n"); //printf("=================================\n");
if (connect(clientSocket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0) { if (connect(clientSocket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0) {
perror("connect"); printf("connect() fail: %s\n", strerror(errno));
return NULL; return -1;
}
//printf("Connect to: %s:%d...success\n", host, port);
memset(sendbuf, 0, BUFFER_SIZE);
memset(recvbuf, 0, BUFFER_SIZE);
sprintf(sendbuf, "client send tcp pkg to %s:%d, content: 1122334455", host, port);
sprintf(sendbuf + info->pktLen - 16, "1122334455667788");
send(clientSocket, sendbuf, info->pktLen, 0);
memset(recvbuf, 0, BUFFER_SIZE);
int nleft, nread;
char *ptr = recvbuf;
nleft = info->pktLen;
while (nleft > 0) {
nread = recv(clientSocket, ptr, BUFFER_SIZE, 0);;
if (nread == 0) {
break;
} else if (nread < 0) {
if (errno == EINTR) {
continue;
} else {
printf("recv ack pkg from TCP port: %d fail:%s.\n", port, strerror(errno));
close(clientSocket);
return -1;
}
} else {
nleft -= nread;
ptr += nread;
iDataNum += nread;
}
} }
printf("Connect to: %s:%d...success\n", host, port);
sprintf(sendbuf, "send port_%d", port);
send(clientSocket, sendbuf, strlen(sendbuf), 0);
printf("Send msg_%d: %s\n", port, sendbuf);
recvbuf[0] = '\0'; if (iDataNum < info->pktLen) {
iDataNum = recv(clientSocket, recvbuf, BUFFER_SIZE, 0); printf("recv ack pkg len: %d, less than req pkg len: %d from tcp port: %d\n", iDataNum, info->pktLen, port);
recvbuf[iDataNum] = '\0'; return -1;
printf("Read ack msg_%d: %s\n", port, recvbuf); }
//printf("Read ack pkg len:%d from tcp port: %d, buffer: %s %s\n", info->pktLen, port, recvbuf, recvbuf+iDataNum-8);
printf("=================================\n");
close(clientSocket); close(clientSocket);
return NULL; return 0;
} }
void *checkUPort(void *sarg) { int checkUdpPort(info_s *info) {
info *pinfo = (info *)sarg; int port = info->port;
int port = pinfo->port; char *host = info->host;
char *host = *pinfo->host;
int clientSocket; int clientSocket;
struct sockaddr_in serverAddr; struct sockaddr_in serverAddr;
...@@ -117,56 +152,76 @@ void *checkUPort(void *sarg) { ...@@ -117,56 +152,76 @@ void *checkUPort(void *sarg) {
int iDataNum; int iDataNum;
if ((clientSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { if ((clientSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
perror("socket"); perror("socket");
return NULL; return -1;
} }
serverAddr.sin_family = AF_INET; serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(port); serverAddr.sin_port = htons(port);
serverAddr.sin_addr.s_addr = inet_addr(host); serverAddr.sin_addr.s_addr = inet_addr(host);
memset(sendbuf, 0, BUFFER_SIZE);
memset(recvbuf, 0, BUFFER_SIZE);
printf("=================================\n"); sprintf(sendbuf, "client send udp pkg to %s:%d, content: 1122334455", host, port);
sprintf(sendbuf + info->pktLen - 16, "1122334455667788");
sprintf(sendbuf, "send msg port_%d by udp", port);
socklen_t sin_size = sizeof(*(struct sockaddr *)&serverAddr); socklen_t sin_size = sizeof(*(struct sockaddr *)&serverAddr);
sendto(clientSocket, sendbuf, strlen(sendbuf), 0, (struct sockaddr *)&serverAddr, (int)sin_size); int code = sendto(clientSocket, sendbuf, info->pktLen, 0, (struct sockaddr *)&serverAddr, (int)sin_size);
if (code < 0) {
printf("Send msg_%d by udp: %s\n", port, sendbuf); perror("sendto");
return -1;
}
recvbuf[0] = '\0';
iDataNum = recvfrom(clientSocket, recvbuf, BUFFER_SIZE, 0, (struct sockaddr *)&serverAddr, &sin_size); iDataNum = recvfrom(clientSocket, recvbuf, BUFFER_SIZE, 0, (struct sockaddr *)&serverAddr, &sin_size);
recvbuf[iDataNum] = '\0';
printf("Read ack msg_%d from udp: %s\n", port, recvbuf);
printf("=================================\n"); if (iDataNum < info->pktLen) {
printf("Read ack pkg len: %d, less than req pkg len: %d from udp port: %d\n", iDataNum, info->pktLen, port);
return -1;
}
//printf("Read ack pkg len:%d from udp port: %d, buffer: %s %s\n", info->pktLen, port, recvbuf, recvbuf+iDataNum-8);
close(clientSocket); close(clientSocket);
return NULL; return 0;
} }
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
SArguments arguments = {"127.0.0.1", 6041, 6050}; SArguments arguments = {"127.0.0.1", 6030, 6060, 1000};
info_s info;
int ret;
argp_parse(&argp, argc, argv, 0, 0, &arguments); argp_parse(&argp, argc, argv, 0, 0, &arguments);
if (arguments.pktLen > MAX_PKG_LEN) {
printf("test pkg len overflow: %d, max len not greater than %d bytes\n", arguments.pktLen, MAX_PKG_LEN);
exit(0);
}
printf("host: %s\tport: %d\tmax_port: %d\n", arguments.host, arguments.port, arguments.max_port); printf("host: %s\tport: %d\tmax_port: %d\tpkgLen: %d\n", arguments.host, arguments.port, arguments.max_port, arguments.pktLen);
int port = arguments.port; int port = arguments.port;
char *host = arguments.host;
info *tinfo = malloc(sizeof(info));
info *uinfo = malloc(sizeof(info));
for (; port < arguments.max_port; port++) {
printf("For test: %s:%d\n", host, port);
*tinfo->host = host;
tinfo->port = port;
checkPort(tinfo);
*uinfo->host = host; info.host = arguments.host;
uinfo->port = port; info.pktLen = arguments.pktLen;
checkUPort(uinfo);
for (; port <= arguments.max_port; port++) {
//printf("test: %s:%d\n", info.host, port);
printf("\n");
info.port = port;
ret = checkTcpPort(&info);
if (ret != 0) {
printf("tcp port:%d test fail.\t\t", port);
} else {
printf("tcp port:%d test ok.\t\t", port);
}
ret = checkUdpPort(&info);
if (ret != 0) {
printf("udp port:%d test fail.\t\t", port);
} else {
printf("udp port:%d test ok.\t\t", port);
}
} }
free(tinfo); printf("\n");
free(uinfo); return 0;
} }
\ No newline at end of file
...@@ -27,24 +27,28 @@ ...@@ -27,24 +27,28 @@
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/types.h> #include <sys/types.h>
#include <unistd.h> #include <unistd.h>
#include <pthread.h>
#define BUFFER_SIZE 200 #define MAX_PKG_LEN (64*1000)
#define BUFFER_SIZE (MAX_PKG_LEN + 1024)
typedef struct { typedef struct {
int port; int port;
int type; // 0: tcp, 1: udo, default: 0 uint16_t pktLen;
} info; } info_s;
typedef struct Arguments { typedef struct Arguments {
char * host; char * host;
uint16_t port; uint16_t port;
uint16_t max_port; uint16_t max_port;
uint16_t pktLen;
} SArguments; } SArguments;
static struct argp_option options[] = { static struct argp_option options[] = {
{0, 'h', "host", 0, "The host to connect to TDEngine. Default is localhost.", 0}, {0, 'h', "host", 0, "The host to connect to TDEngine. Default is localhost.", 0},
{0, 'p', "port", 0, "The TCP or UDP port number to use for the connection. Default is 6020.", 1}, {0, 'p', "port", 0, "The TCP or UDP port number to use for the connection. Default is 6041.", 1},
{0, 'm', "max port", 0, "The max TCP or UDP port number to use for the connection. Default is 6050.", 2}}; {0, 'm', "max port", 0, "The max TCP or UDP port number to use for the connection. Default is 6060.", 2},
{0, 'l', "test pkg len", 0, "The len of pkg for test. Default is 1000 Bytes, max not greater than 64k Bytes.\nNotes: This parameter must be consistent between the client and the server.", 3}};
static error_t parse_opt(int key, char *arg, struct argp_state *state) { static error_t parse_opt(int key, char *arg, struct argp_state *state) {
...@@ -59,16 +63,21 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { ...@@ -59,16 +63,21 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
case 'm': case 'm':
arguments->max_port = atoi(arg); arguments->max_port = atoi(arg);
break; break;
case 'l':
arguments->pktLen = atoi(arg);
break;
default:
return ARGP_ERR_UNKNOWN;
} }
return 0; return 0;
} }
static struct argp argp = {options, parse_opt, 0, 0}; static struct argp argp = {options, parse_opt, 0, 0};
static void *bindPort(void *sarg) { static void *bindTcpPort(void *sarg) {
info *pinfo = (info *)sarg; info_s *pinfo = (info_s *)sarg;
int port = pinfo->port; int port = pinfo->port;
int type = pinfo->type;
int serverSocket; int serverSocket;
struct sockaddr_in server_addr; struct sockaddr_in server_addr;
...@@ -76,10 +85,10 @@ static void *bindPort(void *sarg) { ...@@ -76,10 +85,10 @@ static void *bindPort(void *sarg) {
int addr_len = sizeof(clientAddr); int addr_len = sizeof(clientAddr);
int client; int client;
char buffer[BUFFER_SIZE]; char buffer[BUFFER_SIZE];
int iDataNum; int iDataNum = 0;
if ((serverSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { if ((serverSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
perror("socket"); printf("socket() fail: %s", strerror(errno));
return NULL; return NULL;
} }
...@@ -89,65 +98,67 @@ static void *bindPort(void *sarg) { ...@@ -89,65 +98,67 @@ static void *bindPort(void *sarg) {
server_addr.sin_addr.s_addr = htonl(INADDR_ANY); server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) { if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
perror("connect"); printf("port:%d bind() fail: %s", port, strerror(errno));
return NULL; return NULL;
} }
if (listen(serverSocket, 5) < 0) { if (listen(serverSocket, 5) < 0) {
perror("listen"); printf("listen() fail: %s", strerror(errno));
return NULL; return NULL;
} }
printf("Bind port: %d success\n", port); //printf("Bind port: %d success\n", port);
while (1) { while (1) {
client = accept(serverSocket, (struct sockaddr *)&clientAddr, (socklen_t *)&addr_len); client = accept(serverSocket, (struct sockaddr *)&clientAddr, (socklen_t *)&addr_len);
if (client < 0) { if (client < 0) {
perror("accept"); printf("accept() fail: %s", strerror(errno));
continue; continue;
} }
printf("=================================\n");
memset(buffer, 0, BUFFER_SIZE);
printf("Client ip is %s, Server port is %d\n", inet_ntoa(clientAddr.sin_addr), port); int nleft, nread;
while (1) { char *ptr = buffer;
buffer[0] = '\0'; nleft = pinfo->pktLen;
iDataNum = recv(client, buffer, BUFFER_SIZE, 0); while (nleft > 0) {
nread = recv(client, ptr, BUFFER_SIZE, 0);
if (iDataNum < 0) {
perror("recv null"); if (nread == 0) {
continue;
}
if (iDataNum > 0) {
buffer[iDataNum] = '\0';
printf("read msg:%s\n", buffer);
if (strcmp(buffer, "quit") == 0) break;
buffer[0] = '\0';
sprintf(buffer, "ack port_%d", port);
printf("send ack msg:%s\n", buffer);
send(client, buffer, strlen(buffer), 0);
break; break;
} } else if (nread < 0) {
if (errno == EINTR) {
continue;
} else {
printf("recv Client: %s pkg from TCP port: %d fail:%s.\n", inet_ntoa(clientAddr.sin_addr), port, strerror(errno));
close(serverSocket);
return NULL;
}
} else {
nleft -= nread;
ptr += nread;
iDataNum += nread;
}
}
printf("recv Client: %s pkg from TCP port: %d, pkg len: %d\n", inet_ntoa(clientAddr.sin_addr), port, iDataNum);
if (iDataNum > 0) {
send(client, buffer, iDataNum, 0);
break;
} }
printf("=================================\n");
} }
close(serverSocket); close(serverSocket);
return NULL; return NULL;
} }
static void *bindUPort(void *sarg) { static void *bindUdpPort(void *sarg) {
info *pinfo = (info *)sarg; info_s *pinfo = (info_s *)sarg;
int port = pinfo->port; int port = pinfo->port;
int type = pinfo->type;
int serverSocket; int serverSocket;
struct sockaddr_in server_addr; struct sockaddr_in server_addr;
struct sockaddr_in clientAddr; struct sockaddr_in clientAddr;
int addr_len = sizeof(clientAddr);
int client;
char buffer[BUFFER_SIZE]; char buffer[BUFFER_SIZE];
int iDataNum; int iDataNum;
if ((serverSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { if ((serverSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
perror("socket"); perror("socket");
return NULL; return NULL;
...@@ -164,10 +175,9 @@ static void *bindUPort(void *sarg) { ...@@ -164,10 +175,9 @@ static void *bindUPort(void *sarg) {
} }
socklen_t sin_size; socklen_t sin_size;
printf("Bind port: %d success\n", port);
while (1) { while (1) {
buffer[0] = '\0'; memset(buffer, 0, BUFFER_SIZE);
sin_size = sizeof(*(struct sockaddr *)&server_addr); sin_size = sizeof(*(struct sockaddr *)&server_addr);
...@@ -178,21 +188,10 @@ static void *bindUPort(void *sarg) { ...@@ -178,21 +188,10 @@ static void *bindUPort(void *sarg) {
continue; continue;
} }
if (iDataNum > 0) { if (iDataNum > 0) {
printf("=================================\n"); printf("recv Client: %s pkg from UDP port: %d, pkg len: %d\n", inet_ntoa(clientAddr.sin_addr), port, iDataNum);
//printf("Read msg from udp:%s ... %s\n", buffer, buffer+iDataNum-16);
printf("Client ip is %s, Server port is %d\n", inet_ntoa(clientAddr.sin_addr), port);
buffer[iDataNum] = '\0';
printf("Read msg from udp:%s\n", buffer);
if (strcmp(buffer, "quit") == 0) break;
buffer[0] = '\0';
sprintf(buffer, "ack port_%d by udp", port); sendto(serverSocket, buffer, iDataNum, 0, (struct sockaddr *)&clientAddr, (int)sin_size);
printf("Send ack msg by udp:%s\n", buffer);
sendto(serverSocket, buffer, strlen(buffer), 0, (struct sockaddr *)&clientAddr, (int)sin_size);
send(client, buffer, strlen(buffer), 0);
printf("=================================\n");
} }
} }
...@@ -202,39 +201,44 @@ static void *bindUPort(void *sarg) { ...@@ -202,39 +201,44 @@ static void *bindUPort(void *sarg) {
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
SArguments arguments = {"127.0.0.1", 6020, 6050}; SArguments arguments = {"127.0.0.1", 6030, 6060, 1000};
argp_parse(&argp, argc, argv, 0, 0, &arguments); argp_parse(&argp, argc, argv, 0, 0, &arguments);
if (arguments.pktLen > MAX_PKG_LEN) {
printf("test pkg len overflow: %d, max len not greater than %d bytes\n", arguments.pktLen, MAX_PKG_LEN);
exit(0);
}
int port = arguments.port; int port = arguments.port;
int num = arguments.max_port - arguments.port; int num = arguments.max_port - arguments.port + 1;
if (num < 0) { if (num < 0) {
num = 1; num = 1;
} }
pthread_t *pids = malloc(2 * num * sizeof(pthread_t)); pthread_t *pids = malloc(2 * num * sizeof(pthread_t));
info * infos = malloc(num * sizeof(info)); info_s * tinfos = malloc(num * sizeof(info_s));
info * uinfos = malloc(num * sizeof(info)); info_s * uinfos = malloc(num * sizeof(info_s));
for (size_t i = 0; i < num; i++) { for (size_t i = 0; i < num; i++) {
info *pinfo = infos++; info_s *tcpInfo = tinfos + i;
pinfo->port = port; tcpInfo->port = port + i;
tcpInfo->pktLen = arguments.pktLen;
if (pthread_create(pids + i, NULL, bindPort, pinfo) != 0) //创建线程
{ //创建线程失败 if (pthread_create(pids + i, NULL, bindTcpPort, tcpInfo) != 0)
printf("创建线程失败: %d.\n", port); {
exit(0); printf("create thread fail, port:%d.\n", port);
exit(-1);
} }
info *uinfo = uinfos++; info_s *udpInfo = uinfos + i;
uinfo->port = port; udpInfo->port = port + i;
uinfo->type = 1; if (pthread_create(pids + num + i, NULL, bindUdpPort, udpInfo) != 0)
port++; {
if (pthread_create(pids + num + i, NULL, bindUPort, uinfo) != 0) //创建线程 printf("create thread fail, port:%d.\n", port);
{ //创建线程失败 exit(-1);
printf("创建线程失败: %d.\n", port);
exit(0);
} }
} }
for (int i = 0; i < num; i++) { for (int i = 0; i < num; i++) {
pthread_join(pids[i], NULL); pthread_join(pids[i], NULL);
pthread_join(pids[(num + i)], NULL); pthread_join(pids[(num + i)], NULL);
......
...@@ -75,6 +75,11 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) { ...@@ -75,6 +75,11 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) {
} }
void mnodeProcessPeerRsp(SRpcMsg *pMsg) { void mnodeProcessPeerRsp(SRpcMsg *pMsg) {
if (!sdbIsMaster()) {
mError("%p, msg:%s is not processed for it is not master", pMsg->ahandle, taosMsg[pMsg->msgType]);
return;
}
if (tsMnodeProcessPeerRspFp[pMsg->msgType]) { if (tsMnodeProcessPeerRspFp[pMsg->msgType]) {
(*tsMnodeProcessPeerRspFp[pMsg->msgType])(pMsg); (*tsMnodeProcessPeerRspFp[pMsg->msgType])(pMsg);
} else { } else {
......
...@@ -164,9 +164,9 @@ static int32_t mnodeVgroupActionUpdate(SSdbOper *pOper) { ...@@ -164,9 +164,9 @@ static int32_t mnodeVgroupActionUpdate(SSdbOper *pOper) {
// reset vgid status on vgroup changed // reset vgid status on vgroup changed
mDebug("vgId:%d, reset sync status to unsynced", pVgroup->vgId); mDebug("vgId:%d, reset sync status to offline", pVgroup->vgId);
for (int32_t v = 0; v < pVgroup->numOfVnodes; ++v) { for (int32_t v = 0; v < pVgroup->numOfVnodes; ++v) {
pVgroup->vnodeGid[v].role = TAOS_SYNC_ROLE_UNSYNCED; pVgroup->vnodeGid[v].role = TAOS_SYNC_ROLE_OFFLINE;
} }
mnodeDecVgroupRef(pVgroup); mnodeDecVgroupRef(pVgroup);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册