提交 59d8eda4 编写于 作者: S Shengliang Guan

TD-10431 fix invalid write errors

上级 d27efd6a
......@@ -289,7 +289,6 @@ static void dndStopMnodeWorker(SDnode *pDnode) {
taosWLockLatch(&pMgmt->latch);
pMgmt->deployed = 0;
pMgmt->pMnode = NULL;
taosWUnLockLatch(&pMgmt->latch);
while (pMgmt->refCount > 1) taosMsleep(10);
......@@ -920,6 +919,7 @@ void dndCleanupMnode(SDnode *pDnode) {
dndStopMnodeWorker(pDnode);
dndCleanupMnodeMgmtWorker(pDnode);
tfree(pMgmt->file);
mndClose(pMgmt->pMnode);
dInfo("dnode-mnode is cleaned up");
}
......
......@@ -131,17 +131,18 @@ static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
if (dndGetStat(pDnode) == DND_STAT_STOPPED) {
if (pMsg == NULL || pMsg->pCont == NULL) return;
dTrace("RPC %p, rsp:%s is ignored since dnode is stopping", pMsg->handle, taosMsg[msgType]);
dTrace("RPC %p, rsp:%s app:%p is ignored since dnode is stopping", pMsg->handle, taosMsg[msgType], pMsg->ahandle);
rpcFreeCont(pMsg->pCont);
return;
}
DndMsgFp fp = pMgmt->msgFp[msgType];
if (fp != NULL) {
dTrace("RPC %p, rsp:%s will be processed, code:%s", pMsg->handle, taosMsg[msgType], tstrerror(pMsg->code));
dTrace("RPC %p, rsp:%s app:%p will be processed, result:%s", pMsg->handle, taosMsg[msgType], pMsg->ahandle,
tstrerror(pMsg->code));
(*fp)(pDnode, pMsg, pEpSet);
} else {
dError("RPC %p, rsp:%s not processed", pMsg->handle, taosMsg[msgType]);
dError("RPC %p, rsp:%s app:%p not processed", pMsg->handle, taosMsg[msgType], pMsg->ahandle);
rpcFreeCont(pMsg->pCont);
}
}
......@@ -187,19 +188,19 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) {
int32_t msgType = pMsg->msgType;
if (msgType == TSDB_MSG_TYPE_NETWORK_TEST) {
dTrace("RPC %p, network test req will be processed", pMsg->handle);
dTrace("RPC %p, network test req, app:%p will be processed", pMsg->handle, pMsg->ahandle);
dndProcessDnodeReq(pDnode, pMsg, pEpSet);
return;
}
if (dndGetStat(pDnode) == DND_STAT_STOPPED) {
dError("RPC %p, req:%s is ignored since dnode exiting", pMsg->handle, taosMsg[msgType]);
dError("RPC %p, req:%s app:%p is ignored since dnode exiting", pMsg->handle, taosMsg[msgType], pMsg->ahandle);
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_DND_EXITING};
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
return;
} else if (dndGetStat(pDnode) != DND_STAT_RUNNING) {
dError("RPC %p, req:%s is ignored since dnode not running", pMsg->handle, taosMsg[msgType]);
dError("RPC %p, req:%s app:%p is ignored since dnode not running", pMsg->handle, taosMsg[msgType], pMsg->ahandle);
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_APP_NOT_READY};
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
......@@ -207,7 +208,7 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) {
}
if (pMsg->pCont == NULL) {
dTrace("RPC %p, req:%s not processed since content is null", pMsg->handle, taosMsg[msgType]);
dTrace("RPC %p, req:%s app:%p not processed since content is null", pMsg->handle, taosMsg[msgType], pMsg->ahandle);
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_DND_INVALID_MSG_LEN};
rpcSendResponse(&rspMsg);
return;
......@@ -215,10 +216,10 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) {
DndMsgFp fp = pMgmt->msgFp[msgType];
if (fp != NULL) {
dTrace("RPC %p, req:%s will be processed", pMsg->handle, taosMsg[msgType]);
dTrace("RPC %p, req:%s app:%p will be processed", pMsg->handle, taosMsg[msgType], pMsg->ahandle);
(*fp)(pDnode, pMsg, pEpSet);
} else {
dError("RPC %p, req:%s is not processed", pMsg->handle, taosMsg[msgType]);
dError("RPC %p, req:%s app:%p is not processed since no handle", pMsg->handle, taosMsg[msgType], pMsg->ahandle);
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED};
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
......
......@@ -895,6 +895,7 @@ static int32_t dndInitVnodeMgmtWorker(SDnode *pDnode) {
return -1;
}
dDebug("vnode mgmt worker is initialized");
return 0;
}
......@@ -903,6 +904,7 @@ static void dndCleanupVnodeMgmtWorker(SDnode *pDnode) {
tWorkerFreeQueue(&pMgmt->mgmtPool, pMgmt->pMgmtQ);
tWorkerCleanup(&pMgmt->mgmtPool);
pMgmt->pMgmtQ = NULL;
dDebug("vnode mgmt worker is closed");
}
static int32_t dndAllocVnodeQueryQueue(SDnode *pDnode, SVnodeObj *pVnode) {
......@@ -963,6 +965,7 @@ static int32_t dndInitVnodeReadWorker(SDnode *pDnode) {
return -1;
}
dDebug("vnode read worker is initialized");
return 0;
}
......@@ -970,6 +973,7 @@ static void dndCleanupVnodeReadWorker(SDnode *pDnode) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
tWorkerCleanup(&pMgmt->fetchPool);
tWorkerCleanup(&pMgmt->queryPool);
dDebug("vnode close worker is initialized");
}
static int32_t dndAllocVnodeWriteQueue(SDnode *pDnode, SVnodeObj *pVnode) {
......@@ -1016,12 +1020,14 @@ static int32_t dndInitVnodeWriteWorker(SDnode *pDnode) {
return -1;
}
dDebug("vnode write worker is initialized");
return 0;
}
static void dndCleanupVnodeWriteWorker(SDnode *pDnode) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
tMWorkerCleanup(&pMgmt->writePool);
dDebug("vnode write worker is closed");
}
static int32_t dndAllocVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode) {
......@@ -1046,7 +1052,7 @@ static int32_t dndInitVnodeSyncWorker(SDnode *pDnode) {
if (maxThreads < 1) maxThreads = 1;
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
SMWorkerPool *pPool = &pMgmt->writePool;
SMWorkerPool *pPool = &pMgmt->syncPool;
pPool->name = "vnode-sync";
pPool->max = maxThreads;
if (tMWorkerInit(pPool) != 0) {
......@@ -1054,12 +1060,14 @@ static int32_t dndInitVnodeSyncWorker(SDnode *pDnode) {
return -1;
}
dDebug("vnode sync worker is initialized");
return 0;
}
static void dndCleanupVnodeSyncWorker(SDnode *pDnode) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
tMWorkerCleanup(&pMgmt->syncPool);
dDebug("vnode sync worker is closed");
}
int32_t dndInitVnodes(SDnode *pDnode) {
......
......@@ -178,11 +178,11 @@ static void mndCleanupSteps(SMnode *pMnode, int32_t pos) {
if (pMnode->pSteps == NULL) return;
if (pos == -1) {
pos = taosArrayGetSize(pMnode->pSteps);
pos = taosArrayGetSize(pMnode->pSteps) - 1;
}
for (int32_t s = pos; s >= 0; s--) {
SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, pos);
SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, s);
mDebug("step:%s will cleanup", pStep->name);
if (pStep->cleanupFp != NULL) {
(*pStep->cleanupFp)(pMnode);
......@@ -267,7 +267,7 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
}
int32_t code = mnodeCreateDir(pMnode, path);
if (mnodeCreateDir(pMnode, path) != 0) {
if (code != 0) {
mError("failed to open mnode since %s", tstrerror(code));
mndClose(pMnode);
terrno = code;
......@@ -306,6 +306,10 @@ void mndClose(SMnode *pMnode) {
if (pMnode != NULL) {
mDebug("start to close mnode");
mndCleanupSteps(pMnode, -1);
if (pMnode->pSteps != NULL) {
taosArrayDestroy(pMnode->pSteps);
pMnode->pSteps = NULL;
}
tfree(pMnode->path);
tfree(pMnode->charset);
tfree(pMnode->locale);
......@@ -354,7 +358,7 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
if (rpcGetConnInfo(pRpcMsg->handle, &connInfo) != 0) {
mndCleanupMsg(pMsg);
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
mError("failed to create msg since %s",terrstr());
mError("failed to create msg since %s", terrstr());
return NULL;
}
memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN);
......@@ -372,7 +376,10 @@ void mndCleanupMsg(SMnodeMsg *pMsg) {
mTrace("msg:%p, is destroyed", pMsg);
}
void mndSendRsp(SMnodeMsg *pMsg, int32_t code) {}
void mndSendRsp(SMnodeMsg *pMsg, int32_t code) {
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .code = code};
rpcSendResponse(&rpcRsp);
}
static void mndProcessRpcMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
......@@ -381,32 +388,34 @@ static void mndProcessRpcMsg(SMnodeMsg *pMsg) {
void *ahandle = pMsg->rpcMsg.ahandle;
bool isReq = (msgType % 2 == 1);
mTrace("msg:%p, type:%s will be processed", pMsg, taosMsg[msgType]);
mTrace("msg:%p, app:%p will be processed", pMsg, ahandle);
if (isReq && !mndIsMaster(pMnode)) {
code = TSDB_CODE_APP_NOT_READY;
mDebug("msg:%p, failed to process since %s", pMsg, terrstr());
mDebug("msg:%p, app:%p failed to process since %s", pMsg, ahandle, terrstr());
goto PROCESS_RPC_END;
}
if (isReq && pMsg->rpcMsg.pCont == NULL) {
code = TSDB_CODE_MND_INVALID_MSG_LEN;
mError("msg:%p, failed to process since %s", pMsg, terrstr());
mError("msg:%p, app:%p failed to process since %s", pMsg, ahandle, terrstr());
goto PROCESS_RPC_END;
}
MndMsgFp fp = pMnode->msgFp[msgType];
if (fp == NULL) {
code = TSDB_CODE_MSG_NOT_PROCESSED;
mError("msg:%p, failed to process since not handle", pMsg);
mError("msg:%p, app:%p failed to process since not handle", pMsg, ahandle);
goto PROCESS_RPC_END;
}
code = (*fp)(pMnode, pMsg);
if (code != 0) {
code = terrno;
mError("msg:%p, failed to process since %s", pMsg, terrstr());
mError("msg:%p, app:%p failed to process since %s", pMsg, ahandle, terrstr());
goto PROCESS_RPC_END;
} else {
mTrace("msg:%p, app:%p is processed", pMsg, ahandle);
}
PROCESS_RPC_END:
......@@ -421,8 +430,6 @@ PROCESS_RPC_END:
rpcSendResponse(&rpcRsp);
}
}
mndCleanupMsg(pMsg);
}
void mndSetMsgHandle(SMnode *pMnode, int32_t msgType, MndMsgFp fp) {
......
......@@ -40,7 +40,7 @@ SSdb *sdbInit(SSdbOpt *pOption) {
return NULL;
}
for (int32_t i = 0; i < SDB_MAX; ++i) {
for (ESdbType i = 0; i < SDB_MAX; ++i) {
taosInitRWLatch(&pSdb->locks[i]);
}
......@@ -69,12 +69,30 @@ void sdbCleanup(SSdb *pSdb) {
tfree(pSdb->tmpDir);
}
for (int32_t i = 0; i < SDB_MAX; ++i) {
for (ESdbType i = 0; i < SDB_MAX; ++i) {
SHashObj *hash = pSdb->hashObjs[i];
if (hash != NULL) {
taosHashClear(hash);
taosHashCleanup(hash);
if (hash == NULL) continue;
SdbDeleteFp deleteFp = pSdb->deleteFps[i];
SSdbRow **ppRow = taosHashIterate(hash, ppRow);
while (ppRow != NULL) {
SSdbRow *pRow = *ppRow;
if (pRow == NULL) continue;
if (deleteFp != NULL) {
(*deleteFp)(pSdb, pRow->pObj);
}
sdbFreeRow(pRow);
ppRow = taosHashIterate(hash, ppRow);
}
}
for (ESdbType i = 0; i < SDB_MAX; ++i) {
SHashObj *hash = pSdb->hashObjs[i];
if (hash == NULL) continue;
taosHashClear(hash);
taosHashCleanup(hash);
pSdb->hashObjs[i] = NULL;
}
......@@ -91,7 +109,7 @@ int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) {
pSdb->encodeFps[sdb] = table.encodeFp;
pSdb->decodeFps[sdb] = table.decodeFp;
for (int32_t i = 0; i < SDB_MAX; ++i) {
for (ESdbType i = 0; i < SDB_MAX; ++i) {
int32_t type;
if (pSdb->keyTypes[i] == SDB_KEY_INT32) {
type = TSDB_DATA_TYPE_INT;
......
......@@ -42,7 +42,7 @@ static int32_t sdbCreateDir(SSdb *pSdb) {
static int32_t sdbRunDeployFp(SSdb *pSdb) {
mDebug("start to deploy sdb");
for (int32_t i = SDB_MAX - 1; i > SDB_START; --i) {
for (ESdbType i = SDB_MAX - 1; i > SDB_START; --i) {
SdbDeployFp fp = pSdb->deployFps[i];
if (fp == NULL) continue;
......@@ -150,7 +150,7 @@ int32_t sdbWriteFile(SSdb *pSdb) {
return -1;
}
for (int32_t i = SDB_MAX - 1; i > SDB_START; --i) {
for (ESdbType i = SDB_MAX - 1; i > SDB_START; --i) {
SdbEncodeFp encodeFp = pSdb->encodeFps[i];
if (encodeFp == NULL) continue;
......@@ -173,6 +173,7 @@ int32_t sdbWriteFile(SSdb *pSdb) {
if (taosWriteFile(fd, pRaw, writeLen) != writeLen) {
code = TAOS_SYSTEM_ERROR(terrno);
taosHashCancelIterate(hash, ppRow);
free(pRaw);
break;
}
......@@ -180,6 +181,7 @@ int32_t sdbWriteFile(SSdb *pSdb) {
if (taosWriteFile(fd, &cksum, sizeof(int32_t)) != sizeof(int32_t)) {
code = TAOS_SYSTEM_ERROR(terrno);
taosHashCancelIterate(hash, ppRow);
free(pRaw);
break;
}
} else {
......@@ -188,6 +190,7 @@ int32_t sdbWriteFile(SSdb *pSdb) {
break;
}
free(pRaw);
ppRow = taosHashIterate(hash, ppRow);
}
taosWUnLockLatch(pLock);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册