提交 e55a55a6 编写于 作者: F freemine

add ehttpInc/DecContextRef

上级 7c434d61
......@@ -31,4 +31,7 @@ void httpCloseContextByApp(HttpContext *pContext);
void httpNotifyContextClose(HttpContext *pContext);
bool httpAlterContextState(HttpContext *pContext, HttpContextState srcState, HttpContextState destState);
void ehttpIncContextRef(HttpContext *pContext);
void ehttpDecContextRef(HttpContext **ppContext);
#endif
......@@ -212,6 +212,8 @@ typedef struct HttpContext {
void * timer;
HttpEncodeMethod * encodeMethod;
struct HttpThread *pThread;
int closed:2;
} HttpContext;
typedef struct HttpThread {
......
......@@ -28,6 +28,7 @@
#include "httpSql.h"
#include "httpSession.h"
#include "httpContext.h"
#include "elog.h"
// dirty tweak
......@@ -44,12 +45,20 @@ static void on_body(void *arg, const char *chunk, size_t len);
static void on_end(void *arg);
static void on_error(void *arg, int status_code);
static void httpDestroyContext(void *data);
static void httpMightDestroyContext(void *data);
static void ehttpReleaseContext(HttpContext *pContext);
static void httpRemoveContextFromEpoll(HttpContext *pContext) {
HttpThread *pThread = pContext->pThread;
if (pContext->fd >= 0) {
epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pContext->fd, NULL);
taosCloseSocket(pContext->fd);
int32_t fd = pContext->fd;
pContext->fd = -1;
taosCloseSocket(fd);
if (!tsHttpServer.fallback) {
ehttpDecContextRef(&pContext);
}
}
}
......@@ -83,12 +92,11 @@ static void httpDestroyContext(void *data) {
}
bool httpInitContexts() {
tsHttpServer.contextCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, 2, true, httpDestroyContext, "restc");
tsHttpServer.contextCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, 2, true, httpMightDestroyContext, "restc");
if (tsHttpServer.contextCache == NULL) {
httpError("failed to init context cache");
return false;
}
D("==cache [%p] created==", tsHttpServer.contextCache);
return true;
}
......@@ -136,10 +144,12 @@ HttpContext *httpCreateContext(int32_t fd) {
pContext->lastAccessTime = taosGetTimestampSec();
pContext->state = HTTP_CONTEXT_STATE_READY;
ehttpIncContextRef(pContext);
HttpContext **ppContext = taosCachePut(tsHttpServer.contextCache, &pContext, sizeof(int64_t), &pContext, sizeof(int64_t), 3);
pContext->ppContext = ppContext;
httpDebug("context:%p, fd:%d, is created, data:%p", pContext, fd, ppContext);
ehttpIncContextRef(pContext);
// set the ref to 0
taosCacheRelease(tsHttpServer.contextCache, (void**)&ppContext, false);
......@@ -148,10 +158,13 @@ HttpContext *httpCreateContext(int32_t fd) {
HttpContext *httpGetContext(void *ptr) {
HttpContext **ppContext = taosCacheAcquireByKey(tsHttpServer.contextCache, &ptr, sizeof(HttpContext *));
EQ_ASSERT(ppContext);
EQ_ASSERT(*ppContext);
if (ppContext) {
HttpContext *pContext = *ppContext;
if (pContext) {
if (!tsHttpServer.fallback) return pContext;
int32_t refCount = atomic_add_fetch_32(&pContext->refCount, 1);
httpDebug("context:%p, fd:%d, is accquired, data:%p refCount:%d", pContext, pContext->fd, ppContext, refCount);
return pContext;
......@@ -161,6 +174,10 @@ HttpContext *httpGetContext(void *ptr) {
}
void httpReleaseContext(HttpContext *pContext) {
if (!tsHttpServer.fallback) {
ehttpReleaseContext(pContext);
return;
}
int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1);
if (refCount < 0) {
httpError("context:%p, is already released, refCount:%d", pContext, refCount);
......@@ -217,7 +234,9 @@ bool httpInitContext(HttpContext *pContext) {
}
void httpCloseContextByApp(HttpContext *pContext) {
D("==");
if (!tsHttpServer.fallback) {
if (pContext->parsed == false) return;
}
pContext->parsed = false;
bool keepAlive = true;
......@@ -229,7 +248,6 @@ void httpCloseContextByApp(HttpContext *pContext) {
}
if (keepAlive) {
D("==keepAlive==");
if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_HANDLING, HTTP_CONTEXT_STATE_READY)) {
httpDebug("context:%p, fd:%d, ip:%s, last state:handling, keepAlive:true, reuse context", pContext, pContext->fd,
pContext->ipstr);
......@@ -250,16 +268,19 @@ void httpCloseContextByApp(HttpContext *pContext) {
pContext->ipstr, httpContextStateStr(pContext->state), pContext->state);
}
} else {
D("==not keepAlive==");
httpRemoveContextFromEpoll(pContext);
httpDebug("context:%p, fd:%d, ip:%s, last state:%s:%d, keepAlive:false, close context", pContext, pContext->fd,
pContext->ipstr, httpContextStateStr(pContext->state), pContext->state);
}
httpReleaseContext(pContext);
if (tsHttpServer.fallback) httpReleaseContext(pContext);
}
void httpCloseContextByServer(HttpContext *pContext) {
if (!tsHttpServer.fallback) {
if (pContext->closed) return;
pContext->closed = 1;
}
if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_HANDLING, HTTP_CONTEXT_STATE_DROPPING)) {
httpDebug("context:%p, fd:%d, ip:%s, epoll finished, still used by app", pContext, pContext->fd, pContext->ipstr);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_DROPPING, HTTP_CONTEXT_STATE_DROPPING)) {
......@@ -274,7 +295,7 @@ void httpCloseContextByServer(HttpContext *pContext) {
pContext->parsed = false;
httpRemoveContextFromEpoll(pContext);
httpReleaseContext(pContext);
if (tsHttpServer.fallback) httpReleaseContext(pContext);
}
......@@ -409,7 +430,44 @@ static void on_error(void *arg, int status_code) {
HttpContext *pContext = (HttpContext*)arg;
HttpParser *pParser = &pContext->parser;
D("==");
pParser->failed |= EHTTP_CONTEXT_PARSER_FAILED;
}
static void httpMightDestroyContext(void *data) {
HttpContext *pContext = *(HttpContext **)data;
if (!tsHttpServer.fallback) {
httpRemoveContextFromEpoll(pContext);
ehttpDecContextRef(&pContext);
return;
}
int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1);
if (refCount>0) return;
EQ_ASSERT(refCount==0);
httpDestroyContext(data);
}
static void ehttpReleaseContext(HttpContext *pContext) {
HttpContext **ppContext = pContext->ppContext;
if (tsHttpServer.contextCache != NULL) {
taosCacheRelease(tsHttpServer.contextCache, (void **)(&ppContext), false);
} else {
httpDebug("context:%p, won't be destroyed for cache is already released", pContext);
// httpDestroyContext((void **)(&ppContext));
}
}
void ehttpIncContextRef(HttpContext *pContext) {
if (tsHttpServer.fallback) return;
atomic_add_fetch_32(&pContext->refCount, 1);
}
void ehttpDecContextRef(HttpContext **ppContext) {
if (tsHttpServer.fallback) return;
HttpContext *pContext = *ppContext;
int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1);
if (refCount>0) return;
EQ_ASSERT(refCount==0);
httpDestroyContext(ppContext);
}
......@@ -194,6 +194,8 @@ static void httpProcessHttpData(void *param) {
sigaddset(&set, SIGPIPE);
pthread_sigmask(SIG_SETMASK, &set, NULL);
elog_set_thread_name("httpProcessHttpData");
while (1) {
struct epoll_event events[HTTP_MAX_EVENTS];
//-1 means uncertainty, 0-nowait, 1-wait 1 ms, set it from -1 to 1
......@@ -209,14 +211,18 @@ static void httpProcessHttpData(void *param) {
if (pContext == NULL) {
httpError("context:%p, is already released, close connect", events[i].data.ptr);
//epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, events[i].data.fd, NULL);
//tclose(events[i].data.fd);
//taosClose(events[i].data.fd);
continue;
}
ehttpIncContextRef(pContext);
if (events[i].events & EPOLLPRI) {
httpDebug("context:%p, fd:%d, ip:%s, state:%s, EPOLLPRI events occured, accessed:%d, close connect",
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes);
httpCloseContextByServer(pContext);
if (!tsHttpServer.fallback) httpReleaseContext(pContext);
ehttpDecContextRef(&pContext);
continue;
}
......@@ -224,6 +230,8 @@ static void httpProcessHttpData(void *param) {
httpDebug("context:%p, fd:%d, ip:%s, state:%s, EPOLLRDHUP events occured, accessed:%d, close connect",
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes);
httpCloseContextByServer(pContext);
httpReleaseContext(pContext);
ehttpDecContextRef(&pContext);
continue;
}
......@@ -231,6 +239,8 @@ static void httpProcessHttpData(void *param) {
httpDebug("context:%p, fd:%d, ip:%s, state:%s, EPOLLERR events occured, accessed:%d, close connect",
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes);
httpCloseContextByServer(pContext);
if (!tsHttpServer.fallback) httpReleaseContext(pContext);
ehttpDecContextRef(&pContext);
continue;
}
......@@ -238,6 +248,8 @@ static void httpProcessHttpData(void *param) {
httpDebug("context:%p, fd:%d, ip:%s, state:%s, EPOLLHUP events occured, accessed:%d, close connect",
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes);
httpCloseContextByServer(pContext);
if (!tsHttpServer.fallback) httpReleaseContext(pContext);
ehttpDecContextRef(&pContext);
continue;
}
......@@ -245,6 +257,7 @@ static void httpProcessHttpData(void *param) {
httpDebug("context:%p, fd:%d, ip:%s, state:%s, not in ready state, ignore read events",
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state));
httpReleaseContext(pContext);
ehttpDecContextRef(&pContext);
continue;
}
......@@ -253,11 +266,15 @@ static void httpProcessHttpData(void *param) {
pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes);
httpSendErrorResp(pContext, HTTP_SERVER_OFFLINE);
httpNotifyContextClose(pContext);
if (!tsHttpServer.fallback) httpReleaseContext(pContext);
ehttpDecContextRef(&pContext);
} else {
if (httpReadData(pContext)) {
(*(pThread->processData))(pContext);
atomic_fetch_add_32(&pServer->requestNum, 1);
}
if (!tsHttpServer.fallback) httpReleaseContext(pContext);
ehttpDecContextRef(&pContext);
}
}
}
......@@ -338,7 +355,8 @@ static void *httpAcceptHttpConnection(void *arg) {
httpError("context:%p, fd:%d, ip:%s, thread:%s, failed to add http fd for epoll, error:%s", pContext, connFd,
pContext->ipstr, pThread->label, strerror(errno));
tclose(pContext->fd);
httpReleaseContext(pContext);
if (tsHttpServer.fallback) httpReleaseContext(pContext);
ehttpDecContextRef(&pContext);
continue;
}
......@@ -455,7 +473,6 @@ static bool ehttpReadData(HttpContext *pContext) {
int ret = HTTP_CHECK_BODY_SUCCESS;
if (ret == HTTP_CHECK_BODY_CONTINUE) {
//httpDebug("context:%p, fd:%d, ip:%s, not finished yet, wait another event", pContext, pContext->fd, pContext->ipstr);
httpReleaseContext(pContext);
return false;
} else if (ret == HTTP_CHECK_BODY_SUCCESS){
httpDebug("context:%p, fd:%d, ip:%s, thread:%s, read size:%d, dataLen:%d",
......@@ -464,13 +481,11 @@ static bool ehttpReadData(HttpContext *pContext) {
return true;
} else {
httpNotifyContextClose(pContext);
httpReleaseContext(pContext);
return false;
}
} else {
httpError("context:%p, fd:%d, ip:%s, failed to read http body, close connect", pContext, pContext->fd, pContext->ipstr);
httpNotifyContextClose(pContext);
httpReleaseContext(pContext);
return false;
}
}
......@@ -483,14 +498,10 @@ static bool ehttpReadData(HttpContext *pContext) {
} else {
httpError("context:%p, fd:%d, ip:%s, read from socket error:%d, close connect",
pContext, pContext->fd, pContext->ipstr, errno);
D("==releasing because of reading failed==");
httpReleaseContext(pContext);
return false;
}
} else {
// eof
D("==releasing because of remote close/reset==");
httpReleaseContext(pContext);
return false;
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册