未验证 提交 12187a38 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #9101 from haoyifan/tcache_cleanup_branch

Tcache cleanup branch
...@@ -85,7 +85,7 @@ static void doInitRefreshThread(void) { ...@@ -85,7 +85,7 @@ static void doInitRefreshThread(void) {
pthread_attr_destroy(&thattr); pthread_attr_destroy(&thattr);
} }
pthread_t doRegisterCacheObj(SCacheObj* pCacheObj) { static pthread_t doRegisterCacheObj(SCacheObj* pCacheObj) {
pthread_once(&cacheThreadInit, doInitRefreshThread); pthread_once(&cacheThreadInit, doInitRefreshThread);
pthread_mutex_lock(&guard); pthread_mutex_lock(&guard);
...@@ -184,36 +184,44 @@ static FORCE_INLINE void doDestroyTrashcanElem(SCacheObj* pCacheObj, STrashElem ...@@ -184,36 +184,44 @@ static FORCE_INLINE void doDestroyTrashcanElem(SCacheObj* pCacheObj, STrashElem
} }
SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_free_fn_t fn, const char* cacheName) { SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_free_fn_t fn, const char* cacheName) {
const int32_t SLEEP_DURATION = 500; //500 ms const int32_t SLEEP_DURATION_IN_MS = 500;
if (refreshTimeInSeconds <= 0) { if (refreshTimeInSeconds <= 0) {
return NULL; return NULL;
} }
SCacheObj *pCacheObj = (SCacheObj *)calloc(1, sizeof(SCacheObj)); SCacheObj *pCacheObj = (SCacheObj *)calloc(1, sizeof(SCacheObj));
if (pCacheObj == NULL) { if (pCacheObj == NULL) {
uError("failed to allocate memory, reason:%s", strerror(errno)); uError("failed to allocate memory, reason:%s", strerror(errno));
return NULL; return NULL;
} }
pCacheObj->pHashTable = taosHashInit(4096, taosGetDefaultHashFunction(keyType), false, HASH_ENTRY_LOCK); pCacheObj->pHashTable = taosHashInit(4096, taosGetDefaultHashFunction(keyType), false, HASH_ENTRY_LOCK);
pCacheObj->name = strdup(cacheName);
if (pCacheObj->pHashTable == NULL) { if (pCacheObj->pHashTable == NULL) {
free(pCacheObj); free(pCacheObj);
uError("failed to allocate memory, reason:%s", strerror(errno)); uError("failed to allocate memory, reason:%s", strerror(errno));
return NULL; return NULL;
} }
pCacheObj->name = strdup(cacheName);
if (pCacheObj->name == NULL) {
taosHashCleanup(pCacheObj->pHashTable);
free(pCacheObj);
uError("failed to allocate memory, reason:%s", strerror(errno));
return NULL;
}
// set free cache node callback function // set free cache node callback function
pCacheObj->freeFp = fn; pCacheObj->freeFp = fn;
pCacheObj->refreshTime = refreshTimeInSeconds * 1000; pCacheObj->refreshTime = refreshTimeInSeconds * 1000;
pCacheObj->checkTick = pCacheObj->refreshTime / SLEEP_DURATION; pCacheObj->checkTick = pCacheObj->refreshTime / SLEEP_DURATION_IN_MS;
pCacheObj->extendLifespan = extendLifespan; // the TTL after the last access pCacheObj->extendLifespan = extendLifespan; // the TTL after the last access
if (__cache_lock_init(pCacheObj) != 0) { if (__cache_lock_init(pCacheObj) != 0) {
taosHashCleanup(pCacheObj->pHashTable); taosHashCleanup(pCacheObj->pHashTable);
free(pCacheObj->name);
free(pCacheObj); free(pCacheObj);
uError("failed to init lock, reason:%s", strerror(errno)); uError("failed to init lock, reason:%s", strerror(errno));
return NULL; return NULL;
} }
...@@ -323,10 +331,10 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen ...@@ -323,10 +331,10 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen
void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) { void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
if (pCacheObj == NULL || data == NULL) return NULL; if (pCacheObj == NULL || data == NULL) return NULL;
size_t offset = offsetof(SCacheDataNode, data); size_t offset = offsetof(SCacheDataNode, data);
SCacheDataNode *ptNode = (SCacheDataNode *)((char *)data - offset); SCacheDataNode *ptNode = (SCacheDataNode *)((char *)data - offset);
if (ptNode->signature != (uint64_t)ptNode) { if (ptNode->signature != (uint64_t)ptNode) {
uError("cache:%s, key: %p the data from cache is invalid", pCacheObj->name, ptNode); uError("cache:%s, key: %p the data from cache is invalid", pCacheObj->name, ptNode);
return NULL; return NULL;
...@@ -342,22 +350,22 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) { ...@@ -342,22 +350,22 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
void *taosCacheTransfer(SCacheObj *pCacheObj, void **data) { void *taosCacheTransfer(SCacheObj *pCacheObj, void **data) {
if (pCacheObj == NULL || data == NULL || (*data) == NULL) return NULL; if (pCacheObj == NULL || data == NULL || (*data) == NULL) return NULL;
size_t offset = offsetof(SCacheDataNode, data); size_t offset = offsetof(SCacheDataNode, data);
SCacheDataNode *ptNode = (SCacheDataNode *)((char *)(*data) - offset); SCacheDataNode *ptNode = (SCacheDataNode *)((char *)(*data) - offset);
if (ptNode->signature != (uint64_t)ptNode) { if (ptNode->signature != (uint64_t)ptNode) {
uError("cache:%s, key: %p the data from cache is invalid", pCacheObj->name, ptNode); uError("cache:%s, key: %p the data from cache is invalid", pCacheObj->name, ptNode);
return NULL; return NULL;
} }
assert(T_REF_VAL_GET(ptNode) >= 1); assert(T_REF_VAL_GET(ptNode) >= 1);
char *d = *data; char *d = *data;
// clear its reference to old area // clear its reference to old area
*data = NULL; *data = NULL;
return d; return d;
} }
...@@ -377,7 +385,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { ...@@ -377,7 +385,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
// It happens when there is only one object in the cache, and two threads which has referenced this object // It happens when there is only one object in the cache, and two threads which has referenced this object
// start to free the it simultaneously [TD-1569]. // start to free the it simultaneously [TD-1569].
size_t offset = offsetof(SCacheDataNode, data); size_t offset = offsetof(SCacheDataNode, data);
SCacheDataNode *pNode = (SCacheDataNode *)((char *)(*data) - offset); SCacheDataNode *pNode = (SCacheDataNode *)((char *)(*data) - offset);
if (pNode->signature != (uint64_t)pNode) { if (pNode->signature != (uint64_t)pNode) {
uError("cache:%s, %p, release invalid cache data", pCacheObj->name, pNode); uError("cache:%s, %p, release invalid cache data", pCacheObj->name, pNode);
...@@ -542,8 +550,8 @@ void taosCacheCleanup(SCacheObj *pCacheObj) { ...@@ -542,8 +550,8 @@ void taosCacheCleanup(SCacheObj *pCacheObj) {
// wait for the refresh thread quit before destroying the cache object. // wait for the refresh thread quit before destroying the cache object.
// But in the dll, the child thread will be killed before atexit takes effect. // But in the dll, the child thread will be killed before atexit takes effect.
while(atomic_load_8(&pCacheObj->deleting) != 0) { while(atomic_load_8(&pCacheObj->deleting) != 0) {
if (refreshWorkerNormalStopped) break; if (refreshWorkerNormalStopped) break;
if (refreshWorkerUnexpectedStopped) return; if (refreshWorkerUnexpectedStopped) return;
taosMsleep(50); taosMsleep(50);
} }
...@@ -649,7 +657,7 @@ void doCleanupDataCache(SCacheObj *pCacheObj) { ...@@ -649,7 +657,7 @@ void doCleanupDataCache(SCacheObj *pCacheObj) {
taosTrashcanEmpty(pCacheObj, true); taosTrashcanEmpty(pCacheObj, true);
__cache_lock_destroy(pCacheObj); __cache_lock_destroy(pCacheObj);
tfree(pCacheObj->name); tfree(pCacheObj->name);
memset(pCacheObj, 0, sizeof(SCacheObj)); memset(pCacheObj, 0, sizeof(SCacheObj));
free(pCacheObj); free(pCacheObj);
...@@ -694,12 +702,12 @@ void* taosCacheTimedRefresh(void *handle) { ...@@ -694,12 +702,12 @@ void* taosCacheTimedRefresh(void *handle) {
setThreadName("cacheRefresh"); setThreadName("cacheRefresh");
const int32_t SLEEP_DURATION = 500; //500 ms const int32_t SLEEP_DURATION_IN_MS = 500;
int64_t count = 0; int64_t count = 0;
atexit(taosCacheRefreshWorkerUnexpectedStopped); atexit(taosCacheRefreshWorkerUnexpectedStopped);
while(1) { while(1) {
taosMsleep(SLEEP_DURATION); taosMsleep(SLEEP_DURATION_IN_MS);
if (stopRefreshWorker) { if (stopRefreshWorker) {
goto _end; goto _end;
} }
...@@ -778,4 +786,4 @@ void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void* param1) ...@@ -778,4 +786,4 @@ void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void* param1)
void taosStopCacheRefreshWorker(void) { void taosStopCacheRefreshWorker(void) {
stopRefreshWorker = true; stopRefreshWorker = true;
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册