diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index b74dfbfdda987dd156f8ed10c86aa578b9720137..c28ad66b65ebb8802431e74584afff05ce1b4cfc 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -204,6 +204,7 @@ static void *dnodeProcessVWriteQueue(void *param) { break; } + bool forceFsync = false; for (int32_t i = 0; i < numOfMsgs; ++i) { taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite); dTrace("%p, msg:%p:%s will be processed in vwrite queue, qtype:%d version:%" PRIu64, pWrite->rpcAhandle, pWrite, @@ -211,11 +212,12 @@ static void *dnodeProcessVWriteQueue(void *param) { pWrite->code = vnodeProcessWrite(pVnode, pWrite->pHead, qtype, &pWrite->rspRet); if (pWrite->code <= 0) pWrite->processedCount = 1; + if (pWrite->pHead->msgType != TSDB_MSG_TYPE_SUBMIT) forceFsync = true; dTrace("msg:%p is processed in vwrite queue, result:%s", pWrite, tstrerror(pWrite->code)); } - walFsync(vnodeGetWal(pVnode)); + walFsync(vnodeGetWal(pVnode), forceFsync); // browse all items, and process them one by one taosResetQitems(pWorker->qall); diff --git a/src/inc/twal.h b/src/inc/twal.h index d9b5fb26ffd388ae1cd0540ae7286b126391cd37..931cf5dabab4bb640d362ff71ba1bac35b58f364 100644 --- a/src/inc/twal.h +++ b/src/inc/twal.h @@ -55,7 +55,7 @@ void walStop(twalh); void walClose(twalh); int32_t walRenew(twalh); int32_t walWrite(twalh, SWalHead *); -void walFsync(twalh); +void walFsync(twalh, bool forceFsync); int32_t walRestore(twalh, void *pVnode, FWalWrite writeFp); int32_t walGetWalFile(twalh, char *fileName, int64_t *fileId); int64_t walGetVersion(twalh); diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index fc986521e62b33c7512b7213cb99ec7dfddcf705..4e370ca0280d6aeb17023794a8b23fa2cc20742d 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -1090,7 +1090,7 @@ static void *sdbWorkerFp(void *param) { } } - walFsync(tsSdbObj.wal); + walFsync(tsSdbObj.wal, true); // browse all items, and process them one by one taosResetQitems(tsSdbWriteQall); diff --git a/src/wal/src/walWrite.c b/src/wal/src/walWrite.c index 194c50264fa4301b99b0531efbe91c76562f1b3d..b19e084e06a41fb2758be619b1979ef3b273221a 100644 --- a/src/wal/src/walWrite.c +++ b/src/wal/src/walWrite.c @@ -111,11 +111,12 @@ int32_t walWrite(void *handle, SWalHead *pHead) { return code; } -void walFsync(void *handle) { +void walFsync(void *handle, bool forceFsync) { SWal *pWal = handle; - if (pWal == NULL || pWal->level != TAOS_WAL_FSYNC || pWal->fd < 0) return; + if (pWal == NULL || pWal->fd < 0) return; - if (pWal->fsyncPeriod == 0) { + if (forceFsync || (pWal->level == TAOS_WAL_FSYNC && pWal->fsyncPeriod == 0)) { + wTrace("vgId:%d, file:%s, do fsync, force:%d", pWal->vgId, pWal->name, forceFsync); if (fsync(pWal->fd) < 0) { wError("vgId:%d, file:%s, fsync failed since %s", pWal->vgId, pWal->name, strerror(errno)); }