diff --git a/include/util/taoserror.h b/include/util/taoserror.h index b2e1da0692ee7d54abd47fd44f0bafb0d7303971..a89808852d7a9707925f1d0c7e06bf977d72350c 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -40,36 +40,36 @@ int32_t* taosGetErrno(); #define TSDB_CODE_FAILED -1 // unknown or needn't tell detail error //common & util -#define TSDB_CODE_RSP_IN_APP TAOS_DEF_ERROR_CODE(0, 0x0001) -#define TSDB_CODE_CLEAN_AND_RSP_IN_APP TAOS_DEF_ERROR_CODE(0, 0x0002) -#define TSDB_CODE_APP_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x000A) -#define TSDB_CODE_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x000B) -#define TSDB_CODE_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0011) +#define TSDB_CODE_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0001) +#define TSDB_CODE_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0002) +#define TSDB_CODE_APP_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0003) +#define TSDB_CODE_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0010) #define TSDB_CODE_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x0011) #define TSDB_CODE_OUT_OF_SHM_MEM TAOS_DEF_ERROR_CODE(0, 0x0012) #define TSDB_CODE_INVALID_SHM_ID TAOS_DEF_ERROR_CODE(0, 0x0013) -#define TSDB_CODE_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x0015) -#define TSDB_CODE_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0016) +#define TSDB_CODE_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x0014) +#define TSDB_CODE_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0015) #define TSDB_CODE_INVALID_MSG_TYPE TAOS_DEF_ERROR_CODE(0, 0x0016) -#define TSDB_CODE_INVALID_PTR TAOS_DEF_ERROR_CODE(0, 0x0014) -#define TSDB_CODE_INVALID_PARA TAOS_DEF_ERROR_CODE(0, 0x0017) -#define TSDB_CODE_INVALID_CFG TAOS_DEF_ERROR_CODE(0, 0x0018) -#define TSDB_CODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x0019) -#define TSDB_CODE_INVALID_JSON_FORMAT TAOS_DEF_ERROR_CODE(0, 0x001A) -#define TSDB_CODE_INVALID_VERSION_NUMBER TAOS_DEF_ERROR_CODE(0, 0x001B) -#define TSDB_CODE_INVALID_VERSION_STRING TAOS_DEF_ERROR_CODE(0, 0x001C) -#define TSDB_CODE_VERSION_NOT_COMPATIBLE TAOS_DEF_ERROR_CODE(0, 0x001D) -#define TSDB_CODE_MEMORY_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x001E) -#define TSDB_CODE_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x001F) -#define TSDB_CODE_CHECKSUM_ERROR TAOS_DEF_ERROR_CODE(0, 0x0020) -#define TSDB_CODE_COMPRESS_ERROR TAOS_DEF_ERROR_CODE(0, 0x0021) -#define TSDB_CODE_OPS_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x0022) -#define TSDB_CODE_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0023) -#define TSDB_CODE_CFG_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0024) -#define TSDB_CODE_REPEAT_INIT TAOS_DEF_ERROR_CODE(0, 0x0025) -#define TSDB_CODE_DUP_KEY TAOS_DEF_ERROR_CODE(0, 0x0026) -#define TSDB_CODE_NEED_RETRY TAOS_DEF_ERROR_CODE(0, 0x0027) -#define TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE TAOS_DEF_ERROR_CODE(0, 0x0028) +#define TSDB_CODE_INVALID_PTR TAOS_DEF_ERROR_CODE(0, 0x0017) +#define TSDB_CODE_INVALID_PARA TAOS_DEF_ERROR_CODE(0, 0x0018) +#define TSDB_CODE_INVALID_CFG TAOS_DEF_ERROR_CODE(0, 0x0019) +#define TSDB_CODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x001A) +#define TSDB_CODE_INVALID_JSON_FORMAT TAOS_DEF_ERROR_CODE(0, 0x001B) +#define TSDB_CODE_INVALID_VERSION_NUMBER TAOS_DEF_ERROR_CODE(0, 0x001C) +#define TSDB_CODE_INVALID_VERSION_STRING TAOS_DEF_ERROR_CODE(0, 0x001D) +#define TSDB_CODE_VERSION_NOT_COMPATIBLE TAOS_DEF_ERROR_CODE(0, 0x001E) +#define TSDB_CODE_MEMORY_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x001F) +#define TSDB_CODE_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0020) +#define TSDB_CODE_CHECKSUM_ERROR TAOS_DEF_ERROR_CODE(0, 0x0021) +#define TSDB_CODE_COMPRESS_ERROR TAOS_DEF_ERROR_CODE(0, 0x0022) +#define TSDB_CODE_OPS_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x0023) +#define TSDB_CODE_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0024) +#define TSDB_CODE_CFG_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0025) +#define TSDB_CODE_REPEAT_INIT TAOS_DEF_ERROR_CODE(0, 0x0026) +#define TSDB_CODE_DUP_KEY TAOS_DEF_ERROR_CODE(0, 0x0027) +#define TSDB_CODE_NEED_RETRY TAOS_DEF_ERROR_CODE(0, 0x0028) +#define TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE TAOS_DEF_ERROR_CODE(0, 0x0029) + #define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0040) #define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0041) #define TSDB_CODE_REF_ID_REMOVED TAOS_DEF_ERROR_CODE(0, 0x0042) diff --git a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h index 52711af4f8a7c3b7f86f97001ea00b81725bf3e4..2b3ac7ae73196d36578976d8049212c8f86dc764 100644 --- a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h +++ b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h @@ -135,8 +135,9 @@ int32_t dmMarkWrapper(SMgmtWrapper *pWrapper); void dmReleaseWrapper(SMgmtWrapper *pWrapper); SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper); void dmSetStatus(SDnode *pDnode, EDndRunStatus stype); -int32_t dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg); -int32_t dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg); +void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg); +void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg); +void dmProcessFetchRsp(SRpcMsg *pMsg); // dmNodes.c int32_t dmOpenNode(SMgmtWrapper *pWrapper); diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index cf27d89d087e367d6a245998a3062d9cf5bca6b1..c83550c7b1e6b198e6c1647784bd70afa72d15b5 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "dmMgmt.h" #include "dmNodes.h" +#include "qworker.h" static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) { SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper); @@ -277,42 +278,44 @@ static void dmGetServerStartupStatus(SDnode *pDnode, SServerStatusRsp *pStatus) } } -int32_t dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg) { +void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg) { dDebug("msg:%p, net test req will be processed", pMsg); - SRpcMsg rsp = {.code = 0, .info = pMsg->info}; + + SRpcMsg rsp = {.info = pMsg->info}; rsp.pCont = rpcMallocCont(pMsg->contLen); if (rsp.pCont == NULL) { rsp.code = TSDB_CODE_OUT_OF_MEMORY; } else { rsp.contLen = pMsg->contLen; } + rpcSendResponse(&rsp); - return TSDB_CODE_RSP_IN_APP; + rpcFreeCont(pMsg->pCont); } -int32_t dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg) { +void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg) { dDebug("msg:%p, server startup status req will be processed", pMsg); + SServerStatusRsp statusRsp = {0}; dmGetServerStartupStatus(pDnode, &statusRsp); - SRpcMsg rspMsg = {.info = pMsg->info}; - int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp); - if (rspLen < 0) { - rspMsg.code = TSDB_CODE_OUT_OF_MEMORY; - goto _OVER; - } - - void *pRsp = rpcMallocCont(rspLen); - if (pRsp == NULL) { - rspMsg.code = TSDB_CODE_OUT_OF_MEMORY; - goto _OVER; + SRpcMsg rsp = {.info = pMsg->info}; + int32_t contLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp); + if (contLen < 0) { + rsp.code = TSDB_CODE_OUT_OF_MEMORY; + } else { + rsp.pCont = rpcMallocCont(contLen); + if (rsp.pCont != NULL) { + tSerializeSServerStatusRsp(rsp.pCont, contLen, &statusRsp); + rsp.contLen = contLen; + } } - tSerializeSServerStatusRsp(pRsp, rspLen, &statusRsp); - rspMsg.pCont = pRsp; - rspMsg.contLen = rspLen; - -_OVER: - rpcSendResponse(&rspMsg); - return TSDB_CODE_RSP_IN_APP; + rpcSendResponse(&rsp); + rpcFreeCont(pMsg->pCont); } + +void dmProcessFetchRsp(SRpcMsg *pMsg) { + qWorkerProcessFetchRsp(NULL, NULL, pMsg); + // rpcFreeCont(pMsg->pCont); +} \ No newline at end of file diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index d96d77a5a33ecb02c6da5ba913065c515dd58fc2..3dc0572927812f3283b2fcc091f8d5d3a4994e26 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -15,7 +15,6 @@ #define _DEFAULT_SOURCE #include "dmMgmt.h" -#include "qworker.h" static inline int32_t dmBuildNodeMsg(SRpcMsg *pMsg, SRpcMsg *pRpc) { SRpcConnInfo connInfo = {0}; @@ -45,49 +44,42 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { } static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { - SDnodeTrans * pTrans = &pDnode->trans; + SDnodeTrans *pTrans = &pDnode->trans; int32_t code = -1; - SRpcMsg * pMsg = NULL; - bool needRelease = false; - SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)]; + SRpcMsg *pMsg = NULL; SMgmtWrapper *pWrapper = NULL; + SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)]; dTrace("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType), pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId); - pRpc->info.noResp = 0; - pRpc->info.persistHandle = 0; - pRpc->info.wrapper = NULL; - pRpc->info.node = NULL; - pRpc->info.rsp = NULL; - pRpc->info.rspLen = 0; if (pRpc->msgType == TDMT_DND_NET_TEST) { dmProcessNetTestReq(pDnode, pRpc); - goto _OVER_JUST_FREE; + return; } else if (pRpc->msgType == TDMT_MND_SYSTABLE_RETRIEVE_RSP || pRpc->msgType == TDMT_VND_FETCH_RSP) { - qWorkerProcessFetchRsp(NULL, NULL, pRpc); - goto _OVER_JUST_FREE; + dmProcessFetchRsp(pRpc); + return; } else { } if (pDnode->status != DND_STAT_RUNNING) { if (pRpc->msgType == TDMT_DND_SERVER_STATUS) { dmProcessServerStartupStatus(pDnode, pRpc); - goto _OVER_JUST_FREE; + return; } else { terrno = TSDB_CODE_APP_NOT_READY; - goto _OVER_RSP_FREE; + goto _OVER; } } if (IsReq(pRpc) && pRpc->pCont == NULL) { terrno = TSDB_CODE_INVALID_MSG_LEN; - goto _OVER_RSP_FREE; + goto _OVER; } if (pHandle->defaultNtype == NODE_END) { terrno = TSDB_CODE_MSG_NOT_PROCESSED; - goto _OVER_RSP_FREE; + goto _OVER; } else { pWrapper = &pDnode->wrappers[pHandle->defaultNtype]; if (pHandle->needCheckVgId) { @@ -102,15 +94,15 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { } } else { terrno = TSDB_CODE_INVALID_MSG_LEN; - goto _OVER_RSP_FREE; + goto _OVER; } } } if (dmMarkWrapper(pWrapper) != 0) { - goto _OVER_RSP_FREE; + pWrapper = NULL; + goto _OVER; } else { - needRelease = true; pRpc->info.wrapper = pWrapper; } @@ -147,7 +139,11 @@ _OVER: } } SRpcMsg rspMsg = {.code = code, .info = pRpc->info}; - tmsgSendRsp(&rspMsg); + if (pWrapper != NULL) { + tmsgSendRsp(&rspMsg); + } else { + rpcSendResponse(&rspMsg); + } } dTrace("msg:%p, is freed", pMsg); @@ -155,19 +151,7 @@ _OVER: rpcFreeCont(pRpc->pCont); } - if (needRelease) { - dmReleaseWrapper(pWrapper); - } - return; - -_OVER_JUST_FREE: - rpcFreeCont(pRpc->pCont); - return; - -_OVER_RSP_FREE: - rpcFreeCont(pRpc->pCont); - SRpcMsg simpleRsp = {.code = terrno, .info = pRpc->info}; - rpcSendResponse(&simpleRsp); + dmReleaseWrapper(pWrapper); } int32_t dmInitMsgHandle(SDnode *pDnode) { @@ -175,11 +159,11 @@ int32_t dmInitMsgHandle(SDnode *pDnode) { for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) { SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; - SArray * pArray = (*pWrapper->func.getHandlesFp)(); + SArray *pArray = (*pWrapper->func.getHandlesFp)(); if (pArray == NULL) return -1; for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) { - SMgmtHandle * pMgmt = taosArrayGet(pArray, i); + SMgmtHandle *pMgmt = taosArrayGet(pArray, i); SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)]; if (pMgmt->needCheckVgId) { pHandle->needCheckVgId = pMgmt->needCheckVgId; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 64052d2634fd0991e9a3f917d5d5e96c1a7810cd..f84775f0d7e7b932b60255f05820bf0df99f81e9 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -46,10 +46,9 @@ STaosError errors[] = { #endif //common & util -TAOS_DEFINE_ERROR(TSDB_CODE_RSP_IN_APP, "Cleanup in App") -TAOS_DEFINE_ERROR(TSDB_CODE_CLEAN_AND_RSP_IN_APP, "Cleanup and rsp in App") +TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_IN_PROGRESS, "Action in progress") +TAOS_DEFINE_ERROR(TSDB_CODE_APP_ERROR, "Unexpected generic error") TAOS_DEFINE_ERROR(TSDB_CODE_APP_NOT_READY, "Database not ready") -TAOS_DEFINE_ERROR(TSDB_CODE_APP_ERROR, "Database internal error") TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_MEMORY, "Out of Memory") TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_RANGE, "Out of range") TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_SHM_MEM, "Out of Shared memory")