未验证 提交 9f599c23 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #12656 from taosdata/fix/dnode

refactor: adjust msg logs
......@@ -63,7 +63,7 @@ static void bmProcessMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
bmSendRsp(pMsg, code);
}
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
rpcFreeCont(pRpc->pCont);
taosFreeQitem(pMsg);
}
......
......@@ -86,7 +86,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .info.ahandle = (void *)0x9527};
SRpcMsg rpcRsp = {0};
dTrace("send status msg to mnode, app:%p", rpcMsg.info.ahandle);
dTrace("send status msg to mnode");
SEpSet epSet = {0};
dmGetMnodeEpSet(pMgmt->pData, &epSet);
......
......@@ -160,7 +160,7 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
rpcSendResponse(&rsp);
}
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
......
......@@ -29,7 +29,7 @@ static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) {
static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SMnodeMgmt *pMgmt = pInfo->ahandle;
int32_t code = -1;
dTrace("msg:%p, get from mnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
dTrace("msg:%p, get from mnode queue", pMsg);
switch (pMsg->msgType) {
case TDMT_DND_ALTER_MNODE:
......@@ -51,7 +51,7 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
mmSendRsp(pMsg, code);
}
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
......@@ -73,7 +73,7 @@ static void mmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
}
}
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
......@@ -84,13 +84,20 @@ static int32_t mmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) {
return 0;
}
int32_t mmPutNodeMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->writeWorker, pMsg); }
int32_t mmPutNodeMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutNodeMsgToWorker(&pMgmt->writeWorker, pMsg);
}
int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->syncWorker, pMsg); }
int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutNodeMsgToWorker(&pMgmt->syncWorker, pMsg);
}
int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->readWorker, pMsg); }
int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutNodeMsgToWorker(&pMgmt->readWorker, pMsg);
}
int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->queryWorker, pMsg);
int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutNodeMsgToWorker(&pMgmt->queryWorker, pMsg);
}
int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
......@@ -101,25 +108,25 @@ static inline int32_t mmPutRpcMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pRpc)
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
if (pMsg == NULL) return -1;
dTrace("msg:%p, is created and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType));
dTrace("msg:%p, create and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType));
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
taosWriteQitem(pWorker->queue, pMsg);
return 0;
}
int32_t mmPutRpcMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc) {
return mmPutRpcMsgToWorker(&pMgmt->queryWorker, pRpc);
int32_t mmPutRpcMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutRpcMsgToWorker(&pMgmt->queryWorker, pMsg);
}
int32_t mmPutRpcMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc) {
return mmPutRpcMsgToWorker(&pMgmt->writeWorker, pRpc);
int32_t mmPutRpcMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutRpcMsgToWorker(&pMgmt->writeWorker, pMsg);
}
int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc) {
return mmPutRpcMsgToWorker(&pMgmt->readWorker, pRpc);
int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutRpcMsgToWorker(&pMgmt->readWorker, pMsg);
}
int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc) { return mmPutRpcMsgToWorker(&pMgmt->syncWorker, pRpc); }
int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg); }
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
SSingleWorkerCfg qCfg = {
......
......@@ -44,7 +44,7 @@ static void qmProcessMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
qmSendRsp(pMsg, code);
}
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
rpcFreeCont(pRpc->pCont);
taosFreeQitem(pMsg);
}
......@@ -60,7 +60,7 @@ static void qmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
qmSendRsp(pMsg, code);
}
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
......@@ -76,7 +76,7 @@ static void qmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
qmSendRsp(pMsg, code);
}
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
......@@ -105,7 +105,7 @@ static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SSingleWorker *pWorker, SR
return -1;
}
dTrace("msg:%p, is created and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType));
dTrace("msg:%p, create and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType));
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
taosWriteQitem(pWorker->queue, pMsg);
return 0;
......
......@@ -44,7 +44,7 @@ static void smProcessMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
smSendRsp(pMsg, code);
}
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
rpcFreeCont(pRpc->pCont);
taosFreeQitem(pMsg);
}
......@@ -166,7 +166,7 @@ int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1;
}
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
taosWriteQitem(pWorker->queue, pMsg);
return 0;
}
......@@ -174,7 +174,7 @@ int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
int32_t smPutNodeMsgToMonitorQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SSingleWorker *pWorker = &pMgmt->monitorWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
taosWriteQitem(pWorker->queue, pMsg);
return 0;
}
......@@ -187,7 +187,7 @@ int32_t smPutNodeMsgToUniqueQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1;
}
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
taosWriteQitem(pWorker->queue, pMsg);
return 0;
}
......@@ -195,7 +195,7 @@ int32_t smPutNodeMsgToUniqueQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
int32_t smPutNodeMsgToSharedQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SSingleWorker *pWorker = &pMgmt->sharedWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
taosWriteQitem(pWorker->queue, pMsg);
return 0;
}
......
......@@ -51,7 +51,7 @@ static void vmProcessMgmtMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
break;
default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
dError("msg:%p, not processed in vnode-mgmt/monitor queue", pMsg);
dError("msg:%p, not processed in vnode queue", pMsg);
}
if (msgType & 1u) {
......@@ -59,7 +59,7 @@ static void vmProcessMgmtMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
vmSendRsp(pMsg, code);
}
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
......@@ -67,13 +67,13 @@ static void vmProcessMgmtMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SVnodeObj *pVnode = pInfo->ahandle;
dTrace("msg:%p, will be processed in vnode-query queue", pMsg);
dTrace("msg:%p, get from vnode-query queue", pMsg);
int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, pMsg);
if (code != 0) {
if (terrno != 0) code = terrno;
vmSendRsp(pMsg, code);
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
......@@ -82,13 +82,13 @@ static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SVnodeObj *pVnode = pInfo->ahandle;
dTrace("msg:%p, will be processed in vnode-fetch queue", pMsg);
dTrace("msg:%p, get from vnode-fetch queue", pMsg);
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
if (code != 0) {
if (terrno != 0) code = terrno;
vmSendRsp(pMsg, code);
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
......@@ -96,7 +96,6 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle;
SRpcMsg rsp;
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *));
if (pArray == NULL) {
......@@ -108,7 +107,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
SRpcMsg *pMsg = NULL;
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
dTrace("msg:%p, will be processed in vnode-write queue", pMsg);
dTrace("msg:%p, get from vnode-write queue", pMsg);
if (taosArrayPush(pArray, &pMsg) == NULL) {
dTrace("msg:%p, failed to process since %s", pMsg, terrstr());
vmSendRsp(pMsg, TSDB_CODE_OUT_OF_MEMORY);
......@@ -116,21 +115,12 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
}
for (int i = 0; i < taosArrayGetSize(pArray); i++) {
SRpcMsg *pMsg;
SRpcMsg *pRpc;
pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
pRpc = pMsg;
rsp.info = pRpc->info;
rsp.pCont = NULL;
rsp.contLen = 0;
SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
SRpcMsg rsp = {.info = pMsg->info, .pCont = NULL, .contLen = 0};
int32_t ret = syncPropose(vnodeGetSyncHandle(pVnode->pImpl), pRpc, false);
int32_t ret = syncPropose(vnodeGetSyncHandle(pVnode->pImpl), pMsg, false);
if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) {
// rsp.code = TSDB_CODE_SYN_NOT_LEADER;
// tmsgSendRsp(&rsp);
dTrace("syncPropose not leader redirect, vgId:%d ", syncGetVgId(vnodeGetSyncHandle(pVnode->pImpl)));
dTrace("msg:%p, is redirect since not leader, vgId:%d ", pMsg, pVnode->vgId);
rsp.code = TSDB_CODE_RPC_REDIRECT;
SEpSet newEpSet;
syncGetEpSet(vnodeGetSyncHandle(pVnode->pImpl), &newEpSet);
......@@ -160,7 +150,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle;
SRpcMsg *pMsg = NULL;
SRpcMsg *pMsg = NULL;
SRpcMsg rsp;
for (int32_t i = 0; i < numOfMsgs; ++i) {
......@@ -181,7 +171,7 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
// apply data into tsdb
if (vnodeProcessWriteReq(pVnode->pImpl, &originalRpcMsg, pSyncApplyMsg->fsmMeta.index, &rsp) < 0) {
rsp.code = terrno;
dTrace("vnodeProcessWriteReq error, code:%d", terrno);
dTrace("msg:%p, process write error since %s", pMsg, terrstr());
}
syncApplyMsgDestroy(pSyncApplyMsg);
......@@ -215,7 +205,7 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
SRpcMsg rsp = {0};
rsp.code = terrno;
rsp.info = pMsg->info;
dTrace("vmProcessSyncQueue error, code:%d", terrno);
dTrace("msg:%p, process sync queue error since code:%s", pMsg, terrstr());
tmsgSendRsp(&rsp);
}
}
......@@ -227,18 +217,18 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle;
SRpcMsg *pMsg = NULL;
SRpcMsg *pMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pMsg);
dTrace("msg:%p, will be processed in vnode-merge queue", pMsg);
dTrace("msg:%p, get from vnode-merge queue", pMsg);
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
if (code != 0) {
if (terrno != 0) code = terrno;
vmSendRsp(pMsg, code);
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
......@@ -255,29 +245,29 @@ static int32_t vmPutNodeMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
if (pVnode == NULL) {
dError("vgId:%d, failed to write msg:%p to vnode-queue since %s", pHead->vgId, pMsg, terrstr());
dError("vgId:%d, failed to put msg:%p into vnode-queue since %s", pHead->vgId, pMsg, terrstr());
return terrno != 0 ? terrno : -1;
}
switch (qtype) {
case QUERY_QUEUE:
dTrace("msg:%p, type:%s will be written into vnode-query queue", pMsg, TMSG_INFO(pRpc->msgType));
dTrace("msg:%p, put into vnode-query worker, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
taosWriteQitem(pVnode->pQueryQ, pMsg);
break;
case FETCH_QUEUE:
dTrace("msg:%p, type:%s will be written into vnode-fetch queue", pMsg, TMSG_INFO(pRpc->msgType));
dTrace("msg:%p, put into vnode-fetch worker, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
taosWriteQitem(pVnode->pFetchQ, pMsg);
break;
case WRITE_QUEUE:
dTrace("msg:%p, type:%s will be written into vnode-write queue", pMsg, TMSG_INFO(pRpc->msgType));
dTrace("msg:%p, put into vnode-write worker, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
taosWriteQitem(pVnode->pWriteQ, pMsg);
break;
case SYNC_QUEUE:
dTrace("msg:%p, type:%s will be written into vnode-sync queue", pMsg, TMSG_INFO(pRpc->msgType));
dTrace("msg:%p, put into vnode-sync worker, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
taosWriteQitem(pVnode->pSyncQ, pMsg);
break;
case MERGE_QUEUE:
dTrace("msg:%p, type:%s will be written into vnode-merge queue", pMsg, TMSG_INFO(pRpc->msgType));
dTrace("msg:%p, put into vnode-merge worker, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
taosWriteQitem(pVnode->pMergeQ, pMsg);
break;
default:
......@@ -312,7 +302,7 @@ int32_t vmPutNodeMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
int32_t vmPutNodeMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SSingleWorker *pWorker = &pMgmt->mgmtWorker;
dTrace("msg:%p, will be put into vnode-mgmt queue, worker:%s", pMsg, pWorker->name);
dTrace("msg:%p, put into vnode-mgmt worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
taosWriteQitem(pWorker->queue, pMsg);
return 0;
}
......@@ -320,7 +310,7 @@ int32_t vmPutNodeMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
int32_t vmPutNodeMsgToMonitorQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SSingleWorker *pWorker = &pMgmt->monitorWorker;
dTrace("msg:%p, will be put into vnode-monitor queue, worker:%s", pMsg, pWorker->name);
dTrace("msg:%p, put into vnode-monitor worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
taosWriteQitem(pWorker->queue, pMsg);
return 0;
}
......@@ -332,35 +322,33 @@ static int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc, EQueueType q
if (pVnode == NULL) return -1;
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
int32_t code = 0;
int32_t code = 0;
if (pMsg != NULL) {
dTrace("msg:%p, is created, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
// if (pMsg->handle != NULL) assert(pMsg->refId != 0);
switch (qtype) {
case WRITE_QUEUE:
dTrace("msg:%p, will be put into vnode-write queue", pMsg);
dTrace("msg:%p, create and put into vnode-write worker, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
taosWriteQitem(pVnode->pWriteQ, pMsg);
break;
case QUERY_QUEUE:
dTrace("msg:%p, will be put into vnode-query queue", pMsg);
dTrace("msg:%p, create and put into vnode-query queue, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
taosWriteQitem(pVnode->pQueryQ, pMsg);
break;
case FETCH_QUEUE:
dTrace("msg:%p, will be put into vnode-fetch queue", pMsg);
dTrace("msg:%p, create and put into vnode-fetch queue, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
taosWriteQitem(pVnode->pFetchQ, pMsg);
break;
case APPLY_QUEUE:
dTrace("msg:%p, will be put into vnode-apply queue", pMsg);
dTrace("msg:%p, create and put into vnode-apply queue, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
taosWriteQitem(pVnode->pApplyQ, pMsg);
break;
case MERGE_QUEUE:
dTrace("msg:%p, will be put into vnode-merge queue", pMsg);
dTrace("msg:%p, create and put into vnode-merge queue, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
taosWriteQitem(pVnode->pMergeQ, pMsg);
break;
case SYNC_QUEUE:
dTrace("msg:%p, will be put into vnode-sync queue", pMsg);
dTrace("msg:%p, create and put into vnode-sync queue, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
taosWriteQitem(pVnode->pSyncQ, pMsg);
break;
default:
......
......@@ -274,25 +274,25 @@ static void dmGetServerStartupStatus(SDnode *pDnode, SServerStatusRsp *pStatus)
}
}
void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pReq) {
dDebug("net test req is received");
SRpcMsg rsp = {.code = 0, .info = pReq->info};
rsp.pCont = rpcMallocCont(pReq->contLen);
void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg) {
dDebug("start to process net test req");
SRpcMsg rsp = {.code = 0, .info = pMsg->info};
rsp.pCont = rpcMallocCont(pMsg->contLen);
if (rsp.pCont == NULL) {
rsp.code = TSDB_CODE_OUT_OF_MEMORY;
} else {
rsp.contLen = pReq->contLen;
rsp.contLen = pMsg->contLen;
}
rpcSendResponse(&rsp);
}
void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pReq) {
dDebug("server startup status req is received");
void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg) {
dDebug("start to process server startup status req");
SServerStatusRsp statusRsp = {0};
dmGetServerStartupStatus(pDnode, &statusRsp);
SRpcMsg rspMsg = {.info = pReq->info};
SRpcMsg rspMsg = {.info = pMsg->info};
int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
if (rspLen < 0) {
rspMsg.code = TSDB_CODE_OUT_OF_MEMORY;
......
......@@ -37,6 +37,7 @@ static int32_t dmCreateShm(SMgmtWrapper *pWrapper) {
dError("node:%s, failed to create shm size:%d since %s", pWrapper->name, shmsize, terrstr());
return -1;
}
dInfo("node:%s, shm:%d is created, size:%d", pWrapper->name, pWrapper->proc.shm.id, shmsize);
return 0;
}
......@@ -60,7 +61,7 @@ static int32_t dmNewProc(SMgmtWrapper *pWrapper, EDndNodeType ntype) {
taosIgnSignal(SIGCHLD);
pWrapper->proc.pid = pid;
dInfo("node:%s, continue running in new process:%d", pWrapper->name, pid);
dInfo("node:%s, continue running in new pid:%d", pWrapper->name, pid);
return 0;
}
......@@ -177,11 +178,11 @@ void dmCloseNode(SMgmtWrapper *pWrapper) {
if (OnlyInParentProc(pWrapper)) {
int32_t pid = pWrapper->proc.pid;
if (pid > 0 && taosProcExist(pid)) {
dInfo("node:%s, send kill signal to the child process:%d", pWrapper->name, pid);
dInfo("node:%s, send kill signal to the child pid:%d", pWrapper->name, pid);
taosKillProc(pid);
dInfo("node:%s, wait for child process:%d to stop", pWrapper->name, pid);
dInfo("node:%s, wait for child pid:%d to stop", pWrapper->name, pid);
taosWaitProc(pid);
dInfo("node:%s, child process:%d is stopped", pWrapper->name, pid);
dInfo("node:%s, child pid:%d is stopped", pWrapper->name, pid);
}
}
......@@ -255,7 +256,7 @@ static void dmWatchNodes(SDnode *pDnode) {
if (!OnlyInParentProc(pWrapper)) continue;
if (proc->pid <= 0 || !taosProcExist(proc->pid)) {
dError("node:%s, process:%d is killed and needs to restart", pWrapper->name, proc->pid);
dError("node:%s, pid:%d is killed and needs to restart", pWrapper->name, proc->pid);
dmCloseProcRpcHandles(&pWrapper->proc);
dmNewProc(pWrapper, ntype);
}
......
......@@ -162,7 +162,7 @@ static inline int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, SRpcMsg
return 0;
}
static int32_t dmPopFromProcQueue(SProcQueue *queue, SRpcMsg **ppMsg, EProcFuncType *pFuncType) {
static inline int32_t dmPopFromProcQueue(SProcQueue *queue, SRpcMsg **ppMsg, EProcFuncType *pFuncType) {
tsem_wait(&queue->sem);
taosThreadMutexLock(&queue->mutex);
......@@ -412,7 +412,7 @@ void dmCleanupProc(struct SMgmtWrapper *pWrapper) {
SProc *proc = &pWrapper->proc;
if (proc->name == NULL) return;
dDebug("node:%s, start to clean up proc", pWrapper->name);
dDebug("node:%s, start to cleanup proc", pWrapper->name);
dmStopProc(proc);
dmCleanupProcQueue(proc->cqueue);
dmCleanupProcQueue(proc->pqueue);
......
......@@ -43,8 +43,8 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
return -1;
}
dTrace("msg:%p, will be processed by %s", pMsg, pWrapper->name);
pMsg->info.wrapper = pWrapper;
dTrace("msg:%p, will be processed by %s, handle:%p", pMsg, pWrapper->name, pMsg->info.handle);
return (*msgFp)(pWrapper->pMgmt, pMsg);
}
......@@ -56,8 +56,8 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];
SMgmtWrapper *pWrapper = NULL;
dTrace("msg:%s is received, handle:%p cont:%p len:%d code:0x%04x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType),
pRpc->info.handle, pRpc->pCont, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId);
dTrace("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType),
pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId);
pRpc->info.noResp = 0;
pRpc->info.persistHandle = 0;
pRpc->info.wrapper = NULL;
......
......@@ -842,7 +842,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
}
taosMemoryFree(pTrans->rpcRsp);
mDebug("trans:%d, send rsp, code:0x%04x stage:%d app:%p", pTrans->id, code & 0xFFFF, pTrans->stage,
mDebug("trans:%d, send rsp, code:0x%x stage:%d app:%p", pTrans->id, code & 0xFFFF, pTrans->stage,
pTrans->rpcInfo.ahandle);
SRpcMsg rspMsg = {
.info = pTrans->rpcInfo,
......@@ -899,7 +899,7 @@ void mndTransProcessRsp(SRpcMsg *pRsp) {
}
}
mDebug("trans:%d, action:%d response is received, code:0x%04x, accept:0x%04x", transId, action, pRsp->code,
mDebug("trans:%d, action:%d response is received, code:0x%x, accept:0x%04x", transId, action, pRsp->code,
pAction->acceptableCode);
mndTransExecute(pMnode, pTrans);
......@@ -1031,7 +1031,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
mDebug("trans:%d, all %d actions execute successfully", pTrans->id, numOfActions);
return 0;
} else {
mError("trans:%d, all %d actions executed, code:0x%04x", pTrans->id, numOfActions, errCode & 0XFFFF);
mError("trans:%d, all %d actions executed, code:0x%x", pTrans->id, numOfActions, errCode & 0XFFFF);
mndTransResetActions(pMnode, pTrans, pArray);
terrno = errCode;
return errCode;
......@@ -1222,7 +1222,7 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans) {
mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr());
}
mDebug("trans:%d, finished, code:0x%04x, failedTimes:%d", pTrans->id, pTrans->code, pTrans->failedTimes);
mDebug("trans:%d, finished, code:0x%x, failedTimes:%d", pTrans->id, pTrans->code, pTrans->failedTimes);
return continueExec;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册