From 9cbf1b1f7e6b3b47e575fbb52cc5be69a838b8e2 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Wed, 29 Jul 2020 09:56:04 +0000 Subject: [PATCH] processedCount to prevent race condition --- src/dnode/src/dnodeVWrite.c | 14 +++++++++++--- src/tsdb/src/tsdbMeta.c | 6 +++--- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index dc09a03e14..51bc8890fc 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -38,6 +38,8 @@ typedef struct { typedef struct { SRspRet rspRet; + int32_t processedCount; + int32_t code; void *pCont; int32_t contLen; SRpcMsg rpcMsg; @@ -187,13 +189,16 @@ void dnodeFreeVnodeWqueue(void *wqueue) { void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code) { SWriteMsg *pWrite = (SWriteMsg *)param; - if (code > 0) return; + if (code < 0) pWrite->code = code; + int32_t count = atomic_add_fetch_32(&pWrite->processedCount, 1); + + if (count <= 1) return; SRpcMsg rpcRsp = { .handle = pWrite->rpcMsg.handle, .pCont = pWrite->rspRet.rsp, .contLen = pWrite->rspRet.len, - .code = code, + .code = pWrite->code, }; rpcSendResponse(&rpcRsp); @@ -239,7 +244,10 @@ static void *dnodeProcessWriteQueue(void *param) { } int32_t code = vnodeProcessWrite(pVnode, type, pHead, pRspRet); - if (pWrite) pWrite->rpcMsg.code = code; + if (pWrite) { + pWrite->rpcMsg.code = code; + if (code <= 0) pWrite->processedCount = 1; + } } walFsync(vnodeGetWal(pVnode)); diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 09bbbd8f4d..ddb935b7e5 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -69,8 +69,8 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) { if (tid < pMeta->maxTables && pMeta->tables[tid] != NULL) { if (TABLE_UID(pMeta->tables[tid]) == pCfg->tableId.uid) { - tsdbError("vgId:%d table %s already exists, tid %d uid %" PRId64, REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), - TABLE_TID(pTable), TABLE_UID(pTable)); + tsdbError("vgId:%d table %s already exists, tid %d uid %" PRId64, REPO_ID(pRepo), TABLE_CHAR_NAME(pMeta->tables[tid]), + TABLE_TID(pMeta->tables[tid]), TABLE_UID(pMeta->tables[tid])); return TSDB_CODE_TDB_TABLE_ALREADY_EXIST; } else { tsdbError("vgId:%d table %s at tid %d uid %" PRIu64 @@ -1295,4 +1295,4 @@ static int tsdbAdjustMetaTables(STsdbRepo *pRepo, int tid) { tsdbDebug("vgId:%d tsdb meta maxTables is adjusted as %d", REPO_ID(pRepo), maxTables); return 0; -} \ No newline at end of file +} -- GitLab