From 9d0fcea101f179d73e53aae7ac2b1b480d09dec0 Mon Sep 17 00:00:00 2001 From: slguan Date: Thu, 1 Aug 2019 11:39:28 +0800 Subject: [PATCH] data transfer via restful interface may be lost --- src/modules/http/inc/httpHandle.h | 13 +++++---- src/modules/http/inc/httpJson.h | 2 +- src/modules/http/src/httpHandle.c | 28 +++++++++--------- src/modules/http/src/httpJson.c | 48 ++++++++++++++++++------------- src/modules/http/src/httpServer.c | 23 ++++++--------- src/os/linux/inc/os.h | 1 + 6 files changed, 60 insertions(+), 55 deletions(-) diff --git a/src/modules/http/inc/httpHandle.h b/src/modules/http/inc/httpHandle.h index 95e64f937c..df9f284966 100644 --- a/src/modules/http/inc/httpHandle.h +++ b/src/modules/http/inc/httpHandle.h @@ -60,11 +60,14 @@ #define HTTP_PROCESS_ERROR 0 #define HTTP_PROCESS_SUCCESS 1 -#define HTTP_PARSE_BODY_ERROR -1 -#define HTTP_PARSE_BODY_CONTINUE 0 -#define HTTP_PARSE_BODY_SUCCESS 1 - -#define HTTP_RETRY_TIMES 2 +#define HTTP_CHECK_BODY_ERROR -1 +#define HTTP_CHECK_BODY_CONTINUE 0 +#define HTTP_CHECK_BODY_SUCCESS 1 + +#define HTTP_READ_RETRY_TIMES 3 +#define HTTP_READ_WAIT_TIME_MS 3 +#define HTTP_WRITE_RETRY_TIMES 100 +#define HTTP_WRITE_WAIT_TIME_MS 5 #define HTTP_EXPIRED_TIME 60000 struct HttpContext; diff --git a/src/modules/http/inc/httpJson.h b/src/modules/http/inc/httpJson.h index 77a995670b..f30d480185 100644 --- a/src/modules/http/inc/httpJson.h +++ b/src/modules/http/inc/httpJson.h @@ -47,7 +47,7 @@ typedef struct { // http response int httpWriteBuf(struct HttpContext* pContext, const char* buf, int sz); -int httpWriteBufByFd(int fd, const char* buf, int sz); +int httpWriteBufByFd(struct HttpContext* pContext, const char* buf, int sz); // builder callback typedef void (*httpJsonBuilder)(JsonBuf* buf, void* jsnHandle); diff --git a/src/modules/http/src/httpHandle.c b/src/modules/http/src/httpHandle.c index 7a24434863..4787ad9a33 100644 --- a/src/modules/http/src/httpHandle.c +++ b/src/modules/http/src/httpHandle.c @@ -259,51 +259,51 @@ bool httpParseChunkedBody(HttpContext* pContext, HttpParser* pParser, bool test) } bool httpReadChunkedBody(HttpContext* pContext, HttpParser* pParser) { - for (int tryTimes = 0; tryTimes < HTTP_RETRY_TIMES; ++tryTimes) { + for (int tryTimes = 0; tryTimes < HTTP_READ_RETRY_TIMES; ++tryTimes) { bool parsedOk = httpParseChunkedBody(pContext, pParser, true); if (parsedOk) { httpParseChunkedBody(pContext, pParser, false); - return HTTP_PARSE_BODY_SUCCESS; + return HTTP_CHECK_BODY_SUCCESS; } else { httpTrace("context:%p, fd:%d, ip:%s, chunked body not finished, continue read", pContext, pContext->fd, pContext->ipstr); if (!httpReadDataImp(pContext)) { httpError("context:%p, fd:%d, ip:%s, read chunked request error", pContext, pContext->fd, pContext->ipstr); - return HTTP_PARSE_BODY_ERROR; + return HTTP_CHECK_BODY_ERROR; } else { - taosMsleep(1); + taosMsleep(HTTP_READ_WAIT_TIME_MS); } } } httpTrace("context:%p, fd:%d, ip:%s, chunked body not finished, wait epoll", pContext, pContext->fd, pContext->ipstr); - return HTTP_PARSE_BODY_CONTINUE; + return HTTP_CHECK_BODY_CONTINUE; } int httpReadUnChunkedBody(HttpContext* pContext, HttpParser* pParser) { - for (int tryTimes = 0; tryTimes < HTTP_RETRY_TIMES; ++tryTimes) { + for (int tryTimes = 0; tryTimes < HTTP_READ_RETRY_TIMES; ++tryTimes) { int dataReadLen = pParser->bufsize - (int)(pParser->data.pos - pParser->buffer); if (dataReadLen > pParser->data.len) { httpError("context:%p, fd:%d, ip:%s, un-chunked body length invalid, dataReadLen:%d > pContext->data.len:%d", pContext, pContext->fd, pContext->ipstr, dataReadLen, pParser->data.len); httpSendErrorResp(pContext, HTTP_PARSE_BODY_ERROR); - return HTTP_PARSE_BODY_ERROR; + return HTTP_CHECK_BODY_ERROR; } else if (dataReadLen < pParser->data.len) { httpTrace("context:%p, fd:%d, ip:%s, un-chunked body not finished, dataReadLen:%d < pContext->data.len:%d, continue read", pContext, pContext->fd, pContext->ipstr, dataReadLen, pParser->data.len); if (!httpReadDataImp(pContext)) { httpError("context:%p, fd:%d, ip:%s, read chunked request error", pContext, pContext->fd, pContext->ipstr); - return HTTP_PARSE_BODY_ERROR; + return HTTP_CHECK_BODY_ERROR; } else { - taosMsleep(1); + taosMsleep(HTTP_READ_WAIT_TIME_MS); } } else { - return HTTP_PARSE_BODY_SUCCESS; + return HTTP_CHECK_BODY_SUCCESS; } } httpTrace("context:%p, fd:%d, ip:%s, un-chunked body not finished, wait epoll", pContext, pContext->fd, pContext->ipstr); - return HTTP_PARSE_BODY_CONTINUE; + return HTTP_CHECK_BODY_CONTINUE; } bool httpParseRequest(HttpContext* pContext) { @@ -360,17 +360,17 @@ int httpCheckReadCompleted(HttpContext* pContext) { HttpParser *pParser = &pContext->parser; if (pContext->httpChunked == HTTP_UNCUNKED) { int ret = httpReadUnChunkedBody(pContext, pParser); - if (ret != HTTP_PARSE_BODY_SUCCESS) { + if (ret != HTTP_CHECK_BODY_SUCCESS) { return ret; } } else { int ret = httpReadChunkedBody(pContext, pParser); - if (ret != HTTP_PARSE_BODY_SUCCESS) { + if (ret != HTTP_CHECK_BODY_SUCCESS) { return ret; } } - return HTTP_PARSE_BODY_SUCCESS; + return HTTP_CHECK_BODY_SUCCESS; } bool httpDecodeRequest(HttpContext* pContext) { diff --git a/src/modules/http/src/httpJson.c b/src/modules/http/src/httpJson.c index cb036d71c9..692d275399 100644 --- a/src/modules/http/src/httpJson.c +++ b/src/modules/http/src/httpJson.c @@ -18,6 +18,7 @@ #include #include #include +#include #include "http.h" #include "httpCode.h" @@ -39,40 +40,47 @@ char JsonNulTkn[] = "null"; char JsonTrueTkn[] = "true"; char JsonFalseTkn[] = "false"; -int httpWriteBufByFd(int fd, const char* buf, int sz) { - const int countTimes = 3; - const int waitTime = 5; // 5ms +int httpWriteBufByFd(struct HttpContext* pContext, const char* buf, int sz) { int len; int countWait = 0; + int writeLen = 0; do { - if (fd > 2) - len = (int)send(fd, buf, (size_t)sz, MSG_NOSIGNAL); - else - len = sz; + if (pContext->fd > 2){ + len = (int)send(pContext->fd, buf + writeLen, (size_t)(sz - writeLen), MSG_NOSIGNAL); + } + else { + return sz; + } + if (len < 0) { - break; + httpTrace("context:%p, fd:%d, ip:%s, socket write errno:%d, times:%d", + pContext, pContext->fd, pContext->ipstr, errno, countWait); + if (++countWait > HTTP_WRITE_RETRY_TIMES) break; + taosMsleep(HTTP_WRITE_WAIT_TIME_MS); + continue; } else if (len == 0) { - // wait & count - if (++countWait > countTimes) return -1; - sleep((uint32_t)waitTime); + httpTrace("context:%p, fd:%d, ip:%s, socket write errno:%d, connect already closed", + pContext, pContext->fd, pContext->ipstr, errno); + break; } else { countWait = 0; + writeLen += len; } - buf += len; - } while (len < (sz -= len)); + } while (writeLen < sz); - return sz; + return writeLen; } int httpWriteBuf(struct HttpContext* pContext, const char* buf, int sz) { - int writeSz = httpWriteBufByFd(pContext->fd, buf, sz); + int writeSz = httpWriteBufByFd(pContext, buf, sz); - if (writeSz == -1) { - httpError("context:%p, fd:%d, ip:%s, size:%d, response failed:\n%s", pContext, pContext->fd, pContext->ipstr, sz, - buf); + if (writeSz != sz) { + httpError("context:%p, fd:%d, ip:%s, size:%d, write size:%d, failed to send response:\n%s", + pContext, pContext->fd, pContext->ipstr, sz, writeSz, buf); } else { - httpTrace("context:%p, fd:%d, ip:%s, size:%d, response:\n%s", pContext, pContext->fd, pContext->ipstr, sz, buf); + httpTrace("context:%p, fd:%d, ip:%s, size:%d, write size:%d, response:\n%s", + pContext, pContext->fd, pContext->ipstr, sz, writeSz, buf); } return writeSz; @@ -231,7 +239,7 @@ void httpJsonStringForTransMean(JsonBuf* buf, char* sVal, int maxLen) { void httpJsonInt64(JsonBuf* buf, int64_t num) { httpJsonItemToken(buf); httpJsonTestBuf(buf, MAX_NUM_STR_SZ); - buf->lst += snprintf(buf->lst, MAX_NUM_STR_SZ, "%lld", num); + buf->lst += snprintf(buf->lst, MAX_NUM_STR_SZ, "%ld", num); } void httpJsonTimestamp(JsonBuf* buf, int64_t t) { diff --git a/src/modules/http/src/httpServer.c b/src/modules/http/src/httpServer.c index 6e3590786b..97bfc6a118 100644 --- a/src/modules/http/src/httpServer.c +++ b/src/modules/http/src/httpServer.c @@ -217,7 +217,8 @@ void httpCloseContextByServer(HttpThread *pThread, HttpContext *pContext) { } } -void httpCloseContextByServerFromTimer(HttpContext *pContext) { +void httpCloseContextByServerFromTimer(void *param, void *tmrId) { + HttpContext *pContext = (HttpContext *)param; httpError("context:%p, fd:%d, ip:%s, read http body error, time expired", pContext, pContext->fd, pContext->ipstr); httpSendErrorResp(pContext, HTTP_PARSE_BODY_ERROR); httpCloseContextByServer(pContext->pThread, pContext); @@ -283,18 +284,10 @@ bool httpReadDataImp(HttpContext *pContext) { pParser->bufsize += nread; break; } else if (nread < 0) { - if (errno == EINTR) { - if (blocktimes++ > HTTP_RETRY_TIMES) { - taosMsleep(1); - httpTrace("context:%p, fd:%d, ip:%s, read from socket error:%d, EINTER times:%d", - pContext, pContext->fd, pContext->ipstr, errno, blocktimes); - break; - } - continue; - } else if (errno == EAGAIN || errno == EWOULDBLOCK) { - if (blocktimes++ > HTTP_RETRY_TIMES) { - taosMsleep(1); - httpTrace("context:%p, fd:%d, ip:%s, read from socket error:%d, EAGAIN times:%d", + if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) { + if (blocktimes++ > HTTP_READ_RETRY_TIMES) { + taosMsleep(HTTP_READ_WAIT_TIME_MS); + httpTrace("context:%p, fd:%d, ip:%s, read from socket error:%d, error times:%d", pContext, pContext->fd, pContext->ipstr, errno, blocktimes); break; } @@ -342,11 +335,11 @@ bool httpReadData(HttpThread *pThread, HttpContext *pContext) { } int ret = httpCheckReadCompleted(pContext); - if (ret == HTTP_PARSE_BODY_CONTINUE) { + if (ret == HTTP_CHECK_BODY_CONTINUE) { httpTrace("context:%p, fd:%d, ip:%s, not finished yet, try another times", pContext, pContext->fd, pContext->ipstr); taosTmrReset(httpCloseContextByServerFromTimer, HTTP_EXPIRED_TIME, pContext, pThread->pServer->timerHandle, &pContext->readTimer); return false; - } else if (ret == HTTP_PARSE_BODY_SUCCESS){ + } else if (ret == HTTP_CHECK_BODY_SUCCESS){ httpDump("context:%p, fd:%d, ip:%s, thread:%s, numOfFds:%d, body:\n%s", pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->pThread->numOfFds, pContext->parser.data.pos); return true; diff --git a/src/os/linux/inc/os.h b/src/os/linux/inc/os.h index 87050a8ab9..cdfdfed36b 100644 --- a/src/os/linux/inc/os.h +++ b/src/os/linux/inc/os.h @@ -42,6 +42,7 @@ #include #include #include +#include bool taosCheckPthreadValid(pthread_t thread); -- GitLab