提交 73f2baac 编写于 作者: S Shengliang Guan

[TD-637] fix refcount error in restful

上级 492f1f38
...@@ -108,8 +108,8 @@ SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) { ...@@ -108,8 +108,8 @@ SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) {
} }
void mnodeReleaseConn(SConnObj *pConn) { void mnodeReleaseConn(SConnObj *pConn) {
if(pConn == NULL) return; if (pConn == NULL) return;
taosCacheRelease(tsMnodeConnCache, (void**)&pConn, false); taosCacheRelease(tsMnodeConnCache, (void **)&pConn, false);
} }
SConnObj *mnodeAccquireConn(uint32_t connId, char *user, uint32_t ip, uint16_t port) { SConnObj *mnodeAccquireConn(uint32_t connId, char *user, uint32_t ip, uint16_t port) {
......
...@@ -188,6 +188,7 @@ typedef struct HttpContext { ...@@ -188,6 +188,7 @@ typedef struct HttpContext {
char user[TSDB_USER_LEN]; // parsed from auth token or login message char user[TSDB_USER_LEN]; // parsed from auth token or login message
char pass[TSDB_PASSWORD_LEN]; char pass[TSDB_PASSWORD_LEN];
void * taos; void * taos;
void * ppContext;
HttpSession *session; HttpSession *session;
z_stream gzipStream; z_stream gzipStream;
HttpParser parser; HttpParser parser;
......
...@@ -105,15 +105,15 @@ HttpContext *httpCreateContext(int32_t fd) { ...@@ -105,15 +105,15 @@ HttpContext *httpCreateContext(int32_t fd) {
char fdStr[12] = {0}; char fdStr[12] = {0};
snprintf(fdStr, sizeof(fdStr), "%d", fd); snprintf(fdStr, sizeof(fdStr), "%d", fd);
//atomic_add_fetch_32(&pContext->refCount, 1);
pContext->fd = fd; pContext->fd = fd;
pContext->httpVersion = HTTP_VERSION_10; pContext->httpVersion = HTTP_VERSION_10;
pContext->lastAccessTime = taosGetTimestampSec(); pContext->lastAccessTime = taosGetTimestampSec();
pContext->state = HTTP_CONTEXT_STATE_READY; pContext->state = HTTP_CONTEXT_STATE_READY;
taosCachePut(tsHttpServer.contextCache, fdStr, &pContext, sizeof(HttpContext *), 5); HttpContext **ppContext = taosCachePut(tsHttpServer.contextCache, fdStr, &pContext, sizeof(HttpContext *), 5);
httpTrace("context:%p, fd:%d is created", pContext, fd); pContext->ppContext = ppContext;
httpTrace("context:%p, fd:%d, is created", pContext, fd);
return pContext; return pContext;
} }
...@@ -128,7 +128,7 @@ HttpContext *httpGetContext(int32_t fd) { ...@@ -128,7 +128,7 @@ HttpContext *httpGetContext(int32_t fd) {
HttpContext *pContext = *ppContext; HttpContext *pContext = *ppContext;
if (pContext) { if (pContext) {
int32_t refCount = atomic_add_fetch_32(&pContext->refCount, 1); int32_t refCount = atomic_add_fetch_32(&pContext->refCount, 1);
httpTrace("context:%p, fd:%d is accquired, refCount:%d", pContext, pContext->fd, refCount); httpTrace("context:%p, fd:%d, is accquired, refCount:%d", pContext, pContext->fd, refCount);
return pContext; return pContext;
} }
} }
...@@ -138,9 +138,10 @@ HttpContext *httpGetContext(int32_t fd) { ...@@ -138,9 +138,10 @@ HttpContext *httpGetContext(int32_t fd) {
void httpReleaseContext(HttpContext *pContext) { void httpReleaseContext(HttpContext *pContext) {
int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1); int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1);
assert(refCount >= 0); assert(refCount >= 0);
httpTrace("context:%p, fd:%d is releasd, refCount:%d", pContext, pContext->fd, refCount); httpTrace("context:%p, fd:%d, is releasd, refCount:%d", pContext, pContext->fd, refCount);
taosCacheRelease(tsHttpServer.contextCache, (void **)(&pContext), false); HttpContext **ppContext = pContext->ppContext;
taosCacheRelease(tsHttpServer.contextCache, (void **)(&ppContext), false);
} }
bool httpInitContext(HttpContext *pContext) { bool httpInitContext(HttpContext *pContext) {
...@@ -205,9 +206,6 @@ void httpCloseContextByApp(HttpContext *pContext) { ...@@ -205,9 +206,6 @@ void httpCloseContextByApp(HttpContext *pContext) {
} }
void httpCloseContextByServer(HttpContext *pContext) { void httpCloseContextByServer(HttpContext *pContext) {
httpRemoveContextFromEpoll(pContext);
pContext->parsed = false;
if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_HANDLING, HTTP_CONTEXT_STATE_DROPPING)) { if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_HANDLING, HTTP_CONTEXT_STATE_DROPPING)) {
httpTrace("context:%p, fd:%d, ip:%s, epoll finished, still used by app", pContext, pContext->fd, pContext->ipstr); httpTrace("context:%p, fd:%d, ip:%s, epoll finished, still used by app", pContext, pContext->fd, pContext->ipstr);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_DROPPING, HTTP_CONTEXT_STATE_DROPPING)) { } else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_DROPPING, HTTP_CONTEXT_STATE_DROPPING)) {
...@@ -220,5 +218,7 @@ void httpCloseContextByServer(HttpContext *pContext) { ...@@ -220,5 +218,7 @@ void httpCloseContextByServer(HttpContext *pContext) {
httpError("context:%p, fd:%d, ip:%s, unknown state:%d", pContext, pContext->fd, pContext->ipstr, pContext->state); httpError("context:%p, fd:%d, ip:%s, unknown state:%d", pContext, pContext->fd, pContext->ipstr, pContext->state);
} }
pContext->parsed = false;
httpRemoveContextFromEpoll(pContext);
httpReleaseContext(pContext); httpReleaseContext(pContext);
} }
...@@ -272,7 +272,7 @@ static void *httpAcceptHttpConnection(void *arg) { ...@@ -272,7 +272,7 @@ static void *httpAcceptHttpConnection(void *arg) {
taosIpStr(pServer->serverIp), pServer->serverPort, strerror(errno)); taosIpStr(pServer->serverIp), pServer->serverPort, strerror(errno));
return NULL; return NULL;
} else { } else {
httpPrint("http service init success at %u", pServer->serverPort); httpPrint("http server init success at %u", pServer->serverPort);
pServer->status = HTTP_SERVER_RUNNING; pServer->status = HTTP_SERVER_RUNNING;
} }
...@@ -316,12 +316,9 @@ static void *httpAcceptHttpConnection(void *arg) { ...@@ -316,12 +316,9 @@ static void *httpAcceptHttpConnection(void *arg) {
pContext->pThread = pThread; pContext->pThread = pThread;
sprintf(pContext->ipstr, "%s:%u", inet_ntoa(clientAddr.sin_addr), htons(clientAddr.sin_port)); sprintf(pContext->ipstr, "%s:%u", inet_ntoa(clientAddr.sin_addr), htons(clientAddr.sin_port));
httpTrace("context:%p, fd:%d, ip:%s, thread:%s, numOfFds:%d, totalFds:%d, accept a new connection", pContext,
connFd, pContext->ipstr, pThread->label, pThread->numOfFds, totalFds);
struct epoll_event event; struct epoll_event event;
event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP | EPOLLERR | EPOLLHUP | EPOLLRDHUP; event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP | EPOLLERR | EPOLLHUP | EPOLLRDHUP;
event.data.fd = connFd; event.data.fd = connFd;
if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) { if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) {
httpError("context:%p, fd:%d, ip:%s, thread:%s, failed to add http fd for epoll, error:%s", pContext, connFd, httpError("context:%p, fd:%d, ip:%s, thread:%s, failed to add http fd for epoll, error:%s", pContext, connFd,
...@@ -333,6 +330,8 @@ static void *httpAcceptHttpConnection(void *arg) { ...@@ -333,6 +330,8 @@ static void *httpAcceptHttpConnection(void *arg) {
// notify the data process, add into the FdObj list // notify the data process, add into the FdObj list
atomic_add_fetch_32(&pThread->numOfFds, 1); atomic_add_fetch_32(&pThread->numOfFds, 1);
httpTrace("context:%p, fd:%d, ip:%s, thread:%s numOfFds:%d totalFds:%d, accept a new connection", pContext, connFd,
pContext->ipstr, pThread->label, pThread->numOfFds, totalFds);
// pick up next thread for next connection // pick up next thread for next connection
threadId++; threadId++;
......
...@@ -58,7 +58,7 @@ static void httpFetchSessionImp(HttpContext *pContext) { ...@@ -58,7 +58,7 @@ static void httpFetchSessionImp(HttpContext *pContext) {
pContext->session = taosCacheAcquireByName(server->sessionCache, sessionId); pContext->session = taosCacheAcquireByName(server->sessionCache, sessionId);
if (pContext->session != NULL) { if (pContext->session != NULL) {
atomic_add_fetch_32(&pContext->refCount, 1); atomic_add_fetch_32(&pContext->session->refCount, 1);
httpTrace("context:%p, fd:%d, ip:%s, user:%s, find an exist session:%p:%p, refCount:%d", pContext, pContext->fd, httpTrace("context:%p, fd:%d, ip:%s, user:%s, find an exist session:%p:%p, refCount:%d", pContext, pContext->fd,
pContext->ipstr, pContext->user, pContext->session, pContext->session->taos, pContext->session->refCount); pContext->ipstr, pContext->user, pContext->session, pContext->session->taos, pContext->session->refCount);
} else { } else {
...@@ -83,7 +83,7 @@ void httpGetSession(HttpContext *pContext) { ...@@ -83,7 +83,7 @@ void httpGetSession(HttpContext *pContext) {
void httpReleaseSession(HttpContext *pContext) { void httpReleaseSession(HttpContext *pContext) {
if (pContext == NULL || pContext->session == NULL) return; if (pContext == NULL || pContext->session == NULL) return;
int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1); int32_t refCount = atomic_sub_fetch_32(&pContext->session->refCount, 1);
assert(refCount >= 0); assert(refCount >= 0);
httpTrace("context:%p, session:%p is releasd refCount:%d", pContext, pContext->session, pContext->session->refCount); httpTrace("context:%p, session:%p is releasd refCount:%d", pContext, pContext->session, pContext->session->refCount);
......
...@@ -68,7 +68,7 @@ int httpStartSystem() { ...@@ -68,7 +68,7 @@ int httpStartSystem() {
} }
if (!httpInitContexts()) { if (!httpInitContexts()) {
httpError("http init session failed"); httpError("http init contexts failed");
return -1; return -1;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册