提交 209f3db8 编写于 作者: S Shengliang Guan

[TD-637] cleanup cache in callback function

上级 73f2baac
...@@ -138,7 +138,7 @@ SConnObj *mnodeAccquireConn(uint32_t connId, char *user, uint32_t ip, uint16_t p ...@@ -138,7 +138,7 @@ SConnObj *mnodeAccquireConn(uint32_t connId, char *user, uint32_t ip, uint16_t p
static void mnodeFreeConn(void *data) { static void mnodeFreeConn(void *data) {
SConnObj *pConn = data; SConnObj *pConn = data;
tfree(pConn->pQueries); tfree(pConn->pQueries);
tfree(pConn->pQueries); tfree(pConn->pStreams);
mTrace("connId:%d, is destroyed", pConn->connId); mTrace("connId:%d, is destroyed", pConn->connId);
} }
......
...@@ -24,7 +24,7 @@ const char *httpContextStateStr(HttpContextState state); ...@@ -24,7 +24,7 @@ const char *httpContextStateStr(HttpContextState state);
HttpContext *httpCreateContext(int32_t fd); HttpContext *httpCreateContext(int32_t fd);
bool httpInitContext(HttpContext *pContext); bool httpInitContext(HttpContext *pContext);
HttpContext *httpGetContext(int32_t fd); HttpContext *httpGetContext(void * pContext);
void httpReleaseContext(HttpContext *pContext); void httpReleaseContext(HttpContext *pContext);
void httpCloseContextByServer(HttpContext *pContext); void httpCloseContextByServer(HttpContext *pContext);
void httpCloseContextByApp(HttpContext *pContext); void httpCloseContextByApp(HttpContext *pContext);
......
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#include "ttimer.h" #include "ttimer.h"
#include "tglobal.h" #include "tglobal.h"
#include "tcache.h" #include "tcache.h"
#include "hash.h"
#include "httpInt.h" #include "httpInt.h"
#include "httpResp.h" #include "httpResp.h"
#include "httpSql.h" #include "httpSql.h"
...@@ -38,8 +39,6 @@ static void httpRemoveContextFromEpoll(HttpContext *pContext) { ...@@ -38,8 +39,6 @@ static void httpRemoveContextFromEpoll(HttpContext *pContext) {
static void httpDestroyContext(void *data) { static void httpDestroyContext(void *data) {
HttpContext *pContext = *(HttpContext **)data; HttpContext *pContext = *(HttpContext **)data;
httpTrace("context:%p, is destroyed, refCount:%d", pContext, pContext->refCount);
if (pContext->fd > 0) tclose(pContext->fd); if (pContext->fd > 0) tclose(pContext->fd);
HttpThread *pThread = pContext->pThread; HttpThread *pThread = pContext->pThread;
...@@ -54,11 +53,12 @@ static void httpDestroyContext(void *data) { ...@@ -54,11 +53,12 @@ static void httpDestroyContext(void *data) {
httpFreeJsonBuf(pContext); httpFreeJsonBuf(pContext);
httpFreeMultiCmds(pContext); httpFreeMultiCmds(pContext);
httpTrace("context:%p, is destroyed, refCount:%d", pContext, pContext->refCount);
tfree(pContext); tfree(pContext);
} }
bool httpInitContexts() { bool httpInitContexts() {
tsHttpServer.contextCache = taosCacheInitWithCb(5, httpDestroyContext); tsHttpServer.contextCache = taosCacheInitWithCb(2, httpDestroyContext);
if (tsHttpServer.contextCache == NULL) { if (tsHttpServer.contextCache == NULL) {
httpError("failed to init context cache"); httpError("failed to init context cache");
return false; return false;
...@@ -70,7 +70,8 @@ bool httpInitContexts() { ...@@ -70,7 +70,8 @@ bool httpInitContexts() {
void httpCleanupContexts() { void httpCleanupContexts() {
// TODO: wait until all context is closed // TODO: wait until all context is closed
if (tsHttpServer.contextCache != NULL) { if (tsHttpServer.contextCache != NULL) {
httpPrint("context cache is cleanup"); SCacheObj *cache = tsHttpServer.contextCache;
httpPrint("context cache is cleanuping, size:%d", taosHashGetSize(cache->pHashTable));
taosCacheCleanup(tsHttpServer.contextCache); taosCacheCleanup(tsHttpServer.contextCache);
tsHttpServer.contextCache = NULL; tsHttpServer.contextCache = NULL;
} }
...@@ -103,26 +104,29 @@ HttpContext *httpCreateContext(int32_t fd) { ...@@ -103,26 +104,29 @@ HttpContext *httpCreateContext(int32_t fd) {
HttpContext *pContext = calloc(1, sizeof(HttpContext)); HttpContext *pContext = calloc(1, sizeof(HttpContext));
if (pContext == NULL) return NULL; if (pContext == NULL) return NULL;
char fdStr[12] = {0}; char contextStr[16] = {0};
snprintf(fdStr, sizeof(fdStr), "%d", fd); snprintf(contextStr, sizeof(contextStr), "%p", pContext);
pContext->fd = fd; pContext->fd = fd;
pContext->httpVersion = HTTP_VERSION_10; pContext->httpVersion = HTTP_VERSION_10;
pContext->lastAccessTime = taosGetTimestampSec(); pContext->lastAccessTime = taosGetTimestampSec();
pContext->state = HTTP_CONTEXT_STATE_READY; pContext->state = HTTP_CONTEXT_STATE_READY;
HttpContext **ppContext = taosCachePut(tsHttpServer.contextCache, fdStr, &pContext, sizeof(HttpContext *), 5); HttpContext **ppContext = taosCachePut(tsHttpServer.contextCache, contextStr, &pContext, sizeof(HttpContext *), 3);
pContext->ppContext = ppContext; pContext->ppContext = ppContext;
httpTrace("context:%p, fd:%d, is created", pContext, fd); httpTrace("context:%p, fd:%d, is created, item:%p", pContext, fd, ppContext);
// set the ref to 0
taosCacheRelease(tsHttpServer.contextCache, (void**)&ppContext, false);
return pContext; return pContext;
} }
HttpContext *httpGetContext(int32_t fd) { HttpContext *httpGetContext(void *ptr) {
char fdStr[12] = {0}; char contextStr[16] = {0};
snprintf(fdStr, sizeof(fdStr), "%d", fd); snprintf(contextStr, sizeof(contextStr), "%p", ptr);
HttpContext **ppContext = taosCacheAcquireByName(tsHttpServer.contextCache, fdStr); HttpContext **ppContext = taosCacheAcquireByName(tsHttpServer.contextCache, contextStr);
if (ppContext) { if (ppContext) {
HttpContext *pContext = *ppContext; HttpContext *pContext = *ppContext;
......
...@@ -193,7 +193,7 @@ static void httpProcessHttpData(void *param) { ...@@ -193,7 +193,7 @@ static void httpProcessHttpData(void *param) {
if (fdNum <= 0) continue; if (fdNum <= 0) continue;
for (int i = 0; i < fdNum; ++i) { for (int i = 0; i < fdNum; ++i) {
pContext = httpGetContext(events[i].data.fd); pContext = httpGetContext(events[i].data.ptr);
if (pContext == NULL) { if (pContext == NULL) {
httpError("fd:%d, is already released, close connect", events[i].data.fd); httpError("fd:%d, is already released, close connect", events[i].data.fd);
epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pContext->fd, NULL); epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pContext->fd, NULL);
...@@ -319,7 +319,7 @@ static void *httpAcceptHttpConnection(void *arg) { ...@@ -319,7 +319,7 @@ static void *httpAcceptHttpConnection(void *arg) {
struct epoll_event event; struct epoll_event event;
event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP | EPOLLERR | EPOLLHUP | EPOLLRDHUP; event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP | EPOLLERR | EPOLLHUP | EPOLLRDHUP;
event.data.fd = connFd; event.data.ptr = pContext;
if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) { if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) {
httpError("context:%p, fd:%d, ip:%s, thread:%s, failed to add http fd for epoll, error:%s", pContext, connFd, httpError("context:%p, fd:%d, ip:%s, thread:%s, failed to add http fd for epoll, error:%s", pContext, connFd,
pContext->ipstr, pThread->label, strerror(errno)); pContext->ipstr, pThread->label, strerror(errno));
......
...@@ -35,6 +35,8 @@ void httpCreateSession(HttpContext *pContext, void *taos) { ...@@ -35,6 +35,8 @@ void httpCreateSession(HttpContext *pContext, void *taos) {
snprintf(session.id, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass); snprintf(session.id, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass);
pContext->session = taosCachePut(server->sessionCache, session.id, &session, sizeof(HttpSession), tsHttpSessionExpire); pContext->session = taosCachePut(server->sessionCache, session.id, &session, sizeof(HttpSession), tsHttpSessionExpire);
// void *temp = pContext->session;
// taosCacheRelease(server->sessionCache, (void **)&temp, false);
if (pContext->session == NULL) { if (pContext->session == NULL) {
httpError("context:%p, fd:%d, ip:%s, user:%s, error:%s", pContext, pContext->fd, pContext->ipstr, pContext->user, httpError("context:%p, fd:%d, ip:%s, user:%s, error:%s", pContext, pContext->fd, pContext->ipstr, pContext->user,
...@@ -44,7 +46,7 @@ void httpCreateSession(HttpContext *pContext, void *taos) { ...@@ -44,7 +46,7 @@ void httpCreateSession(HttpContext *pContext, void *taos) {
return; return;
} }
httpTrace("context:%p, fd:%d, ip:%s, user:%s, create a new session:%p:%p refCount:%d", pContext, pContext->fd, httpTrace("context:%p, fd:%d, ip:%s, user:%s, create a new session:%p:%p sessionRef:%d", pContext, pContext->fd,
pContext->ipstr, pContext->user, pContext->session, pContext->session->taos, pContext->session->refCount); pContext->ipstr, pContext->user, pContext->session, pContext->session->taos, pContext->session->refCount);
pthread_mutex_unlock(&server->serverMutex); pthread_mutex_unlock(&server->serverMutex);
} }
...@@ -59,7 +61,7 @@ static void httpFetchSessionImp(HttpContext *pContext) { ...@@ -59,7 +61,7 @@ static void httpFetchSessionImp(HttpContext *pContext) {
pContext->session = taosCacheAcquireByName(server->sessionCache, sessionId); pContext->session = taosCacheAcquireByName(server->sessionCache, sessionId);
if (pContext->session != NULL) { if (pContext->session != NULL) {
atomic_add_fetch_32(&pContext->session->refCount, 1); atomic_add_fetch_32(&pContext->session->refCount, 1);
httpTrace("context:%p, fd:%d, ip:%s, user:%s, find an exist session:%p:%p, refCount:%d", pContext, pContext->fd, httpTrace("context:%p, fd:%d, ip:%s, user:%s, find an exist session:%p:%p, sessionRef:%d", pContext, pContext->fd,
pContext->ipstr, pContext->user, pContext->session, pContext->session->taos, pContext->session->refCount); pContext->ipstr, pContext->user, pContext->session, pContext->session->taos, pContext->session->refCount);
} else { } else {
httpTrace("context:%p, fd:%d, ip:%s, user:%s, session not found", pContext, pContext->fd, pContext->ipstr, httpTrace("context:%p, fd:%d, ip:%s, user:%s, session not found", pContext, pContext->fd, pContext->ipstr,
...@@ -85,26 +87,27 @@ void httpReleaseSession(HttpContext *pContext) { ...@@ -85,26 +87,27 @@ void httpReleaseSession(HttpContext *pContext) {
int32_t refCount = atomic_sub_fetch_32(&pContext->session->refCount, 1); int32_t refCount = atomic_sub_fetch_32(&pContext->session->refCount, 1);
assert(refCount >= 0); assert(refCount >= 0);
httpTrace("context:%p, session:%p is releasd refCount:%d", pContext, pContext->session, pContext->session->refCount); httpTrace("context:%p, release session:%p:%p, sessionRef:%d", pContext, pContext->session, pContext->session->taos,
pContext->session->refCount);
taosCacheRelease(tsHttpServer.sessionCache, (void**)(&(pContext->session)), false); taosCacheRelease(tsHttpServer.sessionCache, (void **)&pContext->session, false);
pContext->session = NULL; pContext->session = NULL;
} }
static void httpDestroySession(void *data) { static void httpDestroySession(void *data) {
HttpSession *pSession = data; HttpSession *session = data;
httpTrace("session:%p:%p, is destroyed, refCount:%d", pSession, pSession->taos, pSession->refCount); httpTrace("session:%p:%p, is destroyed, sessionRef:%d", session, session->taos, session->refCount);
if (pSession->taos != NULL) { if (session->taos != NULL) {
taos_close(pSession->taos); taos_close(session->taos);
pSession->taos = NULL; session->taos = NULL;
} }
tfree(pSession);
} }
void httpCleanUpSessions() { void httpCleanUpSessions() {
if (tsHttpServer.sessionCache != NULL) { if (tsHttpServer.sessionCache != NULL) {
httpPrint("session cache is cleanup"); SCacheObj *cache = tsHttpServer.sessionCache;
httpPrint("session cache is cleanuping, size:%d", taosHashGetSize(cache->pHashTable));
taosCacheCleanup(tsHttpServer.sessionCache); taosCacheCleanup(tsHttpServer.sessionCache);
tsHttpServer.sessionCache = NULL; tsHttpServer.sessionCache = NULL;
} }
......
...@@ -92,7 +92,7 @@ void httpStopSystem() { ...@@ -92,7 +92,7 @@ void httpStopSystem() {
} }
void httpCleanUpSystem() { void httpCleanUpSystem() {
httpPrint("http service cleanup"); httpPrint("http server cleanup");
httpStopSystem(); httpStopSystem();
httpCleanupContexts(); httpCleanupContexts();
......
...@@ -63,10 +63,12 @@ static FORCE_INLINE void __cache_lock_destroy(SCacheObj *pCacheObj) { ...@@ -63,10 +63,12 @@ static FORCE_INLINE void __cache_lock_destroy(SCacheObj *pCacheObj) {
#endif #endif
} }
#if 0
static FORCE_INLINE void taosFreeNode(void *data) { static FORCE_INLINE void taosFreeNode(void *data) {
SCacheDataNode *pNode = *(SCacheDataNode **)data; SCacheDataNode *pNode = *(SCacheDataNode **)data;
free(pNode); free(pNode);
} }
#endif
/** /**
* @param key key of object for hash, usually a null-terminated string * @param key key of object for hash, usually a null-terminated string
...@@ -241,7 +243,7 @@ SCacheObj *taosCacheInitWithCb(int64_t refreshTime, void (*freeCb)(void *data)) ...@@ -241,7 +243,7 @@ SCacheObj *taosCacheInitWithCb(int64_t refreshTime, void (*freeCb)(void *data))
} }
// set free cache node callback function for hash table // set free cache node callback function for hash table
taosHashSetFreecb(pCacheObj->pHashTable, taosFreeNode); // taosHashSetFreecb(pCacheObj->pHashTable, taosFreeNode);
pCacheObj->freeFp = freeCb; pCacheObj->freeFp = freeCb;
pCacheObj->refreshTime = refreshTime * 1000; pCacheObj->refreshTime = refreshTime * 1000;
...@@ -565,6 +567,16 @@ void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) { ...@@ -565,6 +567,16 @@ void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) {
void doCleanupDataCache(SCacheObj *pCacheObj) { void doCleanupDataCache(SCacheObj *pCacheObj) {
__cache_wr_lock(pCacheObj); __cache_wr_lock(pCacheObj);
SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable);
while (taosHashIterNext(pIter)) {
SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter);
// if (pNode->expiredTime <= expiredTime && T_REF_VAL_GET(pNode) <= 0) {
taosCacheReleaseNode(pCacheObj, pNode);
//}
}
taosHashDestroyIter(pIter);
taosHashCleanup(pCacheObj->pHashTable); taosHashCleanup(pCacheObj->pHashTable);
__cache_unlock(pCacheObj); __cache_unlock(pCacheObj);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册