提交 c243b314 编写于 作者: S Shengliang Guan

refactor: add gtid to log

上级 e87baa8d
...@@ -73,6 +73,7 @@ int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -73,6 +73,7 @@ int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
} }
int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
const STraceId *trace = &pMsg->info.traceId;
SDCreateMnodeReq createReq = {0}; SDCreateMnodeReq createReq = {0};
if (tDeserializeSDCreateMnodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) { if (tDeserializeSDCreateMnodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
...@@ -81,7 +82,7 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { ...@@ -81,7 +82,7 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
if (createReq.replica != 1) { if (createReq.replica != 1) {
terrno = TSDB_CODE_INVALID_OPTION; terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to create mnode since %s", terrstr()); dGError("failed to create mnode since %s", terrstr());
return -1; return -1;
} }
...@@ -91,7 +92,7 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { ...@@ -91,7 +92,7 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
mgmt.path = pInput->path; mgmt.path = pInput->path;
mgmt.name = pInput->name; mgmt.name = pInput->name;
if (mmWriteFile(&mgmt, &createReq.replicas[0], deployed) != 0) { if (mmWriteFile(&mgmt, &createReq.replicas[0], deployed) != 0) {
dError("failed to write mnode file since %s", terrstr()); dGError("failed to write mnode file since %s", terrstr());
return -1; return -1;
} }
...@@ -99,7 +100,8 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { ...@@ -99,7 +100,8 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
} }
int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SDDropMnodeReq dropReq = {0}; const STraceId *trace = &pMsg->info.traceId;
SDDropMnodeReq dropReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) { if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
return -1; return -1;
...@@ -107,7 +109,7 @@ int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { ...@@ -107,7 +109,7 @@ int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
if (pInput->pData->dnodeId != 0 && dropReq.dnodeId != pInput->pData->dnodeId) { if (pInput->pData->dnodeId != 0 && dropReq.dnodeId != pInput->pData->dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION; terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to drop mnode since %s", terrstr()); dGError("failed to drop mnode since %s", terrstr());
return -1; return -1;
} }
...@@ -117,7 +119,7 @@ int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { ...@@ -117,7 +119,7 @@ int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
mgmt.path = pInput->path; mgmt.path = pInput->path;
mgmt.name = pInput->name; mgmt.name = pInput->name;
if (mmWriteFile(&mgmt, NULL, deployed) != 0) { if (mmWriteFile(&mgmt, NULL, deployed) != 0) {
dError("failed to write mnode file since %s", terrstr()); dGError("failed to write mnode file since %s", terrstr());
return -1; return -1;
} }
......
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
static inline int32_t mmAcquire(SMnodeMgmt *pMgmt) { static inline int32_t mmAcquire(SMnodeMgmt *pMgmt) {
int32_t code = 0; int32_t code = 0;
taosThreadRwlockRdlock(&pMgmt->lock); taosThreadRwlockRdlock(&pMgmt->lock);
if (pMgmt->stopped) { if (pMgmt->stopped) {
code = -1; code = -1;
...@@ -48,7 +47,8 @@ static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) { ...@@ -48,7 +47,8 @@ static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) {
static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) { static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SMnodeMgmt *pMgmt = pInfo->ahandle; SMnodeMgmt *pMgmt = pInfo->ahandle;
int32_t code = -1; int32_t code = -1;
STraceId * trace = &pMsg->info.traceId;
const STraceId *trace = &pMsg->info.traceId;
dGTrace("msg:%p, get from mnode queue", pMsg); dGTrace("msg:%p, get from mnode queue", pMsg);
switch (pMsg->msgType) { switch (pMsg->msgType) {
...@@ -72,7 +72,7 @@ static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) { ...@@ -72,7 +72,7 @@ static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
mndPostProcessQueryMsg(pMsg); mndPostProcessQueryMsg(pMsg);
} }
dTrace("msg:%p, is freed, code:0x%x", pMsg, code); dGTrace("msg:%p, is freed, code:0x%x", pMsg, code);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
...@@ -80,7 +80,9 @@ static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) { ...@@ -80,7 +80,9 @@ static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
static void mmProcessSyncMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) { static void mmProcessSyncMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SMnodeMgmt *pMgmt = pInfo->ahandle; SMnodeMgmt *pMgmt = pInfo->ahandle;
pMsg->info.node = pMgmt->pMnode; pMsg->info.node = pMgmt->pMnode;
dTrace("msg:%p, get from mnode-sync queue", pMsg);
const STraceId *trace = &pMsg->info.traceId;
dGTrace("msg:%p, get from mnode-sync queue", pMsg);
SMsgHead *pHead = pMsg->pCont; SMsgHead *pHead = pMsg->pCont;
pHead->contLen = ntohl(pHead->contLen); pHead->contLen = ntohl(pHead->contLen);
...@@ -88,20 +90,22 @@ static void mmProcessSyncMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) { ...@@ -88,20 +90,22 @@ static void mmProcessSyncMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
int32_t code = mndProcessSyncMsg(pMsg); int32_t code = mndProcessSyncMsg(pMsg);
dTrace("msg:%p, is freed, code:0x%x", pMsg, code); dGTrace("msg:%p, is freed, code:0x%x", pMsg, code);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
static inline int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pMsg) { static inline int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pMsg) {
const STraceId *trace = &pMsg->info.traceId;
if (mmAcquire(pMgmt) == 0) { if (mmAcquire(pMgmt) == 0) {
dTrace("msg:%p, put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType)); dGTrace("msg:%p, put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType));
taosWriteQitem(pWorker->queue, pMsg); taosWriteQitem(pWorker->queue, pMsg);
mmRelease(pMgmt); mmRelease(pMgmt);
return 0; return 0;
} else { } else {
dTrace("msg:%p, failed to put into %s queue since %s, type:%s", pMsg, pWorker->name, terrstr(), dGTrace("msg:%p, failed to put into %s queue since %s, type:%s", pMsg, pWorker->name, terrstr(),
TMSG_INFO(pMsg->msgType)); TMSG_INFO(pMsg->msgType));
return -1; return -1;
} }
} }
...@@ -121,19 +125,17 @@ int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -121,19 +125,17 @@ int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
pMsg->info.node = pMgmt->pMnode; pMsg->info.node = pMgmt->pMnode;
if (mndPreProcessQueryMsg(pMsg) != 0) { if (mndPreProcessQueryMsg(pMsg) != 0) {
dError("msg:%p, failed to pre-process in mnode since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType)); const STraceId *trace = &pMsg->info.traceId;
dGError("msg:%p, failed to pre-process in mnode since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType));
return -1; return -1;
} }
return mmPutMsgToWorker(pMgmt, &pMgmt->queryWorker, pMsg); return mmPutMsgToWorker(pMgmt, &pMgmt->queryWorker, pMsg);
} }
int32_t mmPutMsgToFetchQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t mmPutMsgToFetchQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
pMsg->info.node = pMgmt->pMnode;
return mmPutMsgToWorker(pMgmt, &pMgmt->fetchWorker, pMsg); return mmPutMsgToWorker(pMgmt, &pMgmt->fetchWorker, pMsg);
} }
int32_t mmPutMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t mmPutMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutMsgToWorker(pMgmt, &pMgmt->monitorWorker, pMsg); return mmPutMsgToWorker(pMgmt, &pMgmt->monitorWorker, pMsg);
} }
......
...@@ -17,9 +17,44 @@ ...@@ -17,9 +17,44 @@
#include "dmMgmt.h" #include "dmMgmt.h"
#include "qworker.h" #include "qworker.h"
static void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet); static inline void dmSendRsp(SRpcMsg *pMsg) {
static void dmSendRsp(SRpcMsg *pMsg); SMgmtWrapper *pWrapper = pMsg->info.wrapper;
static void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg); if (InChildProc(pWrapper)) {
dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP);
} else {
rpcSendResponse(pMsg);
}
}
static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
SEpSet epSet = {0};
dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &epSet);
const int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
pMsg->pCont = rpcMallocCont(contLen);
if (pMsg->pCont == NULL) {
pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
} else {
tSerializeSEpSet(pMsg->pCont, contLen, &epSet);
pMsg->contLen = contLen;
}
}
static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) {
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
int32_t contLen = tSerializeSEpSet(NULL, 0, pNewEpSet);
rsp.pCont = rpcMallocCont(contLen);
if (rsp.pCont == NULL) {
pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
} else {
tSerializeSEpSet(rsp.pCont, contLen, pNewEpSet);
rsp.contLen = contLen;
}
dmSendRsp(&rsp);
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
}
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)]; NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)];
...@@ -28,31 +63,38 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { ...@@ -28,31 +63,38 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
return -1; return -1;
} }
dTrace("msg:%p, will be processed by %s", pMsg, pWrapper->name); const STraceId *trace = &pMsg->info.traceId;
dGTrace("msg:%p, will be processed by %s", pMsg, pWrapper->name);
pMsg->info.wrapper = pWrapper; pMsg->info.wrapper = pWrapper;
return (*msgFp)(pWrapper->pMgmt, pMsg); return (*msgFp)(pWrapper->pMgmt, pMsg);
} }
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
SDnodeTrans * pTrans = &pDnode->trans; SDnodeTrans *pTrans = &pDnode->trans;
int32_t code = -1; int32_t code = -1;
SRpcMsg * pMsg = NULL; SRpcMsg *pMsg = NULL;
SMgmtWrapper *pWrapper = NULL; SMgmtWrapper *pWrapper = NULL;
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)]; SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];
STraceId *trace = &pRpc->info.traceId; const STraceId *trace = &pRpc->info.traceId;
dGTrace("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType), dGTrace("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType),
pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId); pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId);
if (pRpc->msgType == TDMT_DND_NET_TEST) { switch (pRpc->msgType) {
dmProcessNetTestReq(pDnode, pRpc); case TDMT_DND_NET_TEST:
return; dmProcessNetTestReq(pDnode, pRpc);
} else if (pRpc->msgType == TDMT_MND_SYSTABLE_RETRIEVE_RSP || pRpc->msgType == TDMT_VND_FETCH_RSP) { return;
qWorkerProcessFetchRsp(NULL, NULL, pRpc, 0); case TDMT_MND_SYSTABLE_RETRIEVE_RSP:
return; case TDMT_VND_FETCH_RSP:
} else if (pRpc->msgType == TDMT_MND_STATUS_RSP && pEpSet != NULL) { qWorkerProcessFetchRsp(NULL, NULL, pRpc, 0);
dmSetMnodeEpSet(&pDnode->data, pEpSet); return;
} else { case TDMT_MND_STATUS_RSP:
if (pEpSet != NULL) {
dmSetMnodeEpSet(&pDnode->data, pEpSet);
}
break;
default:
break;
} }
if (pDnode->status != DND_STAT_RUNNING) { if (pDnode->status != DND_STAT_RUNNING) {
...@@ -73,39 +115,43 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { ...@@ -73,39 +115,43 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
if (pHandle->defaultNtype == NODE_END) { if (pHandle->defaultNtype == NODE_END) {
terrno = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
goto _OVER; goto _OVER;
} else { }
pWrapper = &pDnode->wrappers[pHandle->defaultNtype];
if (pHandle->needCheckVgId) { pWrapper = &pDnode->wrappers[pHandle->defaultNtype];
if (pRpc->contLen > 0) { if (pHandle->needCheckVgId) {
SMsgHead *pHead = pRpc->pCont; if (pRpc->contLen > 0) {
int32_t vgId = ntohl(pHead->vgId); const SMsgHead *pHead = pRpc->pCont;
if (vgId == QNODE_HANDLE) { const int32_t vgId = ntohl(pHead->vgId);
switch (vgId) {
case QNODE_HANDLE:
pWrapper = &pDnode->wrappers[QNODE]; pWrapper = &pDnode->wrappers[QNODE];
} else if (vgId == SNODE_HANDLE) { break;
case SNODE_HANDLE:
pWrapper = &pDnode->wrappers[SNODE]; pWrapper = &pDnode->wrappers[SNODE];
} else if (vgId == MNODE_HANDLE) { break;
case MNODE_HANDLE:
pWrapper = &pDnode->wrappers[MNODE]; pWrapper = &pDnode->wrappers[MNODE];
} else { break;
} default:
} else { break;
terrno = TSDB_CODE_INVALID_MSG_LEN;
goto _OVER;
} }
} else {
terrno = TSDB_CODE_INVALID_MSG_LEN;
goto _OVER;
} }
} }
if (dmMarkWrapper(pWrapper) != 0) { if (dmMarkWrapper(pWrapper) != 0) {
pWrapper = NULL; pWrapper = NULL;
goto _OVER; goto _OVER;
} else {
pRpc->info.wrapper = pWrapper;
} }
pRpc->info.wrapper = pWrapper;
pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
if (pMsg == NULL) goto _OVER; if (pMsg == NULL) goto _OVER;
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
dTrace("msg:%p, is created, type:%s handle:%p", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle); memcpy(pMsg, pRpc, sizeof(SRpcMsg));
dGTrace("msg:%p, is created, type:%s handle:%p", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle);
if (InParentProc(pWrapper)) { if (InParentProc(pWrapper)) {
code = dmPutToProcCQueue(&pWrapper->proc, pMsg, DND_FUNC_REQ); code = dmPutToProcCQueue(&pWrapper->proc, pMsg, DND_FUNC_REQ);
...@@ -115,13 +161,11 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { ...@@ -115,13 +161,11 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
_OVER: _OVER:
if (code != 0) { if (code != 0) {
dTrace("failed to process msg:%p since %s, handle:%p", pMsg, terrstr(), pRpc->info.handle);
if (terrno != 0) code = terrno; if (terrno != 0) code = terrno;
dGTrace("msg:%p, failed to process since %s", pMsg, terrstr());
if (IsReq(pRpc)) { if (IsReq(pRpc)) {
SRpcMsg rsp = {.code = code, .info = pRpc->info}; SRpcMsg rsp = {.code = code, .info = pRpc->info};
if ((code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_APP_NOT_READY) && pRpc->msgType > TDMT_MND_MSG && if ((code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_APP_NOT_READY) && pRpc->msgType > TDMT_MND_MSG &&
pRpc->msgType < TDMT_VND_MSG) { pRpc->msgType < TDMT_VND_MSG) {
dmBuildMnodeRedirectRsp(pDnode, &rsp); dmBuildMnodeRedirectRsp(pDnode, &rsp);
...@@ -135,7 +179,7 @@ _OVER: ...@@ -135,7 +179,7 @@ _OVER:
} }
if (pMsg != NULL) { if (pMsg != NULL) {
dTrace("msg:%p, is freed", pMsg); dGTrace("msg:%p, is freed", pMsg);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
rpcFreeCont(pRpc->pCont); rpcFreeCont(pRpc->pCont);
...@@ -149,11 +193,11 @@ int32_t dmInitMsgHandle(SDnode *pDnode) { ...@@ -149,11 +193,11 @@ int32_t dmInitMsgHandle(SDnode *pDnode) {
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) { for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
SArray * pArray = (*pWrapper->func.getHandlesFp)(); SArray *pArray = (*pWrapper->func.getHandlesFp)();
if (pArray == NULL) return -1; if (pArray == NULL) return -1;
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
SMgmtHandle * pMgmt = taosArrayGet(pArray, i); SMgmtHandle *pMgmt = taosArrayGet(pArray, i);
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)]; SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)];
if (pMgmt->needCheckVgId) { if (pMgmt->needCheckVgId) {
pHandle->needCheckVgId = pMgmt->needCheckVgId; pHandle->needCheckVgId = pMgmt->needCheckVgId;
...@@ -184,45 +228,6 @@ static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) { ...@@ -184,45 +228,6 @@ static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
} }
} }
static inline void dmSendRsp(SRpcMsg *pMsg) {
SMgmtWrapper *pWrapper = pMsg->info.wrapper;
if (InChildProc(pWrapper)) {
dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP);
} else {
rpcSendResponse(pMsg);
}
}
static void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
SEpSet epSet = {0};
dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &epSet);
int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
pMsg->pCont = rpcMallocCont(contLen);
if (pMsg->pCont == NULL) {
pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
} else {
tSerializeSEpSet(pMsg->pCont, contLen, &epSet);
pMsg->contLen = contLen;
}
}
static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) {
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
int32_t contLen = tSerializeSEpSet(NULL, 0, pNewEpSet);
rsp.pCont = rpcMallocCont(contLen);
if (rsp.pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
} else {
tSerializeSEpSet(rsp.pCont, contLen, pNewEpSet);
rsp.contLen = contLen;
}
dmSendRsp(&rsp);
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
}
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) { static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
SMgmtWrapper *pWrapper = pMsg->info.wrapper; SMgmtWrapper *pWrapper = pMsg->info.wrapper;
if (InChildProc(pWrapper)) { if (InChildProc(pWrapper)) {
......
...@@ -40,18 +40,27 @@ ...@@ -40,18 +40,27 @@
#include "wal.h" #include "wal.h"
#include "libs/function/function.h" #include "libs/function/function.h"
// clang-format off
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} // clang-format off
#define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("DND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }}
#define dWarn(...) { if (dDebugFlag & DEBUG_WARN) { taosPrintLog("DND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} #define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}
#define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("DND ", DEBUG_INFO, 255, __VA_ARGS__); }} #define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("DND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }}
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }} #define dWarn(...) { if (dDebugFlag & DEBUG_WARN) { taosPrintLog("DND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }}
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }} #define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("DND ", DEBUG_INFO, 255, __VA_ARGS__); }}
#define dGTrace(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dTrace(param ",GTID: %s", __VA_ARGS__, buf);} while(0) #define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }}
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }}
#define dGFatal(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dFatal(param ", gtid:%s", __VA_ARGS__, buf);} while(0)
#define dGError(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dError(param ", gtid:%s", __VA_ARGS__, buf);} while(0)
#define dGWarn(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dWarn (param ", gtid:%s", __VA_ARGS__, buf);} while(0)
#define dGInfo(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dInfo (param ", gtid:%s", __VA_ARGS__, buf);} while(0)
#define dGDebug(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dDebug(param ", gtid:%s", __VA_ARGS__, buf);} while(0)
#define dGTrace(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dTrace(param ", gtid:%s", __VA_ARGS__, buf);} while(0)
// clang-format on
typedef enum { typedef enum {
DNODE = 0, DNODE = 0,
...@@ -185,4 +194,3 @@ void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet); ...@@ -185,4 +194,3 @@ void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet);
#endif #endif
#endif /*_TD_DM_INT_H_*/ #endif /*_TD_DM_INT_H_*/
// clang-format on
...@@ -34,13 +34,20 @@ extern "C" { ...@@ -34,13 +34,20 @@ extern "C" {
#endif #endif
// clang-format off // clang-format off
#define mFatal(...) { if (mDebugFlag & DEBUG_FATAL) { taosPrintLog("MND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}
#define mError(...) { if (mDebugFlag & DEBUG_ERROR) { taosPrintLog("MND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} #define mFatal(...) { if (mDebugFlag & DEBUG_FATAL) { taosPrintLog("MND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}
#define mWarn(...) { if (mDebugFlag & DEBUG_WARN) { taosPrintLog("MND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} #define mError(...) { if (mDebugFlag & DEBUG_ERROR) { taosPrintLog("MND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }}
#define mInfo(...) { if (mDebugFlag & DEBUG_INFO) { taosPrintLog("MND ", DEBUG_INFO, 255, __VA_ARGS__); }} #define mWarn(...) { if (mDebugFlag & DEBUG_WARN) { taosPrintLog("MND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }}
#define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }} #define mInfo(...) { if (mDebugFlag & DEBUG_INFO) { taosPrintLog("MND ", DEBUG_INFO, 255, __VA_ARGS__); }}
#define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }} #define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }}
#define mGTrace(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mTrace(param ", GTID: %s", __VA_ARGS__, buf);} while(0) #define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }}
#define mGFatal(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mFatal(param ", gtid:%s", __VA_ARGS__, buf);} while(0)
#define mGError(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mError(param ", gtid:%s", __VA_ARGS__, buf);} while(0)
#define mGWarn(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mWarn (param ", gtid:%s", __VA_ARGS__, buf);} while(0)
#define mGInfo(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mInfo (param ", gtid:%s", __VA_ARGS__, buf);} while(0)
#define mGDebug(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mDebug(param ", gtid:%s", __VA_ARGS__, buf);} while(0)
#define mGTrace(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mTrace(param ", gtid:%s", __VA_ARGS__, buf);} while(0)
// clang-format on // clang-format on
......
...@@ -59,22 +59,28 @@ static void *mndBuildTimerMsg(int32_t *pContLen) { ...@@ -59,22 +59,28 @@ static void *mndBuildTimerMsg(int32_t *pContLen) {
static void mndPullupTrans(SMnode *pMnode) { static void mndPullupTrans(SMnode *pMnode) {
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen}; if (pReq != NULL) {
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
} }
static void mndCalMqRebalance(SMnode *pMnode) { static void mndCalMqRebalance(SMnode *pMnode) {
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen}; if (pReq != NULL) {
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
}
} }
static void mndPullupTelem(SMnode *pMnode) { static void mndPullupTelem(SMnode *pMnode) {
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen}; if (pReq != NULL) {
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
}
} }
static void mndPushTtlTime(SMnode *pMnode) { static void mndPushTtlTime(SMnode *pMnode) {
...@@ -89,10 +95,11 @@ static void mndPushTtlTime(SMnode *pMnode) { ...@@ -89,10 +95,11 @@ static void mndPushTtlTime(SMnode *pMnode) {
int32_t contLen = sizeof(SMsgHead) + sizeof(int32_t); int32_t contLen = sizeof(SMsgHead) + sizeof(int32_t);
SMsgHead *pHead = rpcMallocCont(contLen); SMsgHead *pHead = rpcMallocCont(contLen);
if (pHead == NULL) { if (pHead == NULL) {
mError("ttl time malloc err. contLen:%d", contLen); sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
continue; continue;
} }
pHead->contLen = htonl(contLen); pHead->contLen = htonl(contLen);
pHead->vgId = htonl(pVgroup->vgId); pHead->vgId = htonl(pVgroup->vgId);
...@@ -100,13 +107,13 @@ static void mndPushTtlTime(SMnode *pMnode) { ...@@ -100,13 +107,13 @@ static void mndPushTtlTime(SMnode *pMnode) {
*(int32_t *)(POINTER_SHIFT(pHead, sizeof(SMsgHead))) = htonl(t); *(int32_t *)(POINTER_SHIFT(pHead, sizeof(SMsgHead))) = htonl(t);
SRpcMsg rpcMsg = {.msgType = TDMT_VND_DROP_TTL_TABLE, .pCont = pHead, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_VND_DROP_TTL_TABLE, .pCont = pHead, .contLen = contLen};
SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup); SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
int32_t code = tmsgSendReq(&epSet, &rpcMsg); int32_t code = tmsgSendReq(&epSet, &rpcMsg);
if (code != 0) { if (code != 0) {
mError("ttl time seed err. code:%d", code); mError("failed to send ttl time seed msg, code:0x%x", code);
} else {
mInfo("send ttl time seed msg, time:%d", t);
} }
mError("ttl time seed succ. time:%d", t);
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
} }
} }
...@@ -117,7 +124,7 @@ static void *mndThreadFp(void *param) { ...@@ -117,7 +124,7 @@ static void *mndThreadFp(void *param) {
setThreadName("mnode-timer"); setThreadName("mnode-timer");
while (1) { while (1) {
if (lastTime % (864000) == 0) { // sleep 1 day for ttl if (lastTime % 864000 == 0) {
mndPushTtlTime(pMnode); mndPushTtlTime(pMnode);
} }
...@@ -549,23 +556,25 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { ...@@ -549,23 +556,25 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
if (!IsReq(pMsg)) return 0; if (!IsReq(pMsg)) return 0;
if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0; if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0;
if (pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER ||
pMsg->msgType == TDMT_MND_TRANS_TIMER) {
return -1;
}
if (pMsg->msgType != TDMT_MND_MQ_TIMER && pMsg->msgType != TDMT_MND_TELEM_TIMER && const STraceId *trace = &pMsg->info.traceId;
pMsg->msgType != TDMT_MND_TRANS_TIMER) { mGError("msg:%p, failed to check mnode state since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType));
mError("msg:%p, failed to check mnode state since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType));
SEpSet epSet = {0}; SEpSet epSet = {0};
mndGetMnodeEpSet(pMsg->info.node, &epSet); mndGetMnodeEpSet(pMsg->info.node, &epSet);
int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet); int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
pMsg->info.rsp = rpcMallocCont(contLen); pMsg->info.rsp = rpcMallocCont(contLen);
if (pMsg->info.rsp != NULL) { if (pMsg->info.rsp != NULL) {
tSerializeSEpSet(pMsg->info.rsp, contLen, &epSet); tSerializeSEpSet(pMsg->info.rsp, contLen, &epSet);
pMsg->info.rspLen = contLen; pMsg->info.rspLen = contLen;
terrno = TSDB_CODE_RPC_REDIRECT; terrno = TSDB_CODE_RPC_REDIRECT;
} else { } else {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
}
} }
return -1; return -1;
...@@ -575,17 +584,20 @@ static int32_t mndCheckMsgContent(SRpcMsg *pMsg) { ...@@ -575,17 +584,20 @@ static int32_t mndCheckMsgContent(SRpcMsg *pMsg) {
if (!IsReq(pMsg)) return 0; if (!IsReq(pMsg)) return 0;
if (pMsg->contLen != 0 && pMsg->pCont != NULL) return 0; if (pMsg->contLen != 0 && pMsg->pCont != NULL) return 0;
mError("msg:%p, failed to check msg, cont:%p contLen:%d, app:%p type:%s", pMsg, pMsg->pCont, pMsg->contLen, const STraceId *trace = &pMsg->info.traceId;
mGError("msg:%p, failed to check msg, cont:%p contLen:%d, app:%p type:%s", pMsg, pMsg->pCont, pMsg->contLen,
pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
terrno = TSDB_CODE_INVALID_MSG_LEN; terrno = TSDB_CODE_INVALID_MSG_LEN;
return -1; return -1;
} }
int32_t mndProcessRpcMsg(SRpcMsg *pMsg) { int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
const STraceId *trace = &pMsg->info.traceId;
MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)]; MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)];
if (fp == NULL) { if (fp == NULL) {
mError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); mGError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
terrno = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
return -1; return -1;
} }
...@@ -593,18 +605,17 @@ int32_t mndProcessRpcMsg(SRpcMsg *pMsg) { ...@@ -593,18 +605,17 @@ int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
if (mndCheckMsgContent(pMsg) != 0) return -1; if (mndCheckMsgContent(pMsg) != 0) return -1;
if (mndCheckMnodeState(pMsg) != 0) return -1; if (mndCheckMnodeState(pMsg) != 0) return -1;
STraceId *trace = &pMsg->info.traceId;
mGTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); mGTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
int32_t code = (*fp)(pMsg); int32_t code = (*fp)(pMsg);
mndReleaseRpcRef(pMnode); mndReleaseRpcRef(pMnode);
if (code == TSDB_CODE_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
mTrace("msg:%p, won't response immediately since in progress", pMsg); mGTrace("msg:%p, won't response immediately since in progress", pMsg);
} else if (code == 0) { } else if (code == 0) {
mTrace("msg:%p, successfully processed", pMsg); mGTrace("msg:%p, successfully processed", pMsg);
} else { } else {
mError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle, mGError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
TMSG_INFO(pMsg->msgType)); TMSG_INFO(pMsg->msgType));
} }
return code; return code;
...@@ -620,7 +631,6 @@ void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) { ...@@ -620,7 +631,6 @@ void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
// Note: uid 0 is reserved // Note: uid 0 is reserved
int64_t mndGenerateUid(char *name, int32_t len) { int64_t mndGenerateUid(char *name, int32_t len) {
int32_t hashval = MurmurHash3_32(name, len); int32_t hashval = MurmurHash3_32(name, len);
do { do {
int64_t us = taosGetTimestampUs(); int64_t us = taosGetTimestampUs();
int64_t x = (us & 0x000000FFFFFFFFFF) << 24; int64_t x = (us & 0x000000FFFFFFFFFF) << 24;
......
...@@ -122,30 +122,33 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType ...@@ -122,30 +122,33 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType
int32_t pid, const char *app, int64_t startTime) { int32_t pid, const char *app, int64_t startTime) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt; SProfileMgmt *pMgmt = &pMnode->profileMgmt;
char connStr[255] = {0}; char connStr[255] = {0};
int32_t len = snprintf(connStr, sizeof(connStr), "%s%d%d%d%s", user, ip, port, pid, app); int32_t len = snprintf(connStr, sizeof(connStr), "%s%d%d%d%s", user, ip, port, pid, app);
uint32_t connId = mndGenerateUid(connStr, len); uint32_t connId = mndGenerateUid(connStr, len);
if (startTime == 0) startTime = taosGetTimestampMs(); if (startTime == 0) startTime = taosGetTimestampMs();
SConnObj connObj = {.id = connId, SConnObj connObj = {
.connType = connType, .id = connId,
.appStartTimeMs = startTime, .connType = connType,
.pid = pid, .appStartTimeMs = startTime,
.ip = ip, .pid = pid,
.port = port, .ip = ip,
.killed = 0, .port = port,
.loginTimeMs = taosGetTimestampMs(), .killed = 0,
.lastAccessTimeMs = 0, .loginTimeMs = taosGetTimestampMs(),
.killId = 0, .lastAccessTimeMs = 0,
.numOfQueries = 0, .killId = 0,
.pQueries = NULL}; .numOfQueries = 0,
.pQueries = NULL,
};
connObj.lastAccessTimeMs = connObj.loginTimeMs; connObj.lastAccessTimeMs = connObj.loginTimeMs;
tstrncpy(connObj.user, user, TSDB_USER_LEN); tstrncpy(connObj.user, user, TSDB_USER_LEN);
tstrncpy(connObj.app, app, TSDB_APP_NAME_LEN); tstrncpy(connObj.app, app, TSDB_APP_NAME_LEN);
int32_t keepTime = tsShellActivityTimer * 3; int32_t keepTime = tsShellActivityTimer * 3;
SConnObj *pConn = taosCachePut(pMgmt->connCache, &connId, sizeof(uint32_t), &connObj, sizeof(connObj), keepTime * 1000); SConnObj *pConn =
taosCachePut(pMgmt->connCache, &connId, sizeof(uint32_t), &connObj, sizeof(connObj), keepTime * 1000);
if (pConn == NULL) { if (pConn == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("conn:%d, failed to put into cache since %s, user:%s", connId, user, terrstr()); mError("conn:%d, failed to put into cache since %s, user:%s", connId, user, terrstr());
...@@ -174,7 +177,6 @@ static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId) { ...@@ -174,7 +177,6 @@ static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId) {
} }
pConn->lastAccessTimeMs = taosGetTimestampMs(); pConn->lastAccessTimeMs = taosGetTimestampMs();
mTrace("conn:%u, acquired from cache, data:%p", pConn->id, pConn); mTrace("conn:%u, acquired from cache, data:%p", pConn->id, pConn);
return pConn; return pConn;
} }
...@@ -207,13 +209,14 @@ static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) { ...@@ -207,13 +209,14 @@ static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) {
} }
static int32_t mndProcessConnectReq(SRpcMsg *pReq) { static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SUserObj *pUser = NULL; SUserObj *pUser = NULL;
SDbObj *pDb = NULL; SDbObj *pDb = NULL;
SConnObj *pConn = NULL; SConnObj *pConn = NULL;
int32_t code = -1; int32_t code = -1;
SConnectReq connReq = {0}; SConnectReq connReq = {0};
char ip[30] = {0}; char ip[30] = {0};
const STraceId *trace = &pReq->info.traceId;
if (tDeserializeSConnectReq(pReq->pCont, pReq->contLen, &connReq) != 0) { if (tDeserializeSConnectReq(pReq->pCont, pReq->contLen, &connReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
...@@ -224,11 +227,11 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) { ...@@ -224,11 +227,11 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
pUser = mndAcquireUser(pMnode, pReq->info.conn.user); pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
if (pUser == NULL) { if (pUser == NULL) {
mError("user:%s, failed to login while acquire user since %s", pReq->info.conn.user, terrstr()); mGError("user:%s, failed to login while acquire user since %s", pReq->info.conn.user, terrstr());
goto CONN_OVER; goto CONN_OVER;
} }
if (0 != strncmp(connReq.passwd, pUser->pass, TSDB_PASSWORD_LEN - 1)) { if (0 != strncmp(connReq.passwd, pUser->pass, TSDB_PASSWORD_LEN - 1)) {
mError("user:%s, failed to auth while acquire user, input:%s", pReq->info.conn.user, connReq.passwd); mGError("user:%s, failed to auth while acquire user, input:%s", pReq->info.conn.user, connReq.passwd);
code = TSDB_CODE_RPC_AUTH_FAILURE; code = TSDB_CODE_RPC_AUTH_FAILURE;
goto CONN_OVER; goto CONN_OVER;
} }
...@@ -239,8 +242,8 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) { ...@@ -239,8 +242,8 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
pDb = mndAcquireDb(pMnode, db); pDb = mndAcquireDb(pMnode, db);
if (pDb == NULL) { if (pDb == NULL) {
terrno = TSDB_CODE_MND_INVALID_DB; terrno = TSDB_CODE_MND_INVALID_DB;
mError("user:%s, failed to login from %s while use db:%s since %s", pReq->info.conn.user, ip, connReq.db, mGError("user:%s, failed to login from %s while use db:%s since %s", pReq->info.conn.user, ip, connReq.db,
terrstr()); terrstr());
goto CONN_OVER; goto CONN_OVER;
} }
} }
...@@ -248,7 +251,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) { ...@@ -248,7 +251,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
pConn = mndCreateConn(pMnode, pReq->info.conn.user, connReq.connType, pReq->info.conn.clientIp, pConn = mndCreateConn(pMnode, pReq->info.conn.user, connReq.connType, pReq->info.conn.clientIp,
pReq->info.conn.clientPort, connReq.pid, connReq.app, connReq.startTime); pReq->info.conn.clientPort, connReq.pid, connReq.app, connReq.startTime);
if (pConn == NULL) { if (pConn == NULL) {
mError("user:%s, failed to login from %s while create connection since %s", pReq->info.conn.user, ip, terrstr()); mGError("user:%s, failed to login from %s while create connection since %s", pReq->info.conn.user, ip, terrstr());
goto CONN_OVER; goto CONN_OVER;
} }
...@@ -273,7 +276,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) { ...@@ -273,7 +276,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
pReq->info.rspLen = contLen; pReq->info.rspLen = contLen;
pReq->info.rsp = pRsp; pReq->info.rsp = pRsp;
mDebug("user:%s, login from %s:%d, conn:%u, app:%s", pReq->info.conn.user, ip, pConn->port, pConn->id, connReq.app); mGDebug("user:%s, login from %s:%d, conn:%u, app:%s", pReq->info.conn.user, ip, pConn->port, pConn->id, connReq.app);
code = 0; code = 0;
...@@ -302,7 +305,7 @@ static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) { ...@@ -302,7 +305,7 @@ static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static SAppObj *mndCreateApp(SMnode *pMnode, uint32_t clientIp, SAppHbReq* pReq) { static SAppObj *mndCreateApp(SMnode *pMnode, uint32_t clientIp, SAppHbReq *pReq) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt; SProfileMgmt *pMgmt = &pMnode->profileMgmt;
SAppObj app; SAppObj app;
...@@ -314,22 +317,19 @@ static SAppObj *mndCreateApp(SMnode *pMnode, uint32_t clientIp, SAppHbReq* pReq) ...@@ -314,22 +317,19 @@ static SAppObj *mndCreateApp(SMnode *pMnode, uint32_t clientIp, SAppHbReq* pReq)
memcpy(&app.summary, &pReq->summary, sizeof(pReq->summary)); memcpy(&app.summary, &pReq->summary, sizeof(pReq->summary));
app.lastAccessTimeMs = taosGetTimestampMs(); app.lastAccessTimeMs = taosGetTimestampMs();
int32_t keepTime = tsShellActivityTimer * 3; const int32_t keepTime = tsShellActivityTimer * 3;
SAppObj *pApp = taosCachePut(pMgmt->appCache, &pReq->appId, sizeof(pReq->appId), &app, sizeof(app), keepTime * 1000); SAppObj *pApp = taosCachePut(pMgmt->appCache, &pReq->appId, sizeof(pReq->appId), &app, sizeof(app), keepTime * 1000);
if (pApp == NULL) { if (pApp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to app %" PRIx64 " into cache since %s", pReq->appId, terrstr()); mError("failed to app %" PRIx64 " into cache since %s", pReq->appId, terrstr());
return NULL; return NULL;
} }
mTrace("app %" PRIx64 " is put into cache", pReq->appId); mTrace("app %" PRIx64 " is put into cache", pReq->appId);
return pApp; return pApp;
} }
static void mndFreeApp(SAppObj *pApp) { static void mndFreeApp(SAppObj *pApp) { mTrace("app %" PRIx64 " is destroyed", pApp->appId); }
mTrace("app %" PRIx64 " is destroyed", pApp->appId);
}
static SAppObj *mndAcquireApp(SMnode *pMnode, int64_t appId) { static SAppObj *mndAcquireApp(SMnode *pMnode, int64_t appId) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt; SProfileMgmt *pMgmt = &pMnode->profileMgmt;
...@@ -356,7 +356,7 @@ static void mndReleaseApp(SMnode *pMnode, SAppObj *pApp) { ...@@ -356,7 +356,7 @@ static void mndReleaseApp(SMnode *pMnode, SAppObj *pApp) {
void *mndGetNextApp(SMnode *pMnode, SCacheIter *pIter) { void *mndGetNextApp(SMnode *pMnode, SCacheIter *pIter) {
SAppObj *pApp = NULL; SAppObj *pApp = NULL;
bool hasNext = taosCacheIterNext(pIter); bool hasNext = taosCacheIterNext(pIter);
if (hasNext) { if (hasNext) {
size_t dataLen = 0; size_t dataLen = 0;
pApp = taosCacheIterGetData(pIter, &dataLen); pApp = taosCacheIterGetData(pIter, &dataLen);
...@@ -439,8 +439,8 @@ static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) { ...@@ -439,8 +439,8 @@ static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) {
} }
static int32_t mndUpdateAppInfo(SMnode *pMnode, SClientHbReq *pHbReq, SRpcConnInfo *connInfo) { static int32_t mndUpdateAppInfo(SMnode *pMnode, SClientHbReq *pHbReq, SRpcConnInfo *connInfo) {
SAppHbReq* pReq = &pHbReq->app; SAppHbReq *pReq = &pHbReq->app;
SAppObj *pApp = mndAcquireApp(pMnode, pReq->appId); SAppObj *pApp = mndAcquireApp(pMnode, pReq->appId);
if (pApp == NULL) { if (pApp == NULL) {
pApp = mndCreateApp(pMnode, connInfo->clientIp, pReq); pApp = mndCreateApp(pMnode, connInfo->clientIp, pReq);
if (pApp == NULL) { if (pApp == NULL) {
...@@ -448,7 +448,7 @@ static int32_t mndUpdateAppInfo(SMnode *pMnode, SClientHbReq *pHbReq, SRpcConnIn ...@@ -448,7 +448,7 @@ static int32_t mndUpdateAppInfo(SMnode *pMnode, SClientHbReq *pHbReq, SRpcConnIn
return -1; return -1;
} else { } else {
mDebug("a new app %" PRIx64 "created", pReq->appId); mDebug("a new app %" PRIx64 "created", pReq->appId);
mndReleaseApp(pMnode, pApp); mndReleaseApp(pMnode, pApp);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
...@@ -464,7 +464,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb ...@@ -464,7 +464,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
SClientHbBatchRsp *pBatchRsp) { SClientHbBatchRsp *pBatchRsp) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt; SProfileMgmt *pMgmt = &pMnode->profileMgmt;
SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL}; SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL};
SRpcConnInfo connInfo = pMsg->info.conn; SRpcConnInfo connInfo = pMsg->info.conn;
mndUpdateAppInfo(pMnode, pHbReq, &connInfo); mndUpdateAppInfo(pMnode, pHbReq, &connInfo);
...@@ -637,9 +637,9 @@ static int32_t mndProcessKillQueryReq(SRpcMsg *pReq) { ...@@ -637,9 +637,9 @@ static int32_t mndProcessKillQueryReq(SRpcMsg *pReq) {
} }
mInfo("kill query msg is received, queryId:%s", killReq.queryStrId); mInfo("kill query msg is received, queryId:%s", killReq.queryStrId);
int32_t connId = 0; int32_t connId = 0;
uint64_t queryId = 0; uint64_t queryId = 0;
char* p = strchr(killReq.queryStrId, ':'); char *p = strchr(killReq.queryStrId, ':');
if (NULL == p) { if (NULL == p) {
mError("invalid query id %s", killReq.queryStrId); mError("invalid query id %s", killReq.queryStrId);
terrno = TSDB_CODE_MND_INVALID_QUERY_ID; terrno = TSDB_CODE_MND_INVALID_QUERY_ID;
...@@ -853,12 +853,12 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p ...@@ -853,12 +853,12 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
} }
static int32_t mndRetrieveApps(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { static int32_t mndRetrieveApps(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0; int32_t numOfRows = 0;
int32_t cols = 0; int32_t cols = 0;
SAppObj *pApp = NULL; SAppObj *pApp = NULL;
if (pShow->pIter == NULL) { if (pShow->pIter == NULL) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt; SProfileMgmt *pMgmt = &pMnode->profileMgmt;
pShow->pIter = taosCacheCreateIter(pMgmt->appCache); pShow->pIter = taosCacheCreateIter(pMgmt->appCache);
...@@ -931,7 +931,6 @@ static int32_t mndRetrieveApps(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlo ...@@ -931,7 +931,6 @@ static int32_t mndRetrieveApps(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlo
return numOfRows; return numOfRows;
} }
static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) { static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) {
if (pIter != NULL) { if (pIter != NULL) {
taosCacheDestroyIter(pIter); taosCacheDestroyIter(pIter);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册