diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index ca6dfcee9efb03058ad2401ed57f2dce6f581d59..056314460dc18fd787a76a63124a55359dcdf5c2 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -4962,6 +4962,7 @@ static void setCreateDBOption(SCMCreateDbMsg* pMsg, SCreateDBInfo* pCreateDb) { pMsg->commitTime = htonl(pCreateDb->commitTime); pMsg->minRowsPerFileBlock = htonl(pCreateDb->minRowsPerBlock); pMsg->maxRowsPerFileBlock = htonl(pCreateDb->maxRowsPerBlock); + pMsg->fsyncPeriod = htonl(pCreateDb->fsyncPeriod); pMsg->compression = pCreateDb->compressionLevel; pMsg->walLevel = (char)pCreateDb->walLevel; pMsg->replications = pCreateDb->replica; diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index fbdfc84f5ffc26815c8318ff93f494af7801b6d9..235441e0a56579a32f9372686b2f64dbb55e3b48 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -78,6 +78,7 @@ extern int16_t tsCommitTime; // seconds extern int32_t tsTimePrecision; extern int16_t tsCompression; extern int16_t tsWAL; +extern int32_t tsFsyncPeriod; extern int32_t tsReplications; // balance diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index a8ef3f8e8d348b6b67fae6cc1dcd3f810b57e3ec..2bc0f3459c98525b42fd4323b74183223249cb39 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -110,6 +110,7 @@ int16_t tsCommitTime = TSDB_DEFAULT_COMMIT_TIME; // seconds int32_t tsTimePrecision = TSDB_DEFAULT_PRECISION; int16_t tsCompression = TSDB_DEFAULT_COMP_LEVEL; int16_t tsWAL = TSDB_DEFAULT_WAL_LEVEL; +int32_t tsFsyncPeriod = TSDB_DEFAULT_FSYNC_PERIOD; int32_t tsReplications = TSDB_DEFAULT_DB_REPLICA_OPTION; int32_t tsMaxVgroupsPerDb = 0; int32_t tsMaxTablePerVnode = TSDB_DEFAULT_TABLES; @@ -702,6 +703,16 @@ static void doInitGlobalConfig() { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); + cfg.option = "fsyncPeriod"; + cfg.ptr = &tsFsyncPeriod; + cfg.valType = TAOS_CFG_VTYPE_INT32; + 
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; + cfg.minValue = TSDB_MIN_FSYNC_PERIOD; + cfg.maxValue = TSDB_MAX_FSYNC_PERIOD; + cfg.ptrLength = 0; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + cfg.option = "replica"; cfg.ptr = &tsReplications; cfg.valType = TAOS_CFG_VTYPE_INT32; diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index eb3c3abc840121043e7a36ef884f63f4c5caef57..b9cc35def5b5c9598f3a68b14ba4332652e1a45e 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -332,6 +332,10 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); #define TSDB_MAX_WAL_LEVEL 2 #define TSDB_DEFAULT_WAL_LEVEL 1 +#define TSDB_MIN_FSYNC_PERIOD 0 +#define TSDB_MAX_FSYNC_PERIOD 180000 // millisecond +#define TSDB_DEFAULT_FSYNC_PERIOD 1000 // one second + #define TSDB_MIN_DB_REPLICA_OPTION 1 #define TSDB_MAX_DB_REPLICA_OPTION 3 #define TSDB_DEFAULT_DB_REPLICA_OPTION 1 diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index d96e9f68d5a33addb84ca08d286be7b88b62db30..dec3f60b649a9f852c4ca01925f9d27023badc13 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -515,6 +515,7 @@ typedef struct { int32_t minRowsPerFileBlock; int32_t maxRowsPerFileBlock; int32_t commitTime; + int32_t fsyncPeriod; uint8_t precision; // time resolution int8_t compression; int8_t walLevel; @@ -608,6 +609,7 @@ typedef struct { int32_t minRowsPerFileBlock; int32_t maxRowsPerFileBlock; int32_t commitTime; + int32_t fsyncPeriod; int8_t precision; int8_t compression; int8_t walLevel; diff --git a/src/inc/twal.h b/src/inc/twal.h index 8c2c3c69a1261ea0d86ac8d3b09c435a74b65085..4fdb7aa275d539ae63129e038c4ae50f59552e1f 100644 --- a/src/inc/twal.h +++ b/src/inc/twal.h @@ -35,6 +35,7 @@ typedef struct { typedef struct { int8_t walLevel; // wal level + int32_t fsyncPeriod; // millisecond int8_t wals; // number of WAL files; int8_t keep; // keep the wal file when closed } SWalCfg; diff --git a/src/kit/taosmigrate/taosmigrateVnodeCfg.c 
b/src/kit/taosmigrate/taosmigrateVnodeCfg.c index b925fb10aa3bb59951726497b3c77a7a8d5ef142..1a1087aa9d3db424cfb8194e7611b86504339341 100644 --- a/src/kit/taosmigrate/taosmigrateVnodeCfg.c +++ b/src/kit/taosmigrate/taosmigrateVnodeCfg.c @@ -48,6 +48,7 @@ static int32_t saveVnodeCfg(SVnodeObj *pVnode, char* cfgFile) len += snprintf(content + len, maxLen - len, " \"precision\": %d,\n", pVnode->tsdbCfg.precision); len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pVnode->tsdbCfg.compression); len += snprintf(content + len, maxLen - len, " \"walLevel\": %d,\n", pVnode->walCfg.walLevel); + len += snprintf(content + len, maxLen - len, " \"fsyncPeriod\": %d,\n", pVnode->walCfg.fsyncPeriod); len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pVnode->syncCfg.replica); len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pVnode->walCfg.wals); len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pVnode->syncCfg.quorum); @@ -212,6 +213,13 @@ static int32_t readVnodeCfg(SVnodeObj *pVnode, char* cfgFile) } pVnode->walCfg.walLevel = (int8_t) walLevel->valueint; + cJSON *fsyncPeriod = cJSON_GetObjectItem(root, "fsyncPeriod"); + if (!fsyncPeriod || fsyncPeriod->type != cJSON_Number) { + printf("vgId:%d, failed to read vnode cfg, fsyncPeriod not found\n", pVnode->vgId); + goto PARSE_OVER; + } + pVnode->walCfg.fsyncPeriod = fsyncPeriod->valueint; + cJSON *wals = cJSON_GetObjectItem(root, "wals"); if (!wals || wals->type != cJSON_Number) { printf("vgId:%d, failed to read vnode cfg, wals not found\n", pVnode->vgId); diff --git a/src/mnode/inc/mnodeDef.h b/src/mnode/inc/mnodeDef.h index 90c2ff9e66d01a7fc89f4a1d8861025bc7668e7b..b7489a9fc56553c55ea4097a5df65b3a34f7b54b 100644 --- a/src/mnode/inc/mnodeDef.h +++ b/src/mnode/inc/mnodeDef.h @@ -160,6 +160,7 @@ typedef struct { int32_t minRowsPerFileBlock; int32_t maxRowsPerFileBlock; int32_t commitTime; + int32_t fsyncPeriod; int8_t precision; int8_t compression; int8_t 
walLevel; diff --git a/src/mnode/src/mnodeDb.c b/src/mnode/src/mnodeDb.c index e8183e9089bd97cc40c20368cfcfa25ca450a368..6bb6f450a582d2036db063f3896b76ff4a01511f 100644 --- a/src/mnode/src/mnodeDb.c +++ b/src/mnode/src/mnodeDb.c @@ -287,14 +287,14 @@ static int32_t mnodeCheckDbCfg(SDbCfg *pCfg) { return TSDB_CODE_MND_INVALID_DB_OPTION; } - if (pCfg->replications < TSDB_MIN_DB_REPLICA_OPTION || pCfg->replications > TSDB_MAX_DB_REPLICA_OPTION) { - mError("invalid db option replications:%d valid range: [%d, %d]", pCfg->replications, TSDB_MIN_DB_REPLICA_OPTION, - TSDB_MAX_DB_REPLICA_OPTION); + if (pCfg->fsyncPeriod < TSDB_MIN_FSYNC_PERIOD || pCfg->fsyncPeriod > TSDB_MAX_FSYNC_PERIOD) { + mError("invalid db option fsyncPeriod:%d, valid range: [%d, %d]", pCfg->fsyncPeriod, TSDB_MIN_FSYNC_PERIOD, TSDB_MAX_FSYNC_PERIOD); return TSDB_CODE_MND_INVALID_DB_OPTION; } - if (pCfg->walLevel < TSDB_MIN_WAL_LEVEL) { - mError("invalid db option walLevel:%d must be greater than 0", pCfg->walLevel); + if (pCfg->replications < TSDB_MIN_DB_REPLICA_OPTION || pCfg->replications > TSDB_MAX_DB_REPLICA_OPTION) { + mError("invalid db option replications:%d valid range: [%d, %d]", pCfg->replications, TSDB_MIN_DB_REPLICA_OPTION, + TSDB_MAX_DB_REPLICA_OPTION); return TSDB_CODE_MND_INVALID_DB_OPTION; } @@ -318,6 +318,7 @@ static void mnodeSetDefaultDbCfg(SDbCfg *pCfg) { if (pCfg->daysToKeep2 < 0) pCfg->daysToKeep2 = pCfg->daysToKeep; if (pCfg->minRowsPerFileBlock < 0) pCfg->minRowsPerFileBlock = tsMinRowsInFileBlock; if (pCfg->maxRowsPerFileBlock < 0) pCfg->maxRowsPerFileBlock = tsMaxRowsInFileBlock; + if (pCfg->fsyncPeriod <0) pCfg->fsyncPeriod = tsFsyncPeriod; if (pCfg->commitTime < 0) pCfg->commitTime = tsCommitTime; if (pCfg->precision < 0) pCfg->precision = tsTimePrecision; if (pCfg->compression < 0) pCfg->compression = tsCompression; @@ -367,6 +368,7 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate, void *pMs .daysToKeep2 = pCreate->daysToKeep2, .minRowsPerFileBlock = 
pCreate->minRowsPerFileBlock, .maxRowsPerFileBlock = pCreate->maxRowsPerFileBlock, + .fsyncPeriod = pCreate->fsyncPeriod, .commitTime = pCreate->commitTime, .precision = pCreate->precision, .compression = pCreate->compression, @@ -682,6 +684,10 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *(int8_t *)pWrite = pDb->cfg.walLevel; cols++; + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int32_t *)pWrite = pDb->cfg.fsyncPeriod; + cols++; + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; *(int8_t *)pWrite = pDb->cfg.compression; cols++; @@ -785,6 +791,7 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SCMAlterDbMsg *pAlter) { int32_t minRows = htonl(pAlter->minRowsPerFileBlock); int32_t maxRows = htonl(pAlter->maxRowsPerFileBlock); int32_t commitTime = htonl(pAlter->commitTime); + int32_t fsyncPeriod = htonl(pAlter->fsyncPeriod); int8_t compression = pAlter->compression; int8_t walLevel = pAlter->walLevel; int8_t replications = pAlter->replications; @@ -861,6 +868,11 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SCMAlterDbMsg *pAlter) { terrno = TSDB_CODE_MND_INVALID_DB_OPTION; } + if (fsyncPeriod >= 0 && fsyncPeriod != pDb->cfg.fsyncPeriod) { + mError("db:%s, can't alter fsyncPeriod option", pDb->name); + terrno = TSDB_CODE_MND_INVALID_DB_OPTION; + } + if (replications > 0 && replications != pDb->cfg.replications) { mDebug("db:%s, replications:%d change to %d", pDb->name, pDb->cfg.replications, replications); newCfg.replications = replications; diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index d3192a2460716ee31881715c4146de4330c8ca0c..1c11d25baac01e5914f6bd6cd954268dc5c401a8 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -170,7 +170,7 @@ static void *sdbGetTableFromId(int32_t tableId) { } static int32_t sdbInitWal() { - SWalCfg walCfg = {.walLevel = 2, .wals = 2, .keep = 1}; + SWalCfg walCfg = {.walLevel = 2, .wals = 
2, .keep = 1, .fsyncPeriod = 0}; char temp[TSDB_FILENAME_LEN]; sprintf(temp, "%s/wal", tsMnodeDir); tsSdbObj.wal = walOpen(temp, &walCfg); diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index 5658f4bcbf17ac2f6f0fa7981f6abf3c170dada7..40d896606c5c27a33c62be90cddad17d29a8249c 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -755,6 +755,7 @@ SMDCreateVnodeMsg *mnodeBuildCreateVnodeMsg(SVgObj *pVgroup) { pCfg->daysToKeep2 = htonl(pDb->cfg.daysToKeep2); pCfg->minRowsPerFileBlock = htonl(pDb->cfg.minRowsPerFileBlock); pCfg->maxRowsPerFileBlock = htonl(pDb->cfg.maxRowsPerFileBlock); + pCfg->fsyncPeriod = htonl(pDb->cfg.fsyncPeriod); pCfg->commitTime = htonl(pDb->cfg.commitTime); pCfg->precision = pDb->cfg.precision; pCfg->compression = pDb->cfg.compression; diff --git a/src/query/inc/qsqlparser.h b/src/query/inc/qsqlparser.h index 704f3e74187ef1686be1ec7babd30b3fe2b2b1e1..9e8706cf9220c2a0d0e55fb0a8f1028a876da9d4 100644 --- a/src/query/inc/qsqlparser.h +++ b/src/query/inc/qsqlparser.h @@ -116,6 +116,7 @@ typedef struct SCreateDBInfo { int32_t daysPerFile; int32_t minRowsPerBlock; int32_t maxRowsPerBlock; + int32_t fsyncPeriod; int64_t commitTime; int32_t walLevel; int32_t compressionLevel; diff --git a/src/query/inc/sql.y b/src/query/inc/sql.y index b4ea1254b7094a8f9a3f83d55da63d990dc21b09..aa6f794d5b75c7e49451c50d038dff3aa4a88abc 100644 --- a/src/query/inc/sql.y +++ b/src/query/inc/sql.y @@ -221,6 +221,7 @@ maxrows(Y) ::= MAXROWS INTEGER(X). { Y = X; } blocks(Y) ::= BLOCKS INTEGER(X). { Y = X; } ctime(Y) ::= CTIME INTEGER(X). { Y = X; } wal(Y) ::= WAL INTEGER(X). { Y = X; } +fsync(Y) ::= FSYNC INTEGER(X). { Y = X; } comp(Y) ::= COMP INTEGER(X). { Y = X; } prec(Y) ::= PRECISION STRING(X). { Y = X; } @@ -233,6 +234,7 @@ db_optr(Y) ::= db_optr(Z) replica(X). { Y = Z; Y.replica = strtol(X.z, NU db_optr(Y) ::= db_optr(Z) days(X). { Y = Z; Y.daysPerFile = strtol(X.z, NULL, 10); } db_optr(Y) ::= db_optr(Z) minrows(X). 
{ Y = Z; Y.minRowsPerBlock = strtod(X.z, NULL); } db_optr(Y) ::= db_optr(Z) maxrows(X). { Y = Z; Y.maxRowsPerBlock = strtod(X.z, NULL); } +db_optr(Y) ::= db_optr(Z) fsync(X). { Y = Z; Y.fsyncPeriod = strtol(X.z, NULL, 10); /* FSYNC carries an INTEGER token: parse it as a base-10 integer */ } db_optr(Y) ::= db_optr(Z) blocks(X). { Y = Z; Y.numOfBlocks = strtol(X.z, NULL, 10); } db_optr(Y) ::= db_optr(Z) ctime(X). { Y = Z; Y.commitTime = strtol(X.z, NULL, 10); } db_optr(Y) ::= db_optr(Z) wal(X). { Y = Z; Y.walLevel = strtol(X.z, NULL, 10); } @@ -249,6 +251,7 @@ alter_db_optr(Y) ::= alter_db_optr(Z) keep(X). { Y = Z; Y.keep = X; } alter_db_optr(Y) ::= alter_db_optr(Z) blocks(X). { Y = Z; Y.numOfBlocks = strtol(X.z, NULL, 10); } alter_db_optr(Y) ::= alter_db_optr(Z) comp(X). { Y = Z; Y.compressionLevel = strtol(X.z, NULL, 10); } alter_db_optr(Y) ::= alter_db_optr(Z) wal(X). { Y = Z; Y.walLevel = strtol(X.z, NULL, 10); } +alter_db_optr(Y) ::= alter_db_optr(Z) fsync(X). { Y = Z; Y.fsyncPeriod = strtol(X.z, NULL, 10); /* strtod() takes two arguments; the 3-argument integer parser is strtol() */ } %type typename {TAOS_FIELD} typename(A) ::= ids(X). 
{ diff --git a/src/query/src/qparserImpl.c b/src/query/src/qparserImpl.c index d4ac540d2fc725488bae83fb8890840f6ddb59a9..ecc11f8f4d76769d0d52b03f44813349aed29adc 100644 --- a/src/query/src/qparserImpl.c +++ b/src/query/src/qparserImpl.c @@ -896,6 +896,7 @@ void setDefaultCreateDbOption(SCreateDBInfo *pDBInfo) { pDBInfo->compressionLevel = -1; pDBInfo->walLevel = -1; + pDBInfo->fsyncPeriod = -1; pDBInfo->commitTime = -1; pDBInfo->maxTablesPerVnode = -1; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 695a55d47638c1b0e05651be24d2480b9d5b975c..e46e400c15c77a4a4f9c1a74e9acf0a09dd9a10d 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -69,6 +69,7 @@ int32_t vnodeInitResources() { } void vnodeCleanupResources() { + if (tsDnodeVnodesHash != NULL) { taosHashCleanup(tsDnodeVnodesHash); tsDnodeVnodesHash = NULL; @@ -137,7 +138,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { return TSDB_CODE_VND_INIT_FAILED; } - vInfo("vgId:%d, vnode is created, clog:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.walLevel); + vInfo("vgId:%d, vnode is created, walLevel:%d fsyncPeriod:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.walLevel, pVnodeCfg->cfg.fsyncPeriod); code = vnodeOpen(pVnodeCfg->cfg.vgId, rootDir); return code; @@ -618,6 +619,7 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { len += snprintf(content + len, maxLen - len, " \"precision\": %d,\n", pVnodeCfg->cfg.precision); len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pVnodeCfg->cfg.compression); len += snprintf(content + len, maxLen - len, " \"walLevel\": %d,\n", pVnodeCfg->cfg.walLevel); + len += snprintf(content + len, maxLen - len, " \"fsyncPeriod\": %d,\n", pVnodeCfg->cfg.fsyncPeriod); len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pVnodeCfg->cfg.replications); len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pVnodeCfg->cfg.wals); len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", 
pVnodeCfg->cfg.quorum); @@ -782,6 +784,13 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) { } pVnode->walCfg.walLevel = (int8_t) walLevel->valueint; + cJSON *fsyncPeriod = cJSON_GetObjectItem(root, "fsyncPeriod"); + if (!fsyncPeriod || fsyncPeriod->type != cJSON_Number) { /* check the item just fetched (not walLevel) before it is dereferenced below */ + vError("vgId:%d, failed to read vnode cfg, fsyncPeriod not found", pVnode->vgId); + goto PARSE_OVER; + } + pVnode->walCfg.fsyncPeriod = fsyncPeriod->valueint; + cJSON *wals = cJSON_GetObjectItem(root, "wals"); if (!wals || wals->type != cJSON_Number) { vError("vgId:%d, failed to read vnode cfg, wals not found", pVnode->vgId); diff --git a/src/wal/src/walMain.c b/src/wal/src/walMain.c index 94a0fdc956577d9cc2e35f6bd780dcb6b8b33950..45bacc317fa14d076f404f34b2d0c81dbb38b648 100644 --- a/src/wal/src/walMain.c +++ b/src/wal/src/walMain.c @@ -25,6 +25,7 @@ #include "tlog.h" #include "tchecksum.h" #include "tutil.h" +#include "ttimer.h" #include "taoserror.h" #include "twal.h" #include "tqueue.h" @@ -44,6 +45,9 @@ typedef struct { int fd; int keep; int level; + int32_t fsyncPeriod; + void *timer; + void *signature; int max; // maximum number of wal files uint32_t id; // increase continuously int num; // number of wal files @@ -52,10 +56,20 @@ typedef struct { pthread_mutex_t mutex; } SWal; +static void *walTmrCtrl = NULL; +static int tsWalNum = 0; +static pthread_once_t walModuleInit = PTHREAD_ONCE_INIT; static uint32_t walSignature = 0xFAFBFDFE; -static int walHandleExistingFiles(const char *path); -static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp); -static int walRemoveWalFiles(const char *path); +static int walHandleExistingFiles(const char *path); +static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp); +static int walRemoveWalFiles(const char *path); +static void walProcessFsyncTimer(void *param, void *tmrId); +static void walRelease(SWal *pWal); + +static void walModuleInitFunc() { + walTmrCtrl = taosTmrInit(1000, 100, 300000, "WAL"); + if (walTmrCtrl == 
NULL) walModuleInit = PTHREAD_ONCE_INIT; +} void *walOpen(const char *path, const SWalCfg *pCfg) { SWal *pWal = calloc(sizeof(SWal), 1); @@ -64,20 +78,38 @@ void *walOpen(const char *path, const SWalCfg *pCfg) { return NULL; } + pthread_once(&walModuleInit, walModuleInitFunc); + if (walTmrCtrl == NULL) { + free(pWal); + terrno = TAOS_SYSTEM_ERROR(errno); + return NULL; + } + + atomic_add_fetch_32(&tsWalNum, 1); pWal->fd = -1; pWal->max = pCfg->wals; pWal->id = 0; pWal->num = 0; pWal->level = pCfg->walLevel; pWal->keep = pCfg->keep; + pWal->fsyncPeriod = pCfg->fsyncPeriod; + pWal->signature = pWal; tstrncpy(pWal->path, path, sizeof(pWal->path)); pthread_mutex_init(&pWal->mutex, NULL); + if (pWal->fsyncPeriod > 0 && pWal->level == TAOS_WAL_FSYNC) { + pWal->timer = taosTmrStart(walProcessFsyncTimer, pWal->fsyncPeriod, pWal, walTmrCtrl); + if (pWal->timer == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + walRelease(pWal); + return NULL; + } + } + if (tmkdir(path, 0755) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); wError("wal:%s, failed to create directory(%s)", path, strerror(errno)); - pthread_mutex_destroy(&pWal->mutex); - free(pWal); + walRelease(pWal); pWal = NULL; } @@ -89,12 +121,11 @@ void *walOpen(const char *path, const SWalCfg *pCfg) { if (pWal && pWal->fd <0) { terrno = TAOS_SYSTEM_ERROR(errno); wError("wal:%s, failed to open(%s)", path, strerror(errno)); - pthread_mutex_destroy(&pWal->mutex); - free(pWal); + walRelease(pWal); pWal = NULL; } - if (pWal) wDebug("wal:%s, it is open, level:%d", path, pWal->level); + if (pWal) wDebug("wal:%s, it is open, level:%d fsyncPeriod:%d", path, pWal->level, pWal->fsyncPeriod); return pWal; } @@ -102,7 +133,8 @@ void walClose(void *handle) { if (handle == NULL) return; SWal *pWal = handle; - close(pWal->fd); + tclose(pWal->fd); + if (pWal->timer) taosTmrStopA(&pWal->timer); if (pWal->keep == 0) { // remove all files in the directory @@ -118,9 +150,7 @@ void walClose(void *handle) { wDebug("wal:%s, it is closed and kept", 
pWal->name); } - pthread_mutex_destroy(&pWal->mutex); - - free(pWal); + walRelease(pWal); } int walRenew(void *handle) { @@ -194,9 +224,9 @@ int walWrite(void *handle, SWalHead *pHead) { void walFsync(void *handle) { SWal *pWal = handle; - if (pWal == NULL) return; + if (pWal == NULL || pWal->level != TAOS_WAL_FSYNC || pWal->fd < 0) return; - if (pWal->level == TAOS_WAL_FSYNC && pWal->fd >=0) { + if (pWal->fsyncPeriod == 0) { if (fsync(pWal->fd) < 0) { wError("wal:%s, fsync failed(%s)", pWal->name, strerror(errno)); } @@ -303,6 +333,19 @@ int walGetWalFile(void *handle, char *name, uint32_t *index) { return code; } +static void walRelease(SWal *pWal) { + if (pWal->timer) taosTmrStopA(&pWal->timer); /* disarm the fsync timer before pWal is freed: walOpen's tmkdir/open failure paths reach here with it still armed, and the callback dereferences pWal */ + pthread_mutex_destroy(&pWal->mutex); + pWal->signature = NULL; + free(pWal); + + if (atomic_sub_fetch_32(&tsWalNum, 1) == 0) { + if (walTmrCtrl) taosTmrCleanUp(walTmrCtrl); + walTmrCtrl = NULL; + walModuleInit = PTHREAD_ONCE_INIT; + } +} + static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) { char *name = pWal->name; @@ -433,3 +476,15 @@ static int walRemoveWalFiles(const char *path) { return terrno; } +static void walProcessFsyncTimer(void *param, void *tmrId) { + SWal *pWal = param; + + if (pWal->signature != pWal) return; + if (pWal->fd < 0) return; + + if (fsync(pWal->fd) < 0) { + wError("wal:%s, fsync failed(%s)", pWal->name, strerror(errno)); + } + + pWal->timer = taosTmrStart(walProcessFsyncTimer, pWal->fsyncPeriod, pWal, walTmrCtrl); +}