提交 46a340d0 编写于 作者: H Haojun Liao

[td-225] fix tcp transfer crash.

上级 401c31a6
...@@ -30,6 +30,7 @@ ...@@ -30,6 +30,7 @@
#define RPC_MAX_UDP_SIZE 65480 #define RPC_MAX_UDP_SIZE 65480
int tsUdpDelay = 0; int tsUdpDelay = 0;
uint32_t tsPkgSig = 0; // package signature, it is a random number to make sure start the sig will be different
typedef struct { typedef struct {
void * signature; void * signature;
...@@ -80,6 +81,7 @@ typedef struct { ...@@ -80,6 +81,7 @@ typedef struct {
uint64_t handle; uint64_t handle;
uint16_t port; uint16_t port;
int32_t msgLen; int32_t msgLen;
uint32_t sig;
} SPacketInfo; } SPacketInfo;
typedef struct { typedef struct {
...@@ -100,15 +102,17 @@ typedef struct { ...@@ -100,15 +102,17 @@ typedef struct {
} SMonitor; } SMonitor;
typedef struct { typedef struct {
uint32_t sig;
uint64_t handle; uint64_t handle;
uint64_t hash; uint64_t hash;
} SHandleViaTcp; } SHandleViaTcp;
bool taosCheckHandleViaTcpValid(SHandleViaTcp *handleViaTcp) { bool taosCheckHandleViaTcpValid(SHandleViaTcp *handleViaTcp) {
return handleViaTcp->hash == taosHashUInt64(handleViaTcp->handle); return handleViaTcp->hash == taosHashUInt64(handleViaTcp->handle) && (handleViaTcp->sig == tsPkgSig);
} }
void taosInitHandleViaTcp(SHandleViaTcp *handleViaTcp, uint64_t handle) { void taosInitHandleViaTcp(SHandleViaTcp *handleViaTcp, uint64_t handle, uint32_t sig) {
handleViaTcp->sig = sig;
handleViaTcp->handle = handle; handleViaTcp->handle = handle;
handleViaTcp->hash = taosHashUInt64(handleViaTcp->handle); handleViaTcp->hash = taosHashUInt64(handleViaTcp->handle);
} }
...@@ -155,7 +159,7 @@ void *taosReadTcpData(void *argv) { ...@@ -155,7 +159,7 @@ void *taosReadTcpData(void *argv) {
} }
SHandleViaTcp handleViaTcp; SHandleViaTcp handleViaTcp;
taosInitHandleViaTcp(&handleViaTcp, pInfo->handle); taosInitHandleViaTcp(&handleViaTcp, pInfo->handle, pInfo->sig);
retLen = (int)taosWriteSocket(fd, (char *)&handleViaTcp, sizeof(SHandleViaTcp)); retLen = (int)taosWriteSocket(fd, (char *)&handleViaTcp, sizeof(SHandleViaTcp));
if (retLen != (int)sizeof(SHandleViaTcp)) { if (retLen != (int)sizeof(SHandleViaTcp)) {
...@@ -537,6 +541,9 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v ...@@ -537,6 +541,9 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v
} }
void *taosInitUdpServer(char *ip, uint16_t port, char *label, int threads, void *fp, void *shandle) { void *taosInitUdpServer(char *ip, uint16_t port, char *label, int threads, void *fp, void *shandle) {
srand(time(NULL));
tsPkgSig = rand();
SUdpConnSet *pSet; SUdpConnSet *pSet;
pSet = taosInitUdpConnection(ip, port, label, threads, fp, shandle); pSet = taosInitUdpConnection(ip, port, label, threads, fp, shandle);
if (pSet == NULL) return NULL; if (pSet == NULL) return NULL;
...@@ -685,6 +692,7 @@ int taosSendPacketViaTcp(uint32_t ip, uint16_t port, char *data, int dataLen, vo ...@@ -685,6 +692,7 @@ int taosSendPacketViaTcp(uint32_t ip, uint16_t port, char *data, int dataLen, vo
pInfo->handle = (uint64_t)data; pInfo->handle = (uint64_t)data;
pInfo->port = pSet->port; pInfo->port = pSet->port;
pInfo->msgLen = pHead->msgLen; pInfo->msgLen = pHead->msgLen;
pInfo->sig = tsPkgSig;
msgLen = sizeof(STaosHeader) + sizeof(SPacketInfo); msgLen = sizeof(STaosHeader) + sizeof(SPacketInfo);
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
...@@ -717,7 +725,7 @@ int taosSendPacketViaTcp(uint32_t ip, uint16_t port, char *data, int dataLen, vo ...@@ -717,7 +725,7 @@ int taosSendPacketViaTcp(uint32_t ip, uint16_t port, char *data, int dataLen, vo
tError("%s failed to open TCP socket to:%s:%hu to send packet", pSet->label, ipstr, pConn->port); tError("%s failed to open TCP socket to:%s:%hu to send packet", pSet->label, ipstr, pConn->port);
} else { } else {
SHandleViaTcp handleViaTcp; SHandleViaTcp handleViaTcp;
taosInitHandleViaTcp(&handleViaTcp, 0); taosInitHandleViaTcp(&handleViaTcp, 0, tsPkgSig);
retLen = (int)taosWriteSocket(fd, (char *)&handleViaTcp, sizeof(SHandleViaTcp)); retLen = (int)taosWriteSocket(fd, (char *)&handleViaTcp, sizeof(SHandleViaTcp));
if (retLen != (int)sizeof(handleViaTcp)) { if (retLen != (int)sizeof(handleViaTcp)) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册