diff --git a/src/wal/src/walMgmt.c b/src/wal/src/walMgmt.c index 55ab9b031beaa35da49523af21c581a5aae47e3b..9bd5cdf1750b106df9eed8c033c1adff91d9b8e5 100644 --- a/src/wal/src/walMgmt.c +++ b/src/wal/src/walMgmt.c @@ -36,9 +36,16 @@ static int32_t walInitObj(SWal *pWal); static void walFreeObj(void *pWal); int32_t walInit() { + int32_t code = 0; tsWal.refId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj); - int32_t code = walCreateThread(); + code = pthread_mutex_init(&tsWal.mutex, NULL); + if (code) { + wError("failed to init wal mutex since %s", tstrerror(code)); + return code; + } + + code = walCreateThread(); if (code != TSDB_CODE_SUCCESS) { wError("failed to init wal module since %s", tstrerror(code)); return code; @@ -51,6 +58,7 @@ int32_t walInit() { void walCleanUp() { walStopThread(); taosCloseRef(tsWal.refId); + pthread_mutex_destroy(&tsWal.mutex); wInfo("wal module is cleaned up"); } @@ -183,10 +191,15 @@ static void walFsyncAll() { } static void *walThreadFunc(void *param) { + int stop = 0; while (1) { walUpdateSeq(); walFsyncAll(); - if (tsWal.stop) break; + + pthread_mutex_lock(&tsWal.mutex); + stop = tsWal.stop; + pthread_mutex_unlock(&tsWal.mutex); + if (stop) break; } return NULL; @@ -209,7 +222,10 @@ static int32_t walCreateThread() { } static void walStopThread() { + pthread_mutex_lock(&tsWal.mutex); tsWal.stop = 1; + pthread_mutex_unlock(&tsWal.mutex); + if (taosCheckPthreadValid(tsWal.thread)) { pthread_join(tsWal.thread, NULL); }