diff --git a/src/mnode/src/mnodeProfile.c b/src/mnode/src/mnodeProfile.c index b57521e759ecd6d8a96a0567d80b897cbf5ec28f..7552ea5c25c136106ff998201500d299949d5932 100644 --- a/src/mnode/src/mnodeProfile.c +++ b/src/mnode/src/mnodeProfile.c @@ -138,7 +138,7 @@ SConnObj *mnodeAccquireConn(uint32_t connId, char *user, uint32_t ip, uint16_t p static void mnodeFreeConn(void *data) { SConnObj *pConn = data; tfree(pConn->pQueries); - tfree(pConn->pQueries); + tfree(pConn->pStreams); mTrace("connId:%d, is destroyed", pConn->connId); } diff --git a/src/plugins/http/inc/httpContext.h b/src/plugins/http/inc/httpContext.h index 27b732bf3eb20cd6bbf3992cb5bd9cef45de73cd..a2d50d6b7fdfba3bc80c918abd7999247636b6ee 100644 --- a/src/plugins/http/inc/httpContext.h +++ b/src/plugins/http/inc/httpContext.h @@ -24,7 +24,7 @@ const char *httpContextStateStr(HttpContextState state); HttpContext *httpCreateContext(int32_t fd); bool httpInitContext(HttpContext *pContext); -HttpContext *httpGetContext(int32_t fd); +HttpContext *httpGetContext(void * pContext); void httpReleaseContext(HttpContext *pContext); void httpCloseContextByServer(HttpContext *pContext); void httpCloseContextByApp(HttpContext *pContext); diff --git a/src/plugins/http/src/httpContext.c b/src/plugins/http/src/httpContext.c index 9262e3f609f292a3e5368728795f53fdadc3b60f..183a332fb609cc896368ab61bcbce1109cbc3685 100644 --- a/src/plugins/http/src/httpContext.c +++ b/src/plugins/http/src/httpContext.c @@ -22,6 +22,7 @@ #include "ttimer.h" #include "tglobal.h" #include "tcache.h" +#include "hash.h" #include "httpInt.h" #include "httpResp.h" #include "httpSql.h" @@ -38,15 +39,13 @@ static void httpRemoveContextFromEpoll(HttpContext *pContext) { static void httpDestroyContext(void *data) { HttpContext *pContext = *(HttpContext **)data; - httpTrace("context:%p, is destroyed, refCount:%d", pContext, pContext->refCount); - if (pContext->fd > 0) tclose(pContext->fd); HttpThread *pThread = pContext->pThread; httpRemoveContextFromEpoll(pContext); httpReleaseSession(pContext); atomic_sub_fetch_32(&pThread->numOfFds, 1); - + pContext->pThread = 0; pContext->state = HTTP_CONTEXT_STATE_CLOSED; @@ -54,11 +53,12 @@ static void httpDestroyContext(void *data) { httpFreeJsonBuf(pContext); httpFreeMultiCmds(pContext); + httpTrace("context:%p, is destroyed, refCount:%d", pContext, pContext->refCount); tfree(pContext); } bool httpInitContexts() { - tsHttpServer.contextCache = taosCacheInitWithCb(5, httpDestroyContext); + tsHttpServer.contextCache = taosCacheInitWithCb(2, httpDestroyContext); if (tsHttpServer.contextCache == NULL) { httpError("failed to init context cache"); return false; @@ -70,7 +70,8 @@ bool httpInitContexts() { void httpCleanupContexts() { // TODO: wait until all context is closed if (tsHttpServer.contextCache != NULL) { - httpPrint("context cache is cleanup"); + SCacheObj *cache = tsHttpServer.contextCache; + httpPrint("context cache is cleanuping, size:%d", taosHashGetSize(cache->pHashTable)); taosCacheCleanup(tsHttpServer.contextCache); tsHttpServer.contextCache = NULL; } @@ -103,26 +104,29 @@ HttpContext *httpCreateContext(int32_t fd) { HttpContext *pContext = calloc(1, sizeof(HttpContext)); if (pContext == NULL) return NULL; - char fdStr[12] = {0}; - snprintf(fdStr, sizeof(fdStr), "%d", fd); + char contextStr[16] = {0}; + snprintf(contextStr, sizeof(contextStr), "%p", pContext); pContext->fd = fd; pContext->httpVersion = HTTP_VERSION_10; pContext->lastAccessTime = taosGetTimestampSec(); pContext->state = HTTP_CONTEXT_STATE_READY; - HttpContext **ppContext = taosCachePut(tsHttpServer.contextCache, fdStr, &pContext, sizeof(HttpContext *), 5); + HttpContext **ppContext = taosCachePut(tsHttpServer.contextCache, contextStr, &pContext, sizeof(HttpContext *), 3); pContext->ppContext = ppContext; - httpTrace("context:%p, fd:%d, is created", pContext, fd); + httpTrace("context:%p, fd:%d, is created, item:%p", pContext, fd, ppContext); + + // set the ref to 0 + taosCacheRelease(tsHttpServer.contextCache, (void**)&ppContext, false); return pContext; } -HttpContext *httpGetContext(int32_t fd) { - char fdStr[12] = {0}; - snprintf(fdStr, sizeof(fdStr), "%d", fd); +HttpContext *httpGetContext(void *ptr) { + char contextStr[16] = {0}; + snprintf(contextStr, sizeof(contextStr), "%p", ptr); - HttpContext **ppContext = taosCacheAcquireByName(tsHttpServer.contextCache, fdStr); + HttpContext **ppContext = taosCacheAcquireByName(tsHttpServer.contextCache, contextStr); if (ppContext) { HttpContext *pContext = *ppContext; diff --git a/src/plugins/http/src/httpServer.c b/src/plugins/http/src/httpServer.c index 06523540b7ed4716c33c13150cfe5b94141c6cf3..bea2bb083ace72bbaffc191d7d498c09ba967ffd 100644 --- a/src/plugins/http/src/httpServer.c +++ b/src/plugins/http/src/httpServer.c @@ -193,7 +193,7 @@ static void httpProcessHttpData(void *param) { if (fdNum <= 0) continue; for (int i = 0; i < fdNum; ++i) { - pContext = httpGetContext(events[i].data.fd); + pContext = httpGetContext(events[i].data.ptr); if (pContext == NULL) { httpError("fd:%d, is already released, close connect", events[i].data.fd); epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pContext->fd, NULL); @@ -319,7 +319,7 @@ static void *httpAcceptHttpConnection(void *arg) { struct epoll_event event; event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP | EPOLLERR | EPOLLHUP | EPOLLRDHUP; - event.data.fd = connFd; + event.data.ptr = pContext; if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) { httpError("context:%p, fd:%d, ip:%s, thread:%s, failed to add http fd for epoll, error:%s", pContext, connFd, pContext->ipstr, pThread->label, strerror(errno)); diff --git a/src/plugins/http/src/httpSession.c b/src/plugins/http/src/httpSession.c index af8a3e60023b7cab6548739160e538d244d22fe4..2b0735bfaf58c834d912942847e429bd828a4a33 100644 --- a/src/plugins/http/src/httpSession.c +++ b/src/plugins/http/src/httpSession.c @@ -35,6 +35,8 @@ void httpCreateSession(HttpContext *pContext, void *taos) { snprintf(session.id, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass); pContext->session = taosCachePut(server->sessionCache, session.id, &session, sizeof(HttpSession), tsHttpSessionExpire); + // void *temp = pContext->session; + // taosCacheRelease(server->sessionCache, (void **)&temp, false); if (pContext->session == NULL) { httpError("context:%p, fd:%d, ip:%s, user:%s, error:%s", pContext, pContext->fd, pContext->ipstr, pContext->user, @@ -44,7 +46,7 @@ void httpCreateSession(HttpContext *pContext, void *taos) { return; } - httpTrace("context:%p, fd:%d, ip:%s, user:%s, create a new session:%p:%p refCount:%d", pContext, pContext->fd, + httpTrace("context:%p, fd:%d, ip:%s, user:%s, create a new session:%p:%p sessionRef:%d", pContext, pContext->fd, pContext->ipstr, pContext->user, pContext->session, pContext->session->taos, pContext->session->refCount); pthread_mutex_unlock(&server->serverMutex); } @@ -59,7 +61,7 @@ static void httpFetchSessionImp(HttpContext *pContext) { pContext->session = taosCacheAcquireByName(server->sessionCache, sessionId); if (pContext->session != NULL) { atomic_add_fetch_32(&pContext->session->refCount, 1); - httpTrace("context:%p, fd:%d, ip:%s, user:%s, find an exist session:%p:%p, refCount:%d", pContext, pContext->fd, + httpTrace("context:%p, fd:%d, ip:%s, user:%s, find an exist session:%p:%p, sessionRef:%d", pContext, pContext->fd, pContext->ipstr, pContext->user, pContext->session, pContext->session->taos, pContext->session->refCount); } else { httpTrace("context:%p, fd:%d, ip:%s, user:%s, session not found", pContext, pContext->fd, pContext->ipstr, @@ -85,26 +87,27 @@ void httpReleaseSession(HttpContext *pContext) { int32_t refCount = atomic_sub_fetch_32(&pContext->session->refCount, 1); assert(refCount >= 0); - httpTrace("context:%p, session:%p is releasd refCount:%d", pContext, pContext->session, pContext->session->refCount); + httpTrace("context:%p, release session:%p:%p, sessionRef:%d", pContext, pContext->session, pContext->session->taos, + pContext->session->refCount); - taosCacheRelease(tsHttpServer.sessionCache, (void**)(&(pContext->session)), false); + taosCacheRelease(tsHttpServer.sessionCache, (void **)&pContext->session, false); pContext->session = NULL; } static void httpDestroySession(void *data) { - HttpSession *pSession = data; - httpTrace("session:%p:%p, is destroyed, refCount:%d", pSession, pSession->taos, pSession->refCount); + HttpSession *session = data; + httpTrace("session:%p:%p, is destroyed, sessionRef:%d", session, session->taos, session->refCount); - if (pSession->taos != NULL) { - taos_close(pSession->taos); - pSession->taos = NULL; + if (session->taos != NULL) { + taos_close(session->taos); + session->taos = NULL; } - tfree(pSession); } void httpCleanUpSessions() { if (tsHttpServer.sessionCache != NULL) { - httpPrint("session cache is cleanup"); + SCacheObj *cache = tsHttpServer.sessionCache; + httpPrint("session cache is cleanuping, size:%d", taosHashGetSize(cache->pHashTable)); taosCacheCleanup(tsHttpServer.sessionCache); tsHttpServer.sessionCache = NULL; } diff --git a/src/plugins/http/src/httpSystem.c b/src/plugins/http/src/httpSystem.c index 6e21baca04bc4320d5dbc7ad9909550701e13979..015b0783b739a36eff88a398ce449eaff5771940 100644 --- a/src/plugins/http/src/httpSystem.c +++ b/src/plugins/http/src/httpSystem.c @@ -92,7 +92,7 @@ void httpStopSystem() { } void httpCleanUpSystem() { - httpPrint("http service cleanup"); + httpPrint("http server cleanup"); httpStopSystem(); httpCleanupContexts(); diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index ac06cf4f3f8b4c7355904ec9da1495bb0d9beeca..2b6083a91ccef8ef2a08902117e8f67340ba6a37 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -63,10 +63,12 @@ static FORCE_INLINE void __cache_lock_destroy(SCacheObj *pCacheObj) { #endif } +#if 0 static FORCE_INLINE void taosFreeNode(void *data) { SCacheDataNode *pNode = *(SCacheDataNode **)data; free(pNode); } +#endif /** * @param key key of object for hash, usually a null-terminated string @@ -241,7 +243,7 @@ SCacheObj *taosCacheInitWithCb(int64_t refreshTime, void (*freeCb)(void *data)) } // set free cache node callback function for hash table - taosHashSetFreecb(pCacheObj->pHashTable, taosFreeNode); + // taosHashSetFreecb(pCacheObj->pHashTable, taosFreeNode); pCacheObj->freeFp = freeCb; pCacheObj->refreshTime = refreshTime * 1000; @@ -565,7 +567,17 @@ void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) { void doCleanupDataCache(SCacheObj *pCacheObj) { __cache_wr_lock(pCacheObj); - taosHashCleanup(pCacheObj->pHashTable); + + SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable); + while (taosHashIterNext(pIter)) { + SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter); + // if (pNode->expiredTime <= expiredTime && T_REF_VAL_GET(pNode) <= 0) { + taosCacheReleaseNode(pCacheObj, pNode); + //} + } + taosHashDestroyIter(pIter); + + taosHashCleanup(pCacheObj->pHashTable); __cache_unlock(pCacheObj); taosTrashCanEmpty(pCacheObj, true);