提交 afdb0242 编写于 作者: G Ganlin Zhao

Merge branch '3.0' into fix/TD-18487

...@@ -26,5 +26,3 @@ TDengine 集群中的时序数据的副本数是与数据库关联的,一个 ...@@ -26,5 +26,3 @@ TDengine 集群中的时序数据的副本数是与数据库关联的,一个
TDengine 集群的节点数必须大于等于副本数,否则创建表时将报错。 TDengine 集群的节点数必须大于等于副本数,否则创建表时将报错。
当 TDengine 集群中的节点部署在不同的物理机上,并设置多个副本数时,就实现了系统的高可靠性,无需再使用其他软件或工具。TDengine 企业版还可以将副本部署在不同机房,从而实现异地容灾。 当 TDengine 集群中的节点部署在不同的物理机上,并设置多个副本数时,就实现了系统的高可靠性,无需再使用其他软件或工具。TDengine 企业版还可以将副本部署在不同机房,从而实现异地容灾。
另外一种灾备方式是通过 `taosX` 将一个 TDengine 集群的数据同步复制到物理上位于不同数据中心的另一个 TDengine 集群。其详细使用方法请参考 [taosX 参考手册](../../reference/taosX)
...@@ -33,7 +33,7 @@ extern bool gRaftDetailLog; ...@@ -33,7 +33,7 @@ extern bool gRaftDetailLog;
#define SYNC_MAX_READ_RANGE 2 #define SYNC_MAX_READ_RANGE 2
#define SYNC_MAX_PROGRESS_WAIT_MS 4000 #define SYNC_MAX_PROGRESS_WAIT_MS 4000
#define SYNC_MAX_START_TIME_RANGE_MS (1000 * 20) #define SYNC_MAX_START_TIME_RANGE_MS (1000 * 20)
#define SYNC_MAX_RECV_TIME_RANGE_MS 1000 #define SYNC_MAX_RECV_TIME_RANGE_MS 1200
#define SYNC_ADD_QUORUM_COUNT 3 #define SYNC_ADD_QUORUM_COUNT 3
#define SYNC_MAX_BATCH_SIZE 1 #define SYNC_MAX_BATCH_SIZE 1
......
...@@ -75,7 +75,7 @@ int32_t tsMonitorMaxLogs = 100; ...@@ -75,7 +75,7 @@ int32_t tsMonitorMaxLogs = 100;
bool tsMonitorComp = false; bool tsMonitorComp = false;
// telem // telem
bool tsEnableTelem = false; bool tsEnableTelem = true;
int32_t tsTelemInterval = 86400; int32_t tsTelemInterval = 86400;
char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.taosdata.com"; char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.taosdata.com";
uint16_t tsTelemPort = 80; uint16_t tsTelemPort = 80;
...@@ -166,7 +166,7 @@ int32_t tsTtlPushInterval = 86400; ...@@ -166,7 +166,7 @@ int32_t tsTtlPushInterval = 86400;
int32_t tsGrantHBInterval = 60; int32_t tsGrantHBInterval = 60;
#ifndef _STORAGE #ifndef _STORAGE
int32_t taosSetTfsCfg(SConfig *pCfg) { int32_t taosSetTfsCfg(SConfig *pCfg) {
SConfigItem *pItem = cfgGetItem(pCfg, "dataDir"); SConfigItem *pItem = cfgGetItem(pCfg, "dataDir");
memset(tsDataDir, 0, PATH_MAX); memset(tsDataDir, 0, PATH_MAX);
...@@ -180,7 +180,7 @@ int32_t taosSetTfsCfg(SConfig *pCfg) { ...@@ -180,7 +180,7 @@ int32_t taosSetTfsCfg(SConfig *pCfg) {
uError("failed to create dataDir:%s", tsDataDir); uError("failed to create dataDir:%s", tsDataDir);
return -1; return -1;
} }
return 0; return 0;
} }
#else #else
int32_t taosSetTfsCfg(SConfig *pCfg); int32_t taosSetTfsCfg(SConfig *pCfg);
......
...@@ -3970,16 +3970,16 @@ int32_t elapsedFunction(SqlFunctionCtx* pCtx) { ...@@ -3970,16 +3970,16 @@ int32_t elapsedFunction(SqlFunctionCtx* pCtx) {
TSKEY* ptsList = (int64_t*)colDataGetData(pCol, 0); TSKEY* ptsList = (int64_t*)colDataGetData(pCol, 0);
if (pCtx->order == TSDB_ORDER_DESC) { if (pCtx->order == TSDB_ORDER_DESC) {
if (pCtx->start.key == INT64_MIN) { if (pCtx->start.key == INT64_MIN) {
pInfo->max = pInfo->max = (pInfo->max < ptsList[start]) ? ptsList[start] : pInfo->max;
(pInfo->max < ptsList[start + pInput->numOfRows - 1]) ? ptsList[start + pInput->numOfRows - 1] : pInfo->max;
} else { } else {
pInfo->max = pCtx->start.key + 1; pInfo->max = pCtx->start.key + 1;
} }
if (pCtx->end.key != INT64_MIN) { if (pCtx->end.key == INT64_MIN) {
pInfo->min = pCtx->end.key; pInfo->min = (pInfo->min > ptsList[start + pInput->numOfRows - 1]) ?
ptsList[start + pInput->numOfRows - 1] : pInfo->min;
} else { } else {
pInfo->min = ptsList[start]; pInfo->min = pCtx->end.key;
} }
} else { } else {
if (pCtx->start.key == INT64_MIN) { if (pCtx->start.key == INT64_MIN) {
...@@ -3988,10 +3988,11 @@ int32_t elapsedFunction(SqlFunctionCtx* pCtx) { ...@@ -3988,10 +3988,11 @@ int32_t elapsedFunction(SqlFunctionCtx* pCtx) {
pInfo->min = pCtx->start.key; pInfo->min = pCtx->start.key;
} }
if (pCtx->end.key != INT64_MIN) { if (pCtx->end.key == INT64_MIN) {
pInfo->max = pCtx->end.key + 1; pInfo->max = (pInfo->max < ptsList[start + pInput->numOfRows - 1]) ?
ptsList[start + pInput->numOfRows - 1] : pInfo->max;
} else { } else {
pInfo->max = ptsList[start + pInput->numOfRows - 1]; pInfo->max = pCtx->end.key + 1;
} }
} }
} }
......
...@@ -98,6 +98,11 @@ typedef void* queue[2]; ...@@ -98,6 +98,11 @@ typedef void* queue[2];
#define TRANS_RETRY_INTERVAL 15 // retry interval (ms) #define TRANS_RETRY_INTERVAL 15 // retry interval (ms)
#define TRANS_CONN_TIMEOUT 3 // connect timeout (s) #define TRANS_CONN_TIMEOUT 3 // connect timeout (s)
#define TRANS_READ_TIMEOUT 3000 // read timeout (ms) #define TRANS_READ_TIMEOUT 3000 // read timeout (ms)
#define TRANS_PACKET_LIMIT 1024 * 1024 * 512
#define TRANS_MAGIC_NUM 0x5f375a86
#define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0)
typedef SRpcMsg STransMsg; typedef SRpcMsg STransMsg;
typedef SRpcCtx STransCtx; typedef SRpcCtx STransCtx;
...@@ -151,6 +156,7 @@ typedef struct { ...@@ -151,6 +156,7 @@ typedef struct {
char hasEpSet : 2; // contain epset or not, 0(default): no epset, 1: contain epset char hasEpSet : 2; // contain epset or not, 0(default): no epset, 1: contain epset
char user[TSDB_UNI_LEN]; char user[TSDB_UNI_LEN];
uint32_t magicNum;
STraceId traceId; STraceId traceId;
uint64_t ahandle; // ahandle assigned by client uint64_t ahandle; // ahandle assigned by client
uint32_t code; // del later uint32_t code; // del later
...@@ -203,6 +209,7 @@ typedef struct SConnBuffer { ...@@ -203,6 +209,7 @@ typedef struct SConnBuffer {
int cap; int cap;
int left; int left;
int total; int total;
int invalid;
} SConnBuffer; } SConnBuffer;
typedef void (*AsyncCB)(uv_async_t* handle); typedef void (*AsyncCB)(uv_async_t* handle);
......
...@@ -14,15 +14,22 @@ ...@@ -14,15 +14,22 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#ifdef USE_UV
#include <uv.h>
#endif
// clang-format off // clang-format off
#include <uv.h>
#include "zlib.h" #include "zlib.h"
#include "thttp.h" #include "thttp.h"
#include "taoserror.h" #include "taoserror.h"
#include "tlog.h" #include "tlog.h"
typedef struct SHttpClient {
uv_connect_t conn;
uv_tcp_t tcp;
uv_write_t req;
uv_buf_t* buf;
char* addr;
uint16_t port;
} SHttpClient;
static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pHead, int32_t headLen, static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pHead, int32_t headLen,
EHttpCompFlag flag) { EHttpCompFlag flag) {
if (flag == HTTP_FLAT) { if (flag == HTTP_FLAT) {
...@@ -45,7 +52,7 @@ static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pH ...@@ -45,7 +52,7 @@ static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pH
} }
} }
int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) { static int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) {
int32_t code = -1; int32_t code = -1;
int32_t destLen = srcLen; int32_t destLen = srcLen;
void* pDest = taosMemoryMalloc(destLen); void* pDest = taosMemoryMalloc(destLen);
...@@ -114,84 +121,53 @@ _OVER: ...@@ -114,84 +121,53 @@ _OVER:
return code; return code;
} }
#ifdef USE_UV static void destroyHttpClient(SHttpClient* cli) {
taosMemoryFree(cli->buf);
taosMemoryFree(cli->addr);
taosMemoryFree(cli);
}
static void clientCloseCb(uv_handle_t* handle) {
SHttpClient* cli = handle->data;
destroyHttpClient(cli);
}
static void clientSentCb(uv_write_t* req, int32_t status) {
SHttpClient* cli = req->data;
if (status != 0) {
terrno = TAOS_SYSTEM_ERROR(status);
uError("http-report failed to send data %s", uv_strerror(status));
} else {
uInfo("http-report succ to send data");
}
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
}
static void clientConnCb(uv_connect_t* req, int32_t status) { static void clientConnCb(uv_connect_t* req, int32_t status) {
if (status < 0) { SHttpClient* cli = req->data;
if (status != 0) {
terrno = TAOS_SYSTEM_ERROR(status); terrno = TAOS_SYSTEM_ERROR(status);
uError("connection error %s", uv_strerror(status)); uError("http-report failed to conn to server, reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port);
uv_close((uv_handle_t*)req->handle, NULL); uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
return; return;
} }
uv_buf_t* wb = req->data; uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->buf, 2, clientSentCb);
assert(wb != NULL);
uv_write_t write_req;
uv_write(&write_req, req->handle, wb, 2, NULL);
uv_close((uv_handle_t*)req->handle, NULL);
} }
int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { static int32_t taosBuildDstAddr(const char* server, uint16_t port, struct sockaddr_in* dest) {
uint32_t ipv4 = taosGetIpv4FromFqdn(server); uint32_t ip = taosGetIpv4FromFqdn(server);
if (ipv4 == 0xffffffff) { if (ip == 0xffffffff) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to get http server:%s ip since %s", server, terrstr()); uError("http-report failed to get http server:%s ip since %s", server, terrstr());
return -1; return -1;
} }
char buf[128] = {0};
char ipv4Buf[128] = {0}; tinet_ntoa(buf, ip);
tinet_ntoa(ipv4Buf, ipv4); uv_ip4_addr(buf, port, dest);
return 0;
struct sockaddr_in dest = {0};
uv_ip4_addr(ipv4Buf, port, &dest);
uv_tcp_t socket_tcp = {0};
uv_loop_t* loop = uv_default_loop();
uv_tcp_init(loop, &socket_tcp);
uv_connect_t* connect = (uv_connect_t*)taosMemoryMalloc(sizeof(uv_connect_t));
if (flag == HTTP_GZIP) {
int32_t dstLen = taosCompressHttpRport(pCont, contLen);
if (dstLen > 0) {
contLen = dstLen;
} else {
flag = HTTP_FLAT;
}
}
char header[1024] = {0};
int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag);
uv_buf_t wb[2];
wb[0] = uv_buf_init((char*)header, headLen);
wb[1] = uv_buf_init((char*)pCont, contLen);
connect->data = wb;
terrno = 0;
uv_tcp_connect(connect, &socket_tcp, (const struct sockaddr*)&dest, clientConnCb);
uv_run(loop, UV_RUN_DEFAULT);
uv_loop_close(loop);
taosMemoryFree(connect);
return terrno;
} }
#else
int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) {
int32_t code = -1; struct sockaddr_in dest = {0};
TdSocketPtr pSocket = NULL; if (taosBuildDstAddr(server, port, &dest) < 0) {
return -1;
uint32_t ip = taosGetIpv4FromFqdn(server);
if (ip == 0xffffffff) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to get http server:%s ip since %s", server, terrstr());
goto SEND_OVER;
}
pSocket = taosOpenTcpClientSocket(ip, port, 0);
if (pSocket == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to create http socket to %s:%u since %s", server, port, terrstr());
goto SEND_OVER;
} }
if (flag == HTTP_GZIP) { if (flag == HTTP_GZIP) {
int32_t dstLen = taosCompressHttpRport(pCont, contLen); int32_t dstLen = taosCompressHttpRport(pCont, contLen);
if (dstLen > 0) { if (dstLen > 0) {
...@@ -200,37 +176,38 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32 ...@@ -200,37 +176,38 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32
flag = HTTP_FLAT; flag = HTTP_FLAT;
} }
} }
terrno = 0;
char header[1024] = {0}; char header[2048] = {0};
int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag); int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag);
if (taosWriteMsg(pSocket, header, headLen) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to send http header to %s:%u since %s", server, port, terrstr());
goto SEND_OVER;
}
if (taosWriteMsg(pSocket, (void*)pCont, contLen) < 0) { uv_buf_t* wb = taosMemoryCalloc(2, sizeof(uv_buf_t));
terrno = TAOS_SYSTEM_ERROR(errno); wb[0] = uv_buf_init((char*)header, headLen); // stack var
uError("failed to send http content to %s:%u since %s", server, port, terrstr()); wb[1] = uv_buf_init((char*)pCont, contLen); // heap var
goto SEND_OVER;
}
// read something to avoid nginx error 499 SHttpClient* cli = taosMemoryCalloc(1, sizeof(SHttpClient));
if (taosWriteMsg(pSocket, header, 10) < 0) { cli->conn.data = cli;
terrno = TAOS_SYSTEM_ERROR(errno); cli->tcp.data = cli;
uError("failed to receive response from %s:%u since %s", server, port, terrstr()); cli->req.data = cli;
goto SEND_OVER; cli->buf = wb;
} cli->addr = tstrdup(server);
cli->port = port;
uv_loop_t* loop = uv_default_loop();
uv_tcp_init(loop, &cli->tcp);
// set up timeout to avoid stuck;
int32_t fd = taosCreateSocketWithTimeout(5);
uv_tcp_open((uv_tcp_t*)&cli->tcp, fd);
code = 0;
SEND_OVER: int32_t ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&dest, clientConnCb);
if (pSocket != NULL) { if (ret != 0) {
taosCloseSocket(&pSocket); uError("http-report failed to connect to server, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port);
destroyHttpClient(cli);
} }
return code; uv_run(loop, UV_RUN_DEFAULT);
uv_loop_close(loop);
return terrno;
} }
// clang-format on // clang-format on
#endif
...@@ -759,6 +759,7 @@ void cliSend(SCliConn* pConn) { ...@@ -759,6 +759,7 @@ void cliSend(SCliConn* pConn) {
pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0; pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user)); memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user));
pHead->traceId = pMsg->info.traceId; pHead->traceId = pMsg->info.traceId;
pHead->magicNum = htonl(TRANS_MAGIC_NUM);
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
......
...@@ -91,6 +91,7 @@ int transInitBuffer(SConnBuffer* buf) { ...@@ -91,6 +91,7 @@ int transInitBuffer(SConnBuffer* buf) {
buf->left = -1; buf->left = -1;
buf->len = 0; buf->len = 0;
buf->total = 0; buf->total = 0;
buf->invalid = 0;
return 0; return 0;
} }
int transDestroyBuffer(SConnBuffer* p) { int transDestroyBuffer(SConnBuffer* p) {
...@@ -108,19 +109,25 @@ int transClearBuffer(SConnBuffer* buf) { ...@@ -108,19 +109,25 @@ int transClearBuffer(SConnBuffer* buf) {
p->left = -1; p->left = -1;
p->len = 0; p->len = 0;
p->total = 0; p->total = 0;
p->invalid = 0;
return 0; return 0;
} }
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) { int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) {
static const int HEADSIZE = sizeof(STransMsgHead);
SConnBuffer* p = connBuf; SConnBuffer* p = connBuf;
if (p->left != 0) { if (p->left != 0) {
return -1; return -1;
} }
int total = connBuf->total; int total = connBuf->total;
*buf = taosMemoryCalloc(1, total); if (total >= HEADSIZE && !p->invalid) {
memcpy(*buf, p->buf, total); *buf = taosMemoryCalloc(1, total);
memcpy(*buf, p->buf, total);
transResetBuffer(connBuf); transResetBuffer(connBuf);
} else {
total = -1;
}
return total; return total;
} }
...@@ -173,6 +180,7 @@ bool transReadComplete(SConnBuffer* connBuf) { ...@@ -173,6 +180,7 @@ bool transReadComplete(SConnBuffer* connBuf) {
memcpy((char*)&head, connBuf->buf, sizeof(head)); memcpy((char*)&head, connBuf->buf, sizeof(head));
int32_t msgLen = (int32_t)htonl(head.msgLen); int32_t msgLen = (int32_t)htonl(head.msgLen);
p->total = msgLen; p->total = msgLen;
p->invalid = TRANS_NOVALID_PACKET(htonl(head.magicNum));
} }
if (p->total >= p->len) { if (p->total >= p->len) {
p->left = p->total - p->len; p->left = p->total - p->len;
...@@ -180,7 +188,8 @@ bool transReadComplete(SConnBuffer* connBuf) { ...@@ -180,7 +188,8 @@ bool transReadComplete(SConnBuffer* connBuf) {
p->left = 0; p->left = 0;
} }
} }
return p->left == 0 ? true : false;
return (p->left == 0 || p->invalid) ? true : false;
} }
int transSetConnOption(uv_tcp_t* stream) { int transSetConnOption(uv_tcp_t* stream) {
......
...@@ -183,11 +183,15 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) { ...@@ -183,11 +183,15 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) {
tDebug("%p timeout since no activity", conn); tDebug("%p timeout since no activity", conn);
} }
static void uvHandleReq(SSvrConn* pConn) { static bool uvHandleReq(SSvrConn* pConn) {
STransMsgHead* msg = NULL; STrans* pTransInst = pConn->pTransInst;
int msgLen = 0;
msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&msg); STransMsgHead* msg = NULL;
int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&msg);
if (msgLen <= 0) {
tError("%s conn %p read invalid packet", transLabel(pTransInst), pConn);
return false;
}
STransMsgHead* pHead = (STransMsgHead*)msg; STransMsgHead* pHead = (STransMsgHead*)msg;
pHead->code = htonl(pHead->code); pHead->code = htonl(pHead->code);
...@@ -200,9 +204,8 @@ static void uvHandleReq(SSvrConn* pConn) { ...@@ -200,9 +204,8 @@ static void uvHandleReq(SSvrConn* pConn) {
// uv_read_stop((uv_stream_t*)pConn->pTcp); // uv_read_stop((uv_stream_t*)pConn->pTcp);
// transRefSrvHandle(pConn); // transRefSrvHandle(pConn);
// uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask); // uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask);
if (uvRecvReleaseReq(pConn, pHead)) { if (uvRecvReleaseReq(pConn, pHead)) {
return; return true;
} }
STransMsg transMsg; STransMsg transMsg;
...@@ -220,7 +223,6 @@ static void uvHandleReq(SSvrConn* pConn) { ...@@ -220,7 +223,6 @@ static void uvHandleReq(SSvrConn* pConn) {
tDebug("conn %p acquired by server app", pConn); tDebug("conn %p acquired by server app", pConn);
} }
} }
STrans* pTransInst = pConn->pTransInst;
STraceId* trace = &pHead->traceId; STraceId* trace = &pHead->traceId;
if (pConn->status == ConnNormal && pHead->noResp == 0) { if (pConn->status == ConnNormal && pHead->noResp == 0) {
transRefSrvHandle(pConn); transRefSrvHandle(pConn);
...@@ -258,21 +260,31 @@ static void uvHandleReq(SSvrConn* pConn) { ...@@ -258,21 +260,31 @@ static void uvHandleReq(SSvrConn* pConn) {
transReleaseExHandle(transGetRefMgt(), pConn->refId); transReleaseExHandle(transGetRefMgt(), pConn->refId);
(*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
return true;
} }
void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
// opt SSvrConn* conn = cli->data;
SSvrConn* conn = cli->data; STrans* pTransInst = conn->pTransInst;
SConnBuffer* pBuf = &conn->readBuf; SConnBuffer* pBuf = &conn->readBuf;
STrans* pTransInst = conn->pTransInst;
if (nread > 0) { if (nread > 0) {
pBuf->len += nread; pBuf->len += nread;
tTrace("%s conn %p total read:%d, current read:%d", transLabel(pTransInst), conn, pBuf->len, (int)nread); tTrace("%s conn %p total read:%d, current read:%d", transLabel(pTransInst), conn, pBuf->len, (int)nread);
while (transReadComplete(pBuf)) { if (pBuf->len <= TRANS_PACKET_LIMIT) {
tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn); while (transReadComplete(pBuf)) {
uvHandleReq(conn); tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn);
if (uvHandleReq(conn) == false) {
destroyConn(conn, true);
return;
}
}
return;
} else {
tError("%s conn %p read unexpected packet, exceed limit", transLabel(pTransInst), conn);
destroyConn(conn, true);
return;
} }
return;
} }
if (nread == 0) { if (nread == 0) {
return; return;
...@@ -364,6 +376,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { ...@@ -364,6 +376,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
pHead->ahandle = (uint64_t)pMsg->info.ahandle; pHead->ahandle = (uint64_t)pMsg->info.ahandle;
pHead->traceId = pMsg->info.traceId; pHead->traceId = pMsg->info.traceId;
pHead->hasEpSet = pMsg->info.hasEpSet; pHead->hasEpSet = pMsg->info.hasEpSet;
pHead->magicNum = htonl(TRANS_MAGIC_NUM);
if (pConn->status == ConnNormal) { if (pConn->status == ConnNormal) {
pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType); pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册