提交 f07f456b 编写于 作者: S Shengliang Guan

TD-1948

上级 b037302b
...@@ -204,6 +204,7 @@ static void *dnodeProcessVWriteQueue(void *param) { ...@@ -204,6 +204,7 @@ static void *dnodeProcessVWriteQueue(void *param) {
break; break;
} }
bool forceFsync = false;
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite); taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite);
dTrace("%p, msg:%p:%s will be processed in vwrite queue, qtype:%d version:%" PRIu64, pWrite->rpcAhandle, pWrite, dTrace("%p, msg:%p:%s will be processed in vwrite queue, qtype:%d version:%" PRIu64, pWrite->rpcAhandle, pWrite,
...@@ -211,11 +212,12 @@ static void *dnodeProcessVWriteQueue(void *param) { ...@@ -211,11 +212,12 @@ static void *dnodeProcessVWriteQueue(void *param) {
pWrite->code = vnodeProcessWrite(pVnode, pWrite->pHead, qtype, &pWrite->rspRet); pWrite->code = vnodeProcessWrite(pVnode, pWrite->pHead, qtype, &pWrite->rspRet);
if (pWrite->code <= 0) pWrite->processedCount = 1; if (pWrite->code <= 0) pWrite->processedCount = 1;
if (pWrite->pHead->msgType != TSDB_MSG_TYPE_SUBMIT) forceFsync = true;
dTrace("msg:%p is processed in vwrite queue, result:%s", pWrite, tstrerror(pWrite->code)); dTrace("msg:%p is processed in vwrite queue, result:%s", pWrite, tstrerror(pWrite->code));
} }
walFsync(vnodeGetWal(pVnode)); walFsync(vnodeGetWal(pVnode), forceFsync);
// browse all items, and process them one by one // browse all items, and process them one by one
taosResetQitems(pWorker->qall); taosResetQitems(pWorker->qall);
......
...@@ -55,7 +55,7 @@ void walStop(twalh); ...@@ -55,7 +55,7 @@ void walStop(twalh);
void walClose(twalh); void walClose(twalh);
int32_t walRenew(twalh); int32_t walRenew(twalh);
int32_t walWrite(twalh, SWalHead *); int32_t walWrite(twalh, SWalHead *);
void walFsync(twalh); void walFsync(twalh, bool forceFsync);
int32_t walRestore(twalh, void *pVnode, FWalWrite writeFp); int32_t walRestore(twalh, void *pVnode, FWalWrite writeFp);
int32_t walGetWalFile(twalh, char *fileName, int64_t *fileId); int32_t walGetWalFile(twalh, char *fileName, int64_t *fileId);
int64_t walGetVersion(twalh); int64_t walGetVersion(twalh);
......
...@@ -1090,7 +1090,7 @@ static void *sdbWorkerFp(void *param) { ...@@ -1090,7 +1090,7 @@ static void *sdbWorkerFp(void *param) {
} }
} }
walFsync(tsSdbObj.wal); walFsync(tsSdbObj.wal, true);
// browse all items, and process them one by one // browse all items, and process them one by one
taosResetQitems(tsSdbWriteQall); taosResetQitems(tsSdbWriteQall);
......
...@@ -111,11 +111,12 @@ int32_t walWrite(void *handle, SWalHead *pHead) { ...@@ -111,11 +111,12 @@ int32_t walWrite(void *handle, SWalHead *pHead) {
return code; return code;
} }
void walFsync(void *handle) { void walFsync(void *handle, bool forceFsync) {
SWal *pWal = handle; SWal *pWal = handle;
if (pWal == NULL || pWal->level != TAOS_WAL_FSYNC || pWal->fd < 0) return; if (pWal == NULL || pWal->fd < 0) return;
if (pWal->fsyncPeriod == 0) { if (forceFsync || (pWal->level == TAOS_WAL_FSYNC && pWal->fsyncPeriod == 0)) {
wTrace("vgId:%d, file:%s, do fsync, force:%d", pWal->vgId, pWal->name, forceFsync);
if (fsync(pWal->fd) < 0) { if (fsync(pWal->fd) < 0) {
wError("vgId:%d, file:%s, fsync failed since %s", pWal->vgId, pWal->name, strerror(errno)); wError("vgId:%d, file:%s, fsync failed since %s", pWal->vgId, pWal->name, strerror(errno));
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册