From e55a55a6f82e523e65a81785db6d3c36fb875561 Mon Sep 17 00:00:00 2001 From: freemine Date: Sat, 1 Aug 2020 21:57:07 +0800 Subject: [PATCH] add ehttpInc/DecContextRef --- src/plugins/http/inc/httpContext.h | 3 ++ src/plugins/http/inc/httpInt.h | 2 + src/plugins/http/src/httpContext.c | 76 ++++++++++++++++++++++++++---- src/plugins/http/src/httpServer.c | 29 ++++++++---- 4 files changed, 92 insertions(+), 18 deletions(-) diff --git a/src/plugins/http/inc/httpContext.h b/src/plugins/http/inc/httpContext.h index a2d50d6b7f..594900d0cf 100644 --- a/src/plugins/http/inc/httpContext.h +++ b/src/plugins/http/inc/httpContext.h @@ -31,4 +31,7 @@ void httpCloseContextByApp(HttpContext *pContext); void httpNotifyContextClose(HttpContext *pContext); bool httpAlterContextState(HttpContext *pContext, HttpContextState srcState, HttpContextState destState); +void ehttpIncContextRef(HttpContext *pContext); +void ehttpDecContextRef(HttpContext **ppContext); + #endif diff --git a/src/plugins/http/inc/httpInt.h b/src/plugins/http/inc/httpInt.h index bde799d6d6..40f980f101 100644 --- a/src/plugins/http/inc/httpInt.h +++ b/src/plugins/http/inc/httpInt.h @@ -212,6 +212,8 @@ typedef struct HttpContext { void * timer; HttpEncodeMethod * encodeMethod; struct HttpThread *pThread; + + int closed:2; } HttpContext; typedef struct HttpThread { diff --git a/src/plugins/http/src/httpContext.c b/src/plugins/http/src/httpContext.c index 4440da6d45..b229673df2 100644 --- a/src/plugins/http/src/httpContext.c +++ b/src/plugins/http/src/httpContext.c @@ -28,6 +28,7 @@ #include "httpSql.h" #include "httpSession.h" +#include "httpContext.h" #include "elog.h" // dirty tweak @@ -44,12 +45,20 @@ static void on_body(void *arg, const char *chunk, size_t len); static void on_end(void *arg); static void on_error(void *arg, int status_code); +static void httpDestroyContext(void *data); +static void httpMightDestroyContext(void *data); +static void ehttpReleaseContext(HttpContext *pContext); + static void httpRemoveContextFromEpoll(HttpContext *pContext) { HttpThread *pThread = pContext->pThread; if (pContext->fd >= 0) { epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pContext->fd, NULL); - taosCloseSocket(pContext->fd); + int32_t fd = pContext->fd; pContext->fd = -1; + taosCloseSocket(fd); + if (!tsHttpServer.fallback) { + ehttpDecContextRef(&pContext); + } } } @@ -83,12 +92,11 @@ static void httpDestroyContext(void *data) { } bool httpInitContexts() { - tsHttpServer.contextCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, 2, true, httpDestroyContext, "restc"); + tsHttpServer.contextCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, 2, true, httpMightDestroyContext, "restc"); if (tsHttpServer.contextCache == NULL) { httpError("failed to init context cache"); return false; } - D("==cache [%p] created==", tsHttpServer.contextCache); return true; } @@ -136,10 +144,12 @@ HttpContext *httpCreateContext(int32_t fd) { pContext->lastAccessTime = taosGetTimestampSec(); pContext->state = HTTP_CONTEXT_STATE_READY; + ehttpIncContextRef(pContext); HttpContext **ppContext = taosCachePut(tsHttpServer.contextCache, &pContext, sizeof(int64_t), &pContext, sizeof(int64_t), 3); pContext->ppContext = ppContext; httpDebug("context:%p, fd:%d, is created, data:%p", pContext, fd, ppContext); + ehttpIncContextRef(pContext); // set the ref to 0 taosCacheRelease(tsHttpServer.contextCache, (void**)&ppContext, false); @@ -148,10 +158,13 @@ HttpContext *httpCreateContext(int32_t fd) { HttpContext *httpGetContext(void *ptr) { HttpContext **ppContext = taosCacheAcquireByKey(tsHttpServer.contextCache, &ptr, sizeof(HttpContext *)); + EQ_ASSERT(ppContext); + EQ_ASSERT(*ppContext); if (ppContext) { HttpContext *pContext = *ppContext; if (pContext) { + if (!tsHttpServer.fallback) return pContext; int32_t refCount = atomic_add_fetch_32(&pContext->refCount, 1); httpDebug("context:%p, fd:%d, is accquired, data:%p refCount:%d", pContext, pContext->fd, ppContext, refCount); return pContext; @@ -161,6 +174,10 @@ HttpContext *httpGetContext(void *ptr) { } void httpReleaseContext(HttpContext *pContext) { + if (!tsHttpServer.fallback) { + ehttpReleaseContext(pContext); + return; + } int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1); if (refCount < 0) { httpError("context:%p, is already released, refCount:%d", pContext, refCount); @@ -217,7 +234,9 @@ bool httpInitContext(HttpContext *pContext) { } void httpCloseContextByApp(HttpContext *pContext) { - D("=="); + if (!tsHttpServer.fallback) { + if (pContext->parsed == false) return; + } pContext->parsed = false; bool keepAlive = true; @@ -229,7 +248,6 @@ void httpCloseContextByApp(HttpContext *pContext) { } if (keepAlive) { - D("==keepAlive=="); if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_HANDLING, HTTP_CONTEXT_STATE_READY)) { httpDebug("context:%p, fd:%d, ip:%s, last state:handling, keepAlive:true, reuse context", pContext, pContext->fd, pContext->ipstr); @@ -250,16 +268,19 @@ void httpCloseContextByApp(HttpContext *pContext) { pContext->ipstr, httpContextStateStr(pContext->state), pContext->state); } } else { - D("==not keepAlive=="); httpRemoveContextFromEpoll(pContext); httpDebug("context:%p, fd:%d, ip:%s, last state:%s:%d, keepAlive:false, close context", pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->state); } - httpReleaseContext(pContext); + if (tsHttpServer.fallback) httpReleaseContext(pContext); } void httpCloseContextByServer(HttpContext *pContext) { + if (!tsHttpServer.fallback) { + if (pContext->closed) return; + pContext->closed = 1; + } if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_HANDLING, HTTP_CONTEXT_STATE_DROPPING)) { httpDebug("context:%p, fd:%d, ip:%s, epoll finished, still used by app", pContext, pContext->fd, pContext->ipstr); } else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_DROPPING, HTTP_CONTEXT_STATE_DROPPING)) { @@ -274,7 +295,7 @@ void httpCloseContextByServer(HttpContext *pContext) { pContext->parsed = false; httpRemoveContextFromEpoll(pContext); - httpReleaseContext(pContext); + if (tsHttpServer.fallback) httpReleaseContext(pContext); } @@ -409,7 +430,44 @@ static void on_error(void *arg, int status_code) { HttpContext *pContext = (HttpContext*)arg; HttpParser *pParser = &pContext->parser; - D("=="); pParser->failed |= EHTTP_CONTEXT_PARSER_FAILED; } +static void httpMightDestroyContext(void *data) { + HttpContext *pContext = *(HttpContext **)data; + if (!tsHttpServer.fallback) { + httpRemoveContextFromEpoll(pContext); + ehttpDecContextRef(&pContext); + return; + } + int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1); + if (refCount>0) return; + EQ_ASSERT(refCount==0); + httpDestroyContext(data); +} + +static void ehttpReleaseContext(HttpContext *pContext) { + HttpContext **ppContext = pContext->ppContext; + + if (tsHttpServer.contextCache != NULL) { + taosCacheRelease(tsHttpServer.contextCache, (void **)(&ppContext), false); + } else { + httpDebug("context:%p, won't be destroyed for cache is already released", pContext); + // httpDestroyContext((void **)(&ppContext)); + } +} + +void ehttpIncContextRef(HttpContext *pContext) { + if (tsHttpServer.fallback) return; + atomic_add_fetch_32(&pContext->refCount, 1); +} + +void ehttpDecContextRef(HttpContext **ppContext) { + if (tsHttpServer.fallback) return; + HttpContext *pContext = *ppContext; + int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1); + if (refCount>0) return; + EQ_ASSERT(refCount==0); + httpDestroyContext(ppContext); +} + diff --git a/src/plugins/http/src/httpServer.c b/src/plugins/http/src/httpServer.c index 819f7a5f4a..5a785d2e55 100644 --- a/src/plugins/http/src/httpServer.c +++ b/src/plugins/http/src/httpServer.c @@ -194,6 +194,8 @@ static void httpProcessHttpData(void *param) { sigaddset(&set, SIGPIPE); pthread_sigmask(SIG_SETMASK, &set, NULL); + elog_set_thread_name("httpProcessHttpData"); + while (1) { struct epoll_event events[HTTP_MAX_EVENTS]; //-1 means uncertainty, 0-nowait, 1-wait 1 ms, set it from -1 to 1 @@ -209,14 +211,18 @@ static void httpProcessHttpData(void *param) { if (pContext == NULL) { httpError("context:%p, is already released, close connect", events[i].data.ptr); //epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, events[i].data.fd, NULL); - //tclose(events[i].data.fd); + //taosClose(events[i].data.fd); continue; } + ehttpIncContextRef(pContext); + if (events[i].events & EPOLLPRI) { httpDebug("context:%p, fd:%d, ip:%s, state:%s, EPOLLPRI events occured, accessed:%d, close connect", pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes); httpCloseContextByServer(pContext); + if (!tsHttpServer.fallback) httpReleaseContext(pContext); + ehttpDecContextRef(&pContext); continue; } @@ -224,6 +230,8 @@ static void httpProcessHttpData(void *param) { httpDebug("context:%p, fd:%d, ip:%s, state:%s, EPOLLRDHUP events occured, accessed:%d, close connect", pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes); httpCloseContextByServer(pContext); + httpReleaseContext(pContext); + ehttpDecContextRef(&pContext); continue; } @@ -231,6 +239,8 @@ static void httpProcessHttpData(void *param) { httpDebug("context:%p, fd:%d, ip:%s, state:%s, EPOLLERR events occured, accessed:%d, close connect", pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes); httpCloseContextByServer(pContext); + if (!tsHttpServer.fallback) httpReleaseContext(pContext); + ehttpDecContextRef(&pContext); continue; } @@ -238,6 +248,8 @@ static void httpProcessHttpData(void *param) { httpDebug("context:%p, fd:%d, ip:%s, state:%s, EPOLLHUP events occured, accessed:%d, close connect", pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes); httpCloseContextByServer(pContext); + if (!tsHttpServer.fallback) httpReleaseContext(pContext); + ehttpDecContextRef(&pContext); continue; } @@ -245,6 +257,7 @@ static void httpProcessHttpData(void *param) { httpDebug("context:%p, fd:%d, ip:%s, state:%s, not in ready state, ignore read events", pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state)); httpReleaseContext(pContext); + ehttpDecContextRef(&pContext); continue; } @@ -253,11 +266,15 @@ static void httpProcessHttpData(void *param) { pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes); httpSendErrorResp(pContext, HTTP_SERVER_OFFLINE); httpNotifyContextClose(pContext); + if (!tsHttpServer.fallback) httpReleaseContext(pContext); + ehttpDecContextRef(&pContext); } else { if (httpReadData(pContext)) { (*(pThread->processData))(pContext); atomic_fetch_add_32(&pServer->requestNum, 1); } + if (!tsHttpServer.fallback) httpReleaseContext(pContext); + ehttpDecContextRef(&pContext); } } } @@ -338,7 +355,8 @@ static void *httpAcceptHttpConnection(void *arg) { httpError("context:%p, fd:%d, ip:%s, thread:%s, failed to add http fd for epoll, error:%s", pContext, connFd, pContext->ipstr, pThread->label, strerror(errno)); tclose(pContext->fd); - httpReleaseContext(pContext); + if (tsHttpServer.fallback) httpReleaseContext(pContext); + ehttpDecContextRef(&pContext); continue; } @@ -455,7 +473,6 @@ static bool ehttpReadData(HttpContext *pContext) { int ret = HTTP_CHECK_BODY_SUCCESS; if (ret == HTTP_CHECK_BODY_CONTINUE) { //httpDebug("context:%p, fd:%d, ip:%s, not finished yet, wait another event", pContext, pContext->fd, pContext->ipstr); - httpReleaseContext(pContext); return false; } else if (ret == HTTP_CHECK_BODY_SUCCESS){ httpDebug("context:%p, fd:%d, ip:%s, thread:%s, read size:%d, dataLen:%d", @@ -464,13 +481,11 @@ static bool ehttpReadData(HttpContext *pContext) { return true; } else { httpNotifyContextClose(pContext); - httpReleaseContext(pContext); return false; } } else { httpError("context:%p, fd:%d, ip:%s, failed to read http body, close connect", pContext, pContext->fd, pContext->ipstr); httpNotifyContextClose(pContext); - httpReleaseContext(pContext); return false; } } @@ -483,14 +498,10 @@ static bool ehttpReadData(HttpContext *pContext) { } else { httpError("context:%p, fd:%d, ip:%s, read from socket error:%d, close connect", pContext, pContext->fd, pContext->ipstr, errno); - D("==releasing because of reading failed=="); - httpReleaseContext(pContext); return false; } } else { // eof - D("==releasing because of remote close/reset=="); - httpReleaseContext(pContext); return false; } } -- GitLab