提交 9cbf1b1f 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

processedCount to prevent race condition

上级 a78c6232
...@@ -38,6 +38,8 @@ typedef struct { ...@@ -38,6 +38,8 @@ typedef struct {
typedef struct { typedef struct {
SRspRet rspRet; SRspRet rspRet;
int32_t processedCount;
int32_t code;
void *pCont; void *pCont;
int32_t contLen; int32_t contLen;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
...@@ -187,13 +189,16 @@ void dnodeFreeVnodeWqueue(void *wqueue) { ...@@ -187,13 +189,16 @@ void dnodeFreeVnodeWqueue(void *wqueue) {
void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code) { void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code) {
SWriteMsg *pWrite = (SWriteMsg *)param; SWriteMsg *pWrite = (SWriteMsg *)param;
if (code > 0) return; if (code < 0) pWrite->code = code;
int32_t count = atomic_add_fetch_32(&pWrite->processedCount, 1);
if (count <= 1) return;
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.handle = pWrite->rpcMsg.handle, .handle = pWrite->rpcMsg.handle,
.pCont = pWrite->rspRet.rsp, .pCont = pWrite->rspRet.rsp,
.contLen = pWrite->rspRet.len, .contLen = pWrite->rspRet.len,
.code = code, .code = pWrite->code,
}; };
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
...@@ -239,7 +244,10 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -239,7 +244,10 @@ static void *dnodeProcessWriteQueue(void *param) {
} }
int32_t code = vnodeProcessWrite(pVnode, type, pHead, pRspRet); int32_t code = vnodeProcessWrite(pVnode, type, pHead, pRspRet);
if (pWrite) pWrite->rpcMsg.code = code; if (pWrite) {
pWrite->rpcMsg.code = code;
if (code <= 0) pWrite->processedCount = 1;
}
} }
walFsync(vnodeGetWal(pVnode)); walFsync(vnodeGetWal(pVnode));
......
...@@ -69,8 +69,8 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) { ...@@ -69,8 +69,8 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) {
if (tid < pMeta->maxTables && pMeta->tables[tid] != NULL) { if (tid < pMeta->maxTables && pMeta->tables[tid] != NULL) {
if (TABLE_UID(pMeta->tables[tid]) == pCfg->tableId.uid) { if (TABLE_UID(pMeta->tables[tid]) == pCfg->tableId.uid) {
tsdbError("vgId:%d table %s already exists, tid %d uid %" PRId64, REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), tsdbError("vgId:%d table %s already exists, tid %d uid %" PRId64, REPO_ID(pRepo), TABLE_CHAR_NAME(pMeta->tables[tid]),
TABLE_TID(pTable), TABLE_UID(pTable)); TABLE_TID(pMeta->tables[tid]), TABLE_UID(pMeta->tables[tid]));
return TSDB_CODE_TDB_TABLE_ALREADY_EXIST; return TSDB_CODE_TDB_TABLE_ALREADY_EXIST;
} else { } else {
tsdbError("vgId:%d table %s at tid %d uid %" PRIu64 tsdbError("vgId:%d table %s at tid %d uid %" PRIu64
...@@ -1295,4 +1295,4 @@ static int tsdbAdjustMetaTables(STsdbRepo *pRepo, int tid) { ...@@ -1295,4 +1295,4 @@ static int tsdbAdjustMetaTables(STsdbRepo *pRepo, int tid) {
tsdbDebug("vgId:%d tsdb meta maxTables is adjusted as %d", REPO_ID(pRepo), maxTables); tsdbDebug("vgId:%d tsdb meta maxTables is adjusted as %d", REPO_ID(pRepo), maxTables);
return 0; return 0;
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册