From 6d67b171138e4cf0a90c4301a73bbe7b9672dd2f Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Sat, 30 Jul 2022 16:04:06 +0800 Subject: [PATCH] enh(wal): add lock to guarantee read behaviour --- source/dnode/vnode/src/tq/tq.c | 3 +++ source/libs/function/src/builtins.c | 5 +++-- source/libs/wal/src/walRead.c | 11 +++++++++++ source/libs/wal/src/walWrite.c | 2 +- 4 files changed, 18 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 364ecbab61..86b016b8be 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -684,6 +684,9 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) { taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)); + /*SMeta* pMeta = pTq->pVnode->pMeta;*/ + /*tdbTbUpsert(pMeta->pTaskIdx, &pTask->taskId, sizeof(int32_t), msg, msgLen, &pMeta->txn);*/ + return 0; FAIL: if (pTask->inputQueue) streamQueueClose(pTask->inputQueue); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index c9f462bae1..83ac7438fe 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2453,13 +2453,14 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "diff", .type = FUNCTION_TYPE_DIFF, - .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, + .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_CUMULATIVE_FUNC, .translateFunc = translateDiff, .getEnvFunc = getDiffFuncEnv, .initFunc = diffFunctionSetup, .processFunc = diffFunction, .sprocessFunc = diffScalarFunction, - .finalizeFunc = functionFinalize + .finalizeFunc = functionFinalize, + .estimateReturnRowsFunc = diffEstReturnRows, }, { .name = "statecount", diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 787c9af317..9be648b518 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -441,9 +441,12 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { return -1; } + taosThreadMutexLock(&pReader->mutex); + if (pReader->curInvalid || pReader->curVersion != ver) { if (walReadSeekVer(pReader, ver) < 0) { wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, ver, terrstr()); + taosThreadMutexUnlock(&pReader->mutex); return -1; } seeked = true; @@ -464,6 +467,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { terrno = TSDB_CODE_WAL_FILE_CORRUPTED; } ASSERT(0); + taosThreadMutexUnlock(&pReader->mutex); return -1; } } @@ -473,6 +477,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since head checksum not passed", pReader->pWal->cfg.vgId, ver); terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + taosThreadMutexUnlock(&pReader->mutex); return -1; } @@ -480,6 +485,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { void *ptr = taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + pReader->pHead->head.bodyLen); if (ptr == NULL) { terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; + taosThreadMutexUnlock(&pReader->mutex); return -1; } pReader->pHead = ptr; @@ -494,6 +500,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { terrno = TSDB_CODE_WAL_FILE_CORRUPTED; ASSERT(0); } + taosThreadMutexUnlock(&pReader->mutex); return -1; } @@ -503,6 +510,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { pReader->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; ASSERT(0); + taosThreadMutexUnlock(&pReader->mutex); return -1; } @@ -516,9 +524,12 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { pReader->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; ASSERT(0); + taosThreadMutexUnlock(&pReader->mutex); return -1; } pReader->curVersion++; + taosThreadMutexUnlock(&pReader->mutex); + return 0; } diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 4eadc92f70..68d8c54e28 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -408,7 +408,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy pWal->writeHead.head.version = index; pWal->writeHead.head.bodyLen = bodyLen; pWal->writeHead.head.msgType = msgType; - pWal->writeHead.head.ingestTs = taosGetTimestampMs(); + pWal->writeHead.head.ingestTs = 0; // sync info for sync module pWal->writeHead.head.syncMeta = syncMeta; -- GitLab