提交 9d0fcea1 编写于 作者: S slguan

data transfer via restful interface may be lost

上级 fbdeadbc
...@@ -60,11 +60,14 @@ ...@@ -60,11 +60,14 @@
#define HTTP_PROCESS_ERROR 0 #define HTTP_PROCESS_ERROR 0
#define HTTP_PROCESS_SUCCESS 1 #define HTTP_PROCESS_SUCCESS 1
#define HTTP_PARSE_BODY_ERROR -1 #define HTTP_CHECK_BODY_ERROR -1
#define HTTP_PARSE_BODY_CONTINUE 0 #define HTTP_CHECK_BODY_CONTINUE 0
#define HTTP_PARSE_BODY_SUCCESS 1 #define HTTP_CHECK_BODY_SUCCESS 1
#define HTTP_RETRY_TIMES 2 #define HTTP_READ_RETRY_TIMES 3
#define HTTP_READ_WAIT_TIME_MS 3
#define HTTP_WRITE_RETRY_TIMES 100
#define HTTP_WRITE_WAIT_TIME_MS 5
#define HTTP_EXPIRED_TIME 60000 #define HTTP_EXPIRED_TIME 60000
struct HttpContext; struct HttpContext;
......
...@@ -47,7 +47,7 @@ typedef struct { ...@@ -47,7 +47,7 @@ typedef struct {
// http response // http response
int httpWriteBuf(struct HttpContext* pContext, const char* buf, int sz); int httpWriteBuf(struct HttpContext* pContext, const char* buf, int sz);
int httpWriteBufByFd(int fd, const char* buf, int sz); int httpWriteBufByFd(struct HttpContext* pContext, const char* buf, int sz);
// builder callback // builder callback
typedef void (*httpJsonBuilder)(JsonBuf* buf, void* jsnHandle); typedef void (*httpJsonBuilder)(JsonBuf* buf, void* jsnHandle);
......
...@@ -259,51 +259,51 @@ bool httpParseChunkedBody(HttpContext* pContext, HttpParser* pParser, bool test) ...@@ -259,51 +259,51 @@ bool httpParseChunkedBody(HttpContext* pContext, HttpParser* pParser, bool test)
} }
bool httpReadChunkedBody(HttpContext* pContext, HttpParser* pParser) { bool httpReadChunkedBody(HttpContext* pContext, HttpParser* pParser) {
for (int tryTimes = 0; tryTimes < HTTP_RETRY_TIMES; ++tryTimes) { for (int tryTimes = 0; tryTimes < HTTP_READ_RETRY_TIMES; ++tryTimes) {
bool parsedOk = httpParseChunkedBody(pContext, pParser, true); bool parsedOk = httpParseChunkedBody(pContext, pParser, true);
if (parsedOk) { if (parsedOk) {
httpParseChunkedBody(pContext, pParser, false); httpParseChunkedBody(pContext, pParser, false);
return HTTP_PARSE_BODY_SUCCESS; return HTTP_CHECK_BODY_SUCCESS;
} else { } else {
httpTrace("context:%p, fd:%d, ip:%s, chunked body not finished, continue read", pContext, pContext->fd, httpTrace("context:%p, fd:%d, ip:%s, chunked body not finished, continue read", pContext, pContext->fd,
pContext->ipstr); pContext->ipstr);
if (!httpReadDataImp(pContext)) { if (!httpReadDataImp(pContext)) {
httpError("context:%p, fd:%d, ip:%s, read chunked request error", pContext, pContext->fd, pContext->ipstr); httpError("context:%p, fd:%d, ip:%s, read chunked request error", pContext, pContext->fd, pContext->ipstr);
return HTTP_PARSE_BODY_ERROR; return HTTP_CHECK_BODY_ERROR;
} else { } else {
taosMsleep(1); taosMsleep(HTTP_READ_WAIT_TIME_MS);
} }
} }
} }
httpTrace("context:%p, fd:%d, ip:%s, chunked body not finished, wait epoll", pContext, pContext->fd, pContext->ipstr); httpTrace("context:%p, fd:%d, ip:%s, chunked body not finished, wait epoll", pContext, pContext->fd, pContext->ipstr);
return HTTP_PARSE_BODY_CONTINUE; return HTTP_CHECK_BODY_CONTINUE;
} }
int httpReadUnChunkedBody(HttpContext* pContext, HttpParser* pParser) { int httpReadUnChunkedBody(HttpContext* pContext, HttpParser* pParser) {
for (int tryTimes = 0; tryTimes < HTTP_RETRY_TIMES; ++tryTimes) { for (int tryTimes = 0; tryTimes < HTTP_READ_RETRY_TIMES; ++tryTimes) {
int dataReadLen = pParser->bufsize - (int)(pParser->data.pos - pParser->buffer); int dataReadLen = pParser->bufsize - (int)(pParser->data.pos - pParser->buffer);
if (dataReadLen > pParser->data.len) { if (dataReadLen > pParser->data.len) {
httpError("context:%p, fd:%d, ip:%s, un-chunked body length invalid, dataReadLen:%d > pContext->data.len:%d", httpError("context:%p, fd:%d, ip:%s, un-chunked body length invalid, dataReadLen:%d > pContext->data.len:%d",
pContext, pContext->fd, pContext->ipstr, dataReadLen, pParser->data.len); pContext, pContext->fd, pContext->ipstr, dataReadLen, pParser->data.len);
httpSendErrorResp(pContext, HTTP_PARSE_BODY_ERROR); httpSendErrorResp(pContext, HTTP_PARSE_BODY_ERROR);
return HTTP_PARSE_BODY_ERROR; return HTTP_CHECK_BODY_ERROR;
} else if (dataReadLen < pParser->data.len) { } else if (dataReadLen < pParser->data.len) {
httpTrace("context:%p, fd:%d, ip:%s, un-chunked body not finished, dataReadLen:%d < pContext->data.len:%d, continue read", httpTrace("context:%p, fd:%d, ip:%s, un-chunked body not finished, dataReadLen:%d < pContext->data.len:%d, continue read",
pContext, pContext->fd, pContext->ipstr, dataReadLen, pParser->data.len); pContext, pContext->fd, pContext->ipstr, dataReadLen, pParser->data.len);
if (!httpReadDataImp(pContext)) { if (!httpReadDataImp(pContext)) {
httpError("context:%p, fd:%d, ip:%s, read chunked request error", pContext, pContext->fd, pContext->ipstr); httpError("context:%p, fd:%d, ip:%s, read chunked request error", pContext, pContext->fd, pContext->ipstr);
return HTTP_PARSE_BODY_ERROR; return HTTP_CHECK_BODY_ERROR;
} else { } else {
taosMsleep(1); taosMsleep(HTTP_READ_WAIT_TIME_MS);
} }
} else { } else {
return HTTP_PARSE_BODY_SUCCESS; return HTTP_CHECK_BODY_SUCCESS;
} }
} }
httpTrace("context:%p, fd:%d, ip:%s, un-chunked body not finished, wait epoll", pContext, pContext->fd, pContext->ipstr); httpTrace("context:%p, fd:%d, ip:%s, un-chunked body not finished, wait epoll", pContext, pContext->fd, pContext->ipstr);
return HTTP_PARSE_BODY_CONTINUE; return HTTP_CHECK_BODY_CONTINUE;
} }
bool httpParseRequest(HttpContext* pContext) { bool httpParseRequest(HttpContext* pContext) {
...@@ -360,17 +360,17 @@ int httpCheckReadCompleted(HttpContext* pContext) { ...@@ -360,17 +360,17 @@ int httpCheckReadCompleted(HttpContext* pContext) {
HttpParser *pParser = &pContext->parser; HttpParser *pParser = &pContext->parser;
if (pContext->httpChunked == HTTP_UNCUNKED) { if (pContext->httpChunked == HTTP_UNCUNKED) {
int ret = httpReadUnChunkedBody(pContext, pParser); int ret = httpReadUnChunkedBody(pContext, pParser);
if (ret != HTTP_PARSE_BODY_SUCCESS) { if (ret != HTTP_CHECK_BODY_SUCCESS) {
return ret; return ret;
} }
} else { } else {
int ret = httpReadChunkedBody(pContext, pParser); int ret = httpReadChunkedBody(pContext, pParser);
if (ret != HTTP_PARSE_BODY_SUCCESS) { if (ret != HTTP_CHECK_BODY_SUCCESS) {
return ret; return ret;
} }
} }
return HTTP_PARSE_BODY_SUCCESS; return HTTP_CHECK_BODY_SUCCESS;
} }
bool httpDecodeRequest(HttpContext* pContext) { bool httpDecodeRequest(HttpContext* pContext) {
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <unistd.h> #include <unistd.h>
#include <errno.h>
#include "http.h" #include "http.h"
#include "httpCode.h" #include "httpCode.h"
...@@ -39,40 +40,47 @@ char JsonNulTkn[] = "null"; ...@@ -39,40 +40,47 @@ char JsonNulTkn[] = "null";
char JsonTrueTkn[] = "true"; char JsonTrueTkn[] = "true";
char JsonFalseTkn[] = "false"; char JsonFalseTkn[] = "false";
int httpWriteBufByFd(int fd, const char* buf, int sz) { int httpWriteBufByFd(struct HttpContext* pContext, const char* buf, int sz) {
const int countTimes = 3;
const int waitTime = 5; // 5ms
int len; int len;
int countWait = 0; int countWait = 0;
int writeLen = 0;
do { do {
if (fd > 2) if (pContext->fd > 2){
len = (int)send(fd, buf, (size_t)sz, MSG_NOSIGNAL); len = (int)send(pContext->fd, buf + writeLen, (size_t)(sz - writeLen), MSG_NOSIGNAL);
else }
len = sz; else {
return sz;
}
if (len < 0) { if (len < 0) {
break; httpTrace("context:%p, fd:%d, ip:%s, socket write errno:%d, times:%d",
pContext, pContext->fd, pContext->ipstr, errno, countWait);
if (++countWait > HTTP_WRITE_RETRY_TIMES) break;
taosMsleep(HTTP_WRITE_WAIT_TIME_MS);
continue;
} else if (len == 0) { } else if (len == 0) {
// wait & count httpTrace("context:%p, fd:%d, ip:%s, socket write errno:%d, connect already closed",
if (++countWait > countTimes) return -1; pContext, pContext->fd, pContext->ipstr, errno);
sleep((uint32_t)waitTime); break;
} else { } else {
countWait = 0; countWait = 0;
writeLen += len;
} }
buf += len; } while (writeLen < sz);
} while (len < (sz -= len));
return sz; return writeLen;
} }
int httpWriteBuf(struct HttpContext* pContext, const char* buf, int sz) { int httpWriteBuf(struct HttpContext* pContext, const char* buf, int sz) {
int writeSz = httpWriteBufByFd(pContext->fd, buf, sz); int writeSz = httpWriteBufByFd(pContext, buf, sz);
if (writeSz == -1) { if (writeSz != sz) {
httpError("context:%p, fd:%d, ip:%s, size:%d, response failed:\n%s", pContext, pContext->fd, pContext->ipstr, sz, httpError("context:%p, fd:%d, ip:%s, size:%d, write size:%d, failed to send response:\n%s",
buf); pContext, pContext->fd, pContext->ipstr, sz, writeSz, buf);
} else { } else {
httpTrace("context:%p, fd:%d, ip:%s, size:%d, response:\n%s", pContext, pContext->fd, pContext->ipstr, sz, buf); httpTrace("context:%p, fd:%d, ip:%s, size:%d, write size:%d, response:\n%s",
pContext, pContext->fd, pContext->ipstr, sz, writeSz, buf);
} }
return writeSz; return writeSz;
...@@ -231,7 +239,7 @@ void httpJsonStringForTransMean(JsonBuf* buf, char* sVal, int maxLen) { ...@@ -231,7 +239,7 @@ void httpJsonStringForTransMean(JsonBuf* buf, char* sVal, int maxLen) {
void httpJsonInt64(JsonBuf* buf, int64_t num) { void httpJsonInt64(JsonBuf* buf, int64_t num) {
httpJsonItemToken(buf); httpJsonItemToken(buf);
httpJsonTestBuf(buf, MAX_NUM_STR_SZ); httpJsonTestBuf(buf, MAX_NUM_STR_SZ);
buf->lst += snprintf(buf->lst, MAX_NUM_STR_SZ, "%lld", num); buf->lst += snprintf(buf->lst, MAX_NUM_STR_SZ, "%ld", num);
} }
void httpJsonTimestamp(JsonBuf* buf, int64_t t) { void httpJsonTimestamp(JsonBuf* buf, int64_t t) {
......
...@@ -217,7 +217,8 @@ void httpCloseContextByServer(HttpThread *pThread, HttpContext *pContext) { ...@@ -217,7 +217,8 @@ void httpCloseContextByServer(HttpThread *pThread, HttpContext *pContext) {
} }
} }
void httpCloseContextByServerFromTimer(HttpContext *pContext) { void httpCloseContextByServerFromTimer(void *param, void *tmrId) {
HttpContext *pContext = (HttpContext *)param;
httpError("context:%p, fd:%d, ip:%s, read http body error, time expired", pContext, pContext->fd, pContext->ipstr); httpError("context:%p, fd:%d, ip:%s, read http body error, time expired", pContext, pContext->fd, pContext->ipstr);
httpSendErrorResp(pContext, HTTP_PARSE_BODY_ERROR); httpSendErrorResp(pContext, HTTP_PARSE_BODY_ERROR);
httpCloseContextByServer(pContext->pThread, pContext); httpCloseContextByServer(pContext->pThread, pContext);
...@@ -283,18 +284,10 @@ bool httpReadDataImp(HttpContext *pContext) { ...@@ -283,18 +284,10 @@ bool httpReadDataImp(HttpContext *pContext) {
pParser->bufsize += nread; pParser->bufsize += nread;
break; break;
} else if (nread < 0) { } else if (nread < 0) {
if (errno == EINTR) { if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
if (blocktimes++ > HTTP_RETRY_TIMES) { if (blocktimes++ > HTTP_READ_RETRY_TIMES) {
taosMsleep(1); taosMsleep(HTTP_READ_WAIT_TIME_MS);
httpTrace("context:%p, fd:%d, ip:%s, read from socket error:%d, EINTER times:%d", httpTrace("context:%p, fd:%d, ip:%s, read from socket error:%d, error times:%d",
pContext, pContext->fd, pContext->ipstr, errno, blocktimes);
break;
}
continue;
} else if (errno == EAGAIN || errno == EWOULDBLOCK) {
if (blocktimes++ > HTTP_RETRY_TIMES) {
taosMsleep(1);
httpTrace("context:%p, fd:%d, ip:%s, read from socket error:%d, EAGAIN times:%d",
pContext, pContext->fd, pContext->ipstr, errno, blocktimes); pContext, pContext->fd, pContext->ipstr, errno, blocktimes);
break; break;
} }
...@@ -342,11 +335,11 @@ bool httpReadData(HttpThread *pThread, HttpContext *pContext) { ...@@ -342,11 +335,11 @@ bool httpReadData(HttpThread *pThread, HttpContext *pContext) {
} }
int ret = httpCheckReadCompleted(pContext); int ret = httpCheckReadCompleted(pContext);
if (ret == HTTP_PARSE_BODY_CONTINUE) { if (ret == HTTP_CHECK_BODY_CONTINUE) {
httpTrace("context:%p, fd:%d, ip:%s, not finished yet, try another times", pContext, pContext->fd, pContext->ipstr); httpTrace("context:%p, fd:%d, ip:%s, not finished yet, try another times", pContext, pContext->fd, pContext->ipstr);
taosTmrReset(httpCloseContextByServerFromTimer, HTTP_EXPIRED_TIME, pContext, pThread->pServer->timerHandle, &pContext->readTimer); taosTmrReset(httpCloseContextByServerFromTimer, HTTP_EXPIRED_TIME, pContext, pThread->pServer->timerHandle, &pContext->readTimer);
return false; return false;
} else if (ret == HTTP_PARSE_BODY_SUCCESS){ } else if (ret == HTTP_CHECK_BODY_SUCCESS){
httpDump("context:%p, fd:%d, ip:%s, thread:%s, numOfFds:%d, body:\n%s", httpDump("context:%p, fd:%d, ip:%s, thread:%s, numOfFds:%d, body:\n%s",
pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->pThread->numOfFds, pContext->parser.data.pos); pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->pThread->numOfFds, pContext->parser.data.pos);
return true; return true;
......
...@@ -42,6 +42,7 @@ ...@@ -42,6 +42,7 @@
#include <stdint.h> #include <stdint.h>
#include <pthread.h> #include <pthread.h>
#include <stdbool.h> #include <stdbool.h>
#include <limits.h>
bool taosCheckPthreadValid(pthread_t thread); bool taosCheckPthreadValid(pthread_t thread);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册