提交 a2e97d8e 编写于 作者: S Shengliang Guan

[TD-860]

上级 354f2327
...@@ -140,8 +140,6 @@ void dnodeSendRpcMnodeWriteRsp(void *pMsg, int32_t code) { ...@@ -140,8 +140,6 @@ void dnodeSendRpcMnodeWriteRsp(void *pMsg, int32_t code) {
return; return;
} }
if (code > 0) return;
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.handle = pWrite->rpcMsg.handle, .handle = pWrite->rpcMsg.handle,
.pCont = pWrite->rpcRsp.rsp, .pCont = pWrite->rpcRsp.rsp,
......
...@@ -244,12 +244,20 @@ static void sdbNotifyRole(void *ahandle, int8_t role) { ...@@ -244,12 +244,20 @@ static void sdbNotifyRole(void *ahandle, int8_t role) {
FORCE_INLINE FORCE_INLINE
static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
SMnodeMsg *pMsg = param; if (code > 0) return;
if (pMsg) { assert(param);
sdbDebug("app:%p:%p, forward request is confirmed, result:%s", pMsg->rpcMsg.ahandle, pMsg, tstrerror(code));
SSdbOper * pOper = param;
SMnodeMsg *pMsg = pOper->pMsg;
if (pOper->cb != NULL) {
sdbDebug("app:%p:%p, is confirmed and will do callback func", pMsg->rpcMsg.ahandle, pMsg);
pOper->retCode = (*pOper->cb)(pMsg, code);
} }
dnodeSendRpcMnodeWriteRsp(pMsg, code);
dnodeSendRpcMnodeWriteRsp(pMsg, pOper->retCode);
taosFreeQitem(pOper);
} }
void sdbUpdateSync() { void sdbUpdateSync() {
...@@ -529,7 +537,6 @@ static int sdbWrite(void *param, void *data, int type) { ...@@ -529,7 +537,6 @@ static int sdbWrite(void *param, void *data, int type) {
return code; return code;
} }
// forward to peers, even it is WAL/FWD, it shall be called to update version in sync // forward to peers, even it is WAL/FWD, it shall be called to update version in sync
void *mhandle = NULL; void *mhandle = NULL;
if (pOper != NULL) mhandle = pOper->pMsg; if (pOper != NULL) mhandle = pOper->pMsg;
...@@ -541,18 +548,19 @@ static int sdbWrite(void *param, void *data, int type) { ...@@ -541,18 +548,19 @@ static int sdbWrite(void *param, void *data, int type) {
tstrerror(syncCode), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); tstrerror(syncCode), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
return syncCode; return syncCode;
} else if (syncCode > 0) { } else if (syncCode > 0) {
sdbDebug("table:%s, forward request is sent, result:%s action:%s record:%s version:%" PRId64, pTable->tableName, sdbDebug("table:%s, forward request is sent, syncCode:%d action:%s record:%s version:%" PRId64, pTable->tableName,
tstrerror(syncCode), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); syncCode, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
} else {} } else {
}
// from app, oper is created // from app, oper is created
if (pOper != NULL) { if (pOper != NULL) {
sdbDebug("table:%s, record from app is disposed, result:%s action:%s record:%s version:%" PRId64, pTable->tableName, sdbDebug("table:%s, record from app is disposed, action:%s record:%s version:%" PRId64, pTable->tableName,
tstrerror(code), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
return syncCode; return syncCode;
} else { } else {
sdbDebug("table:%s, record from wal/fwd is disposed, result:%s action:%s record:%s version:%" PRId64, pTable->tableName, sdbDebug("table:%s, record from wal/fwd is disposed, action:%s record:%s version:%" PRId64, pTable->tableName,
tstrerror(code), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
} }
// from wal or forward msg, oper not created, should add into hash // from wal or forward msg, oper not created, should add into hash
...@@ -619,7 +627,7 @@ int32_t sdbInsertRow(SSdbOper *pOper) { ...@@ -619,7 +627,7 @@ int32_t sdbInsertRow(SSdbOper *pOper) {
memcpy(pNewOper, pOper, sizeof(SSdbOper)); memcpy(pNewOper, pOper, sizeof(SSdbOper));
if (pNewOper->pMsg != NULL) { if (pNewOper->pMsg != NULL) {
sdbDebug("app:%p:%p, table:%s record:%p:%s, insert action is add to sdb queue, ", pNewOper->pMsg->rpcMsg.ahandle, sdbDebug("app:%p:%p, table:%s record:%p:%s, insert action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle,
pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj)); pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj));
} }
...@@ -669,7 +677,7 @@ int32_t sdbDeleteRow(SSdbOper *pOper) { ...@@ -669,7 +677,7 @@ int32_t sdbDeleteRow(SSdbOper *pOper) {
memcpy(pNewOper, pOper, sizeof(SSdbOper)); memcpy(pNewOper, pOper, sizeof(SSdbOper));
if (pNewOper->pMsg != NULL) { if (pNewOper->pMsg != NULL) {
sdbDebug("app:%p:%p, table:%s record:%p:%s, delete action is add to sdb queue, ", pNewOper->pMsg->rpcMsg.ahandle, sdbDebug("app:%p:%p, table:%s record:%p:%s, delete action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle,
pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj)); pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj));
} }
...@@ -719,7 +727,7 @@ int32_t sdbUpdateRow(SSdbOper *pOper) { ...@@ -719,7 +727,7 @@ int32_t sdbUpdateRow(SSdbOper *pOper) {
memcpy(pNewOper, pOper, sizeof(SSdbOper)); memcpy(pNewOper, pOper, sizeof(SSdbOper));
if (pNewOper->pMsg != NULL) { if (pNewOper->pMsg != NULL) {
sdbDebug("app:%p:%p, table:%s record:%p:%s, update action is add to sdb queue, ", pNewOper->pMsg->rpcMsg.ahandle, sdbDebug("app:%p:%p, table:%s record:%p:%s, update action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle,
pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj)); pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj));
} }
...@@ -964,18 +972,14 @@ static void *sdbWorkerFp(void *param) { ...@@ -964,18 +972,14 @@ static void *sdbWorkerFp(void *param) {
continue; continue;
} }
if (pOper->cb != NULL) {
sdbTrace("app:%p:%p, will do callback func, index:%d", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, i);
pOper->retCode = (*pOper->cb)(pOper->pMsg, pOper->retCode);
}
dnodeSendRpcMnodeWriteRsp(pOper->pMsg, pOper->retCode);
sdbDecRef(pOper->table, pOper->pObj); sdbDecRef(pOper->table, pOper->pObj);
sdbConfirmForward(NULL, pOper, pOper->retCode);
} else if (type == TAOS_QTYPE_FWD) { } else if (type == TAOS_QTYPE_FWD) {
syncConfirmForward(tsSdbObj.sync, pHead->version, TSDB_CODE_SUCCESS); syncConfirmForward(tsSdbObj.sync, pHead->version, TSDB_CODE_SUCCESS);
taosFreeQitem(item);
} else { } else {
taosFreeQitem(item);
} }
taosFreeQitem(item);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册