提交 6b0095e0 编写于 作者: S Shengliang Guan

[TD-901] sdb crash while drop dnode

上级 7fccb2f0
...@@ -491,18 +491,22 @@ static int32_t mnodeDropDnodeByEp(char *ep, SMnodeMsg *pMsg) { ...@@ -491,18 +491,22 @@ static int32_t mnodeDropDnodeByEp(char *ep, SMnodeMsg *pMsg) {
return TSDB_CODE_MND_DNODE_NOT_EXIST; return TSDB_CODE_MND_DNODE_NOT_EXIST;
} }
mnodeDecDnodeRef(pDnode);
if (strcmp(pDnode->dnodeEp, mnodeGetMnodeMasterEp()) == 0) { if (strcmp(pDnode->dnodeEp, mnodeGetMnodeMasterEp()) == 0) {
mError("dnode:%d, can't drop dnode:%s which is master", pDnode->dnodeId, ep); mError("dnode:%d, can't drop dnode:%s which is master", pDnode->dnodeId, ep);
mnodeDecDnodeRef(pDnode);
return TSDB_CODE_MND_NO_REMOVE_MASTER; return TSDB_CODE_MND_NO_REMOVE_MASTER;
} }
mInfo("dnode:%d, start to drop it", pDnode->dnodeId); mInfo("dnode:%d, start to drop it", pDnode->dnodeId);
#ifndef _SYNC #ifndef _SYNC
return mnodeDropDnode(pDnode, pMsg); int32_t code = mnodeDropDnode(pDnode, pMsg);
#else #else
return balanceDropDnode(pDnode); int32_t code = balanceDropDnode(pDnode);
#endif #endif
mnodeDecDnodeRef(pDnode);
return code;
} }
static int32_t mnodeProcessCreateDnodeMsg(SMnodeMsg *pMsg) { static int32_t mnodeProcessCreateDnodeMsg(SMnodeMsg *pMsg) {
......
...@@ -264,8 +264,12 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { ...@@ -264,8 +264,12 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
if (pOper->cb != NULL) { if (pOper->cb != NULL) {
pOper->retCode = (*pOper->cb)(pMsg, pOper->retCode); pOper->retCode = (*pOper->cb)(pMsg, pOper->retCode);
} }
dnodeSendRpcMnodeWriteRsp(pMsg, pOper->retCode); dnodeSendRpcMnodeWriteRsp(pMsg, pOper->retCode);
// if ahandle, means this func is called by sdb write
if (ahandle == NULL) {
sdbDecRef(pOper->table, pOper->pObj);
}
taosFreeQitem(pOper); taosFreeQitem(pOper);
} }
...@@ -581,16 +585,26 @@ static int sdbWrite(void *param, void *data, int type) { ...@@ -581,16 +585,26 @@ static int sdbWrite(void *param, void *data, int type) {
return sdbInsertHash(pTable, &oper); return sdbInsertHash(pTable, &oper);
} else if (action == SDB_ACTION_DELETE) { } else if (action == SDB_ACTION_DELETE) {
SSdbRow *rowMeta = sdbGetRowMeta(pTable, pHead->cont); SSdbRow *rowMeta = sdbGetRowMeta(pTable, pHead->cont);
assert(rowMeta != NULL && rowMeta->row != NULL); if (rowMeta != NULL && rowMeta->row != NULL) {
sdbError("table:%s, failed to get object:%s from wal while dispose delete action", pTable->tableName,
pHead->cont);
return TSDB_CODE_SUCCESS;
}
SSdbOper oper = {.table = pTable, .pObj = rowMeta->row}; SSdbOper oper = {.table = pTable, .pObj = rowMeta->row};
return sdbDeleteHash(pTable, &oper); return sdbDeleteHash(pTable, &oper);
} else if (action == SDB_ACTION_UPDATE) { } else if (action == SDB_ACTION_UPDATE) {
SSdbRow *rowMeta = sdbGetRowMeta(pTable, pHead->cont); SSdbRow *rowMeta = sdbGetRowMeta(pTable, pHead->cont);
assert(rowMeta != NULL && rowMeta->row != NULL); if (rowMeta != NULL && rowMeta->row != NULL) {
sdbError("table:%s, failed to get object:%s from wal while dispose update action", pTable->tableName,
pHead->cont);
return TSDB_CODE_SUCCESS;
}
SSdbOper oper = {.rowSize = pHead->len, .rowData = pHead->cont, .table = pTable}; SSdbOper oper = {.rowSize = pHead->len, .rowData = pHead->cont, .table = pTable};
code = (*pTable->decodeFp)(&oper); code = (*pTable->decodeFp)(&oper);
return sdbUpdateHash(pTable, &oper); return sdbUpdateHash(pTable, &oper);
} else { return TSDB_CODE_MND_INVALID_MSG_TYPE; } } else {
return TSDB_CODE_MND_INVALID_MSG_TYPE;
}
} }
int32_t sdbInsertRow(SSdbOper *pOper) { int32_t sdbInsertRow(SSdbOper *pOper) {
...@@ -663,14 +677,18 @@ int32_t sdbDeleteRow(SSdbOper *pOper) { ...@@ -663,14 +677,18 @@ int32_t sdbDeleteRow(SSdbOper *pOper) {
return TSDB_CODE_MND_SDB_INVAID_META_ROW; return TSDB_CODE_MND_SDB_INVAID_META_ROW;
} }
sdbIncRef(pTable, pOper->pObj);
int32_t code = sdbDeleteHash(pTable, pOper); int32_t code = sdbDeleteHash(pTable, pOper);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
sdbError("table:%s, failed to delete from hash", pTable->tableName); sdbError("table:%s, failed to delete from hash", pTable->tableName);
sdbDecRef(pTable, pOper->pObj);
return code; return code;
} }
// just delete data from memory // just delete data from memory
if (pOper->type != SDB_OPER_GLOBAL) { if (pOper->type != SDB_OPER_GLOBAL) {
sdbDecRef(pTable, pOper->pObj);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -692,7 +710,6 @@ int32_t sdbDeleteRow(SSdbOper *pOper) { ...@@ -692,7 +710,6 @@ int32_t sdbDeleteRow(SSdbOper *pOper) {
pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj)); pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj));
} }
sdbIncRef(pNewOper->table, pNewOper->pObj);
taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper); taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -968,10 +985,11 @@ static void *sdbWorkerFp(void *param) { ...@@ -968,10 +985,11 @@ static void *sdbWorkerFp(void *param) {
int32_t code = sdbWrite(pOper, pHead, type); int32_t code = sdbWrite(pOper, pHead, type);
if (code > 0) code = 0; if (code > 0) code = 0;
if (pOper) if (pOper) {
pOper->retCode = code; pOper->retCode = code;
else } else {
pHead->len = code; // hackway pHead->len = code; // hackway
}
} }
walFsync(tsSdbObj.wal); walFsync(tsSdbObj.wal);
...@@ -983,7 +1001,6 @@ static void *sdbWorkerFp(void *param) { ...@@ -983,7 +1001,6 @@ static void *sdbWorkerFp(void *param) {
if (type == TAOS_QTYPE_RPC) { if (type == TAOS_QTYPE_RPC) {
pOper = (SSdbOper *)item; pOper = (SSdbOper *)item;
sdbDecRef(pOper->table, pOper->pObj);
sdbConfirmForward(NULL, pOper, pOper->retCode); sdbConfirmForward(NULL, pOper, pOper->retCode);
} else if (type == TAOS_QTYPE_FWD) { } else if (type == TAOS_QTYPE_FWD) {
pHead = (SWalHead *)item; pHead = (SWalHead *)item;
......
...@@ -969,7 +969,7 @@ void mnodeDropAllDnodeVgroups(SDnodeObj *pDropDnode) { ...@@ -969,7 +969,7 @@ void mnodeDropAllDnodeVgroups(SDnodeObj *pDropDnode) {
sdbFreeIter(pIter); sdbFreeIter(pIter);
mInfo("dnode:%d, all vgroups is dropped from sdb", pDropDnode->dnodeId); mInfo("dnode:%d, all vgroups:%d is dropped from sdb", pDropDnode->dnodeId, numOfVgroups);
} }
#if 0 #if 0
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册