未验证 提交 af0ff7c2 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #6941 from taosdata/feature/query

Feature/query
...@@ -40,8 +40,9 @@ ...@@ -40,8 +40,9 @@
#include "dnodeShell.h" #include "dnodeShell.h"
#include "dnodeTelemetry.h" #include "dnodeTelemetry.h"
#include "module.h" #include "module.h"
#include "qScript.h"
#include "mnode.h" #include "mnode.h"
#include "qScript.h"
#include "tcache.h"
#include "tscompression.h" #include "tscompression.h"
#if !defined(_MODULE) || !defined(_TD_LINUX) #if !defined(_MODULE) || !defined(_TD_LINUX)
...@@ -208,6 +209,7 @@ void dnodeCleanUpSystem() { ...@@ -208,6 +209,7 @@ void dnodeCleanUpSystem() {
dnodeCleanupComponents(); dnodeCleanupComponents();
taos_cleanup(); taos_cleanup();
taosCloseLog(); taosCloseLog();
taosStopCacheRefreshWorker();
} }
} }
...@@ -320,12 +322,12 @@ static int32_t dnodeInitStorage() { ...@@ -320,12 +322,12 @@ static int32_t dnodeInitStorage() {
static void dnodeCleanupStorage() { static void dnodeCleanupStorage() {
// storage destroy // storage destroy
tfsDestroy(); tfsDestroy();
#ifdef TD_TSZ #ifdef TD_TSZ
// compress destroy // compress destroy
tsCompressExit(); tsCompressExit();
#endif #endif
} }
bool dnodeIsFirstDeploy() { bool dnodeIsFirstDeploy() {
......
...@@ -83,6 +83,7 @@ typedef struct { ...@@ -83,6 +83,7 @@ typedef struct {
uint8_t deleting; // set the deleting flag to stop refreshing ASAP. uint8_t deleting; // set the deleting flag to stop refreshing ASAP.
pthread_t refreshWorker; pthread_t refreshWorker;
bool extendLifespan; // auto extend life span when one item is accessed. bool extendLifespan; // auto extend life span when one item is accessed.
int64_t checkTick; // tick used to record the check times of the refresh threads
#if defined(LINUX) #if defined(LINUX)
pthread_rwlock_t lock; pthread_rwlock_t lock;
#else #else
...@@ -177,6 +178,11 @@ void taosCacheCleanup(SCacheObj *pCacheObj); ...@@ -177,6 +178,11 @@ void taosCacheCleanup(SCacheObj *pCacheObj);
*/ */
void taosCacheRefresh(SCacheObj *pCacheObj, __cache_free_fn_t fp); void taosCacheRefresh(SCacheObj *pCacheObj, __cache_free_fn_t fp);
/**
* stop background refresh worker thread
*/
void taosStopCacheRefreshWorker();
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -54,6 +54,45 @@ static FORCE_INLINE void __cache_lock_destroy(SCacheObj *pCacheObj) { ...@@ -54,6 +54,45 @@ static FORCE_INLINE void __cache_lock_destroy(SCacheObj *pCacheObj) {
#endif #endif
} }
/**
* do cleanup the taos cache
* @param pCacheObj
*/
static void doCleanupDataCache(SCacheObj *pCacheObj);
/**
* refresh cache to remove data in both hash list and trash, if any nodes' refcount == 0, every pCacheObj->refreshTime
* @param handle Cache object handle
*/
static void* taosCacheTimedRefresh(void *handle);
static pthread_t cacheRefreshWorker = {0};
static pthread_once_t cacheThreadInit = PTHREAD_ONCE_INIT;
static pthread_mutex_t guard = PTHREAD_MUTEX_INITIALIZER;
static SArray* pCacheArrayList = NULL;
static bool stopRefreshWorker = false;
static void doInitRefreshThread(void) {
pCacheArrayList = taosArrayInit(4, POINTER_BYTES);
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
pthread_create(&cacheRefreshWorker, &thattr, taosCacheTimedRefresh, NULL);
pthread_attr_destroy(&thattr);
}
pthread_t doRegisterCacheObj(SCacheObj* pCacheObj) {
pthread_once(&cacheThreadInit, doInitRefreshThread);
pthread_mutex_lock(&guard);
taosArrayPush(pCacheArrayList, &pCacheObj);
pthread_mutex_unlock(&guard);
return cacheRefreshWorker;
}
/** /**
* @param key key of object for hash, usually a null-terminated string * @param key key of object for hash, usually a null-terminated string
* @param keyLen length of key * @param keyLen length of key
...@@ -142,19 +181,9 @@ static FORCE_INLINE void doDestroyTrashcanElem(SCacheObj* pCacheObj, STrashElem ...@@ -142,19 +181,9 @@ static FORCE_INLINE void doDestroyTrashcanElem(SCacheObj* pCacheObj, STrashElem
free(pElem); free(pElem);
} }
/**
* do cleanup the taos cache
* @param pCacheObj
*/
static void doCleanupDataCache(SCacheObj *pCacheObj);
/**
* refresh cache to remove data in both hash list and trash, if any nodes' refcount == 0, every pCacheObj->refreshTime
* @param handle Cache object handle
*/
static void* taosCacheTimedRefresh(void *handle);
SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_free_fn_t fn, const char* cacheName) { SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_free_fn_t fn, const char* cacheName) {
const int32_t SLEEP_DURATION = 500; //500 ms
if (refreshTimeInSeconds <= 0) { if (refreshTimeInSeconds <= 0) {
return NULL; return NULL;
} }
...@@ -174,9 +203,10 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext ...@@ -174,9 +203,10 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext
} }
// set free cache node callback function // set free cache node callback function
pCacheObj->freeFp = fn; pCacheObj->freeFp = fn;
pCacheObj->refreshTime = refreshTimeInSeconds * 1000; pCacheObj->refreshTime = refreshTimeInSeconds * 1000;
pCacheObj->extendLifespan = extendLifespan; pCacheObj->checkTick = pCacheObj->refreshTime / SLEEP_DURATION;
pCacheObj->extendLifespan = extendLifespan; // the TTL after the last access
if (__cache_lock_init(pCacheObj) != 0) { if (__cache_lock_init(pCacheObj) != 0) {
taosHashCleanup(pCacheObj->pHashTable); taosHashCleanup(pCacheObj->pHashTable);
...@@ -186,13 +216,7 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext ...@@ -186,13 +216,7 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext
return NULL; return NULL;
} }
pthread_attr_t thattr; doRegisterCacheObj(pCacheObj);
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
pthread_create(&pCacheObj->refreshWorker, &thattr, taosCacheTimedRefresh, pCacheObj);
pthread_attr_destroy(&thattr);
return pCacheObj; return pCacheObj;
} }
...@@ -364,7 +388,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { ...@@ -364,7 +388,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
if (pCacheObj->extendLifespan && (!inTrashcan) && (!_remove)) { if (pCacheObj->extendLifespan && (!inTrashcan) && (!_remove)) {
atomic_store_64(&pNode->expireTime, pNode->lifespan + taosGetTimestampMs()); atomic_store_64(&pNode->expireTime, pNode->lifespan + taosGetTimestampMs());
uDebug("cache:%s data:%p extend expire time: %"PRId64, pCacheObj->name, pNode->data, pNode->expireTime); uDebug("cache:%s, data:%p extend expire time: %"PRId64, pCacheObj->name, pNode->data, pNode->expireTime);
} }
if (_remove) { if (_remove) {
...@@ -510,8 +534,10 @@ void taosCacheCleanup(SCacheObj *pCacheObj) { ...@@ -510,8 +534,10 @@ void taosCacheCleanup(SCacheObj *pCacheObj) {
} }
pCacheObj->deleting = 1; pCacheObj->deleting = 1;
if (taosCheckPthreadValid(pCacheObj->refreshWorker)) {
pthread_join(pCacheObj->refreshWorker, NULL); // wait for the refresh thread quit before destroying the cache object.
while(atomic_load_8(&pCacheObj->deleting) != 0) {
taosMsleep(50);
} }
uInfo("cache:%s will be cleaned up", pCacheObj->name); uInfo("cache:%s will be cleaned up", pCacheObj->name);
...@@ -650,50 +676,79 @@ static void doCacheRefresh(SCacheObj* pCacheObj, int64_t time, __cache_free_fn_t ...@@ -650,50 +676,79 @@ static void doCacheRefresh(SCacheObj* pCacheObj, int64_t time, __cache_free_fn_t
} }
void* taosCacheTimedRefresh(void *handle) { void* taosCacheTimedRefresh(void *handle) {
SCacheObj* pCacheObj = handle; assert(pCacheArrayList != NULL);
if (pCacheObj == NULL) { uDebug("cache refresh thread starts");
uDebug("object is destroyed. no refresh retry");
return NULL;
}
setThreadName("cacheTimedRefre"); setThreadName("cacheTimedRefre");
const int32_t SLEEP_DURATION = 500; //500 ms const int32_t SLEEP_DURATION = 500; //500 ms
int64_t totalTick = pCacheObj->refreshTime / SLEEP_DURATION;
int64_t count = 0; int64_t count = 0;
while(1) {
taosMsleep(500);
// check if current cache object will be deleted every 500ms. while(1) {
if (pCacheObj->deleting) { taosMsleep(SLEEP_DURATION);
uDebug("%s refresh threads quit", pCacheObj->name); if (stopRefreshWorker) {
break; goto _end;
} }
if (++count < totalTick) { pthread_mutex_lock(&guard);
continue; size_t size = taosArrayGetSize(pCacheArrayList);
} pthread_mutex_unlock(&guard);
// reset the count value count += 1;
count = 0;
size_t elemInHash = taosHashGetSize(pCacheObj->pHashTable);
if (elemInHash + pCacheObj->numOfElemsInTrash == 0) {
continue;
}
uDebug("%s refresh thread timed scan", pCacheObj->name); for(int32_t i = 0; i < size; ++i) {
pCacheObj->statistics.refreshCount++; pthread_mutex_lock(&guard);
SCacheObj* pCacheObj = taosArrayGetP(pCacheArrayList, i);
// refresh data in hash table if (pCacheObj == NULL) {
if (elemInHash > 0) { uError("object is destroyed. ignore and try next");
int64_t now = taosGetTimestampMs(); pthread_mutex_unlock(&guard);
doCacheRefresh(pCacheObj, now, NULL); continue;
} }
// check if current cache object will be deleted every 500ms.
if (pCacheObj->deleting) {
taosArrayRemove(pCacheArrayList, i);
size = taosArrayGetSize(pCacheArrayList);
uDebug("%s is destroying, remove it from refresh list, remain cache obj:%"PRIzu, pCacheObj->name, size);
pCacheObj->deleting = 0; //reset the deleting flag to enable pCacheObj to continue releasing resources.
pthread_mutex_unlock(&guard);
continue;
}
pthread_mutex_unlock(&guard);
if ((count % pCacheObj->checkTick) != 0) {
continue;
}
taosTrashcanEmpty(pCacheObj, false); size_t elemInHash = taosHashGetSize(pCacheObj->pHashTable);
if (elemInHash + pCacheObj->numOfElemsInTrash == 0) {
continue;
}
uDebug("%s refresh thread scan", pCacheObj->name);
pCacheObj->statistics.refreshCount++;
// refresh data in hash table
if (elemInHash > 0) {
int64_t now = taosGetTimestampMs();
doCacheRefresh(pCacheObj, now, NULL);
}
taosTrashcanEmpty(pCacheObj, false);
}
} }
_end:
taosArrayDestroy(pCacheArrayList);
pCacheArrayList = NULL;
pthread_mutex_destroy(&guard);
uDebug("cache refresh thread quits");
return NULL; return NULL;
} }
...@@ -705,3 +760,7 @@ void taosCacheRefresh(SCacheObj *pCacheObj, __cache_free_fn_t fp) { ...@@ -705,3 +760,7 @@ void taosCacheRefresh(SCacheObj *pCacheObj, __cache_free_fn_t fp) {
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
doCacheRefresh(pCacheObj, now, fp); doCacheRefresh(pCacheObj, now, fp);
} }
void taosStopCacheRefreshWorker() {
stopRefreshWorker = false;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册