提交 6d67b171 编写于 作者: L Liu Jicong

enh(wal): add lock to guarantee read behaviour

上级 f24f2eff
...@@ -684,6 +684,9 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -684,6 +684,9 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)); taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*));
/*SMeta* pMeta = pTq->pVnode->pMeta;*/
/*tdbTbUpsert(pMeta->pTaskIdx, &pTask->taskId, sizeof(int32_t), msg, msgLen, &pMeta->txn);*/
return 0; return 0;
FAIL: FAIL:
if (pTask->inputQueue) streamQueueClose(pTask->inputQueue); if (pTask->inputQueue) streamQueueClose(pTask->inputQueue);
......
...@@ -2453,13 +2453,14 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -2453,13 +2453,14 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{ {
.name = "diff", .name = "diff",
.type = FUNCTION_TYPE_DIFF, .type = FUNCTION_TYPE_DIFF,
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_CUMULATIVE_FUNC,
.translateFunc = translateDiff, .translateFunc = translateDiff,
.getEnvFunc = getDiffFuncEnv, .getEnvFunc = getDiffFuncEnv,
.initFunc = diffFunctionSetup, .initFunc = diffFunctionSetup,
.processFunc = diffFunction, .processFunc = diffFunction,
.sprocessFunc = diffScalarFunction, .sprocessFunc = diffScalarFunction,
.finalizeFunc = functionFinalize .finalizeFunc = functionFinalize,
.estimateReturnRowsFunc = diffEstReturnRows,
}, },
{ {
.name = "statecount", .name = "statecount",
......
...@@ -441,9 +441,12 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { ...@@ -441,9 +441,12 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
return -1; return -1;
} }
taosThreadMutexLock(&pReader->mutex);
if (pReader->curInvalid || pReader->curVersion != ver) { if (pReader->curInvalid || pReader->curVersion != ver) {
if (walReadSeekVer(pReader, ver) < 0) { if (walReadSeekVer(pReader, ver) < 0) {
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, ver, terrstr()); wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, ver, terrstr());
taosThreadMutexUnlock(&pReader->mutex);
return -1; return -1;
} }
seeked = true; seeked = true;
...@@ -464,6 +467,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { ...@@ -464,6 +467,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
} }
ASSERT(0); ASSERT(0);
taosThreadMutexUnlock(&pReader->mutex);
return -1; return -1;
} }
} }
...@@ -473,6 +477,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { ...@@ -473,6 +477,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since head checksum not passed", pReader->pWal->cfg.vgId, wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since head checksum not passed", pReader->pWal->cfg.vgId,
ver); ver);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
taosThreadMutexUnlock(&pReader->mutex);
return -1; return -1;
} }
...@@ -480,6 +485,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { ...@@ -480,6 +485,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
void *ptr = taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + pReader->pHead->head.bodyLen); void *ptr = taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + pReader->pHead->head.bodyLen);
if (ptr == NULL) { if (ptr == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
taosThreadMutexUnlock(&pReader->mutex);
return -1; return -1;
} }
pReader->pHead = ptr; pReader->pHead = ptr;
...@@ -494,6 +500,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { ...@@ -494,6 +500,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0); ASSERT(0);
} }
taosThreadMutexUnlock(&pReader->mutex);
return -1; return -1;
} }
...@@ -503,6 +510,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { ...@@ -503,6 +510,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
pReader->curInvalid = 1; pReader->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0); ASSERT(0);
taosThreadMutexUnlock(&pReader->mutex);
return -1; return -1;
} }
...@@ -516,9 +524,12 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { ...@@ -516,9 +524,12 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
pReader->curInvalid = 1; pReader->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0); ASSERT(0);
taosThreadMutexUnlock(&pReader->mutex);
return -1; return -1;
} }
pReader->curVersion++; pReader->curVersion++;
taosThreadMutexUnlock(&pReader->mutex);
return 0; return 0;
} }
...@@ -408,7 +408,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy ...@@ -408,7 +408,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
pWal->writeHead.head.version = index; pWal->writeHead.head.version = index;
pWal->writeHead.head.bodyLen = bodyLen; pWal->writeHead.head.bodyLen = bodyLen;
pWal->writeHead.head.msgType = msgType; pWal->writeHead.head.msgType = msgType;
pWal->writeHead.head.ingestTs = taosGetTimestampMs(); pWal->writeHead.head.ingestTs = 0;
// sync info for sync module // sync info for sync module
pWal->writeHead.head.syncMeta = syncMeta; pWal->writeHead.head.syncMeta = syncMeta;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册