提交 521ae131 编写于 作者: S Shengliang Guan

TD-2266

上级 a9f53e2f
...@@ -444,12 +444,12 @@ static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg) { ...@@ -444,12 +444,12 @@ static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg) {
SCreateMnodeMsg *pCfg = pMsg->pCont; SCreateMnodeMsg *pCfg = pMsg->pCont;
pCfg->dnodeId = htonl(pCfg->dnodeId); pCfg->dnodeId = htonl(pCfg->dnodeId);
if (pCfg->dnodeId != dnodeGetDnodeId()) { if (pCfg->dnodeId != dnodeGetDnodeId()) {
dError("dnodeId:%d, in create mnode msg is not equal with saved dnodeId:%d", pCfg->dnodeId, dnodeGetDnodeId()); dDebug("dnodeId:%d, in create mnode msg is not equal with saved dnodeId:%d", pCfg->dnodeId, dnodeGetDnodeId());
return TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED; return TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED;
} }
if (strcmp(pCfg->dnodeEp, tsLocalEp) != 0) { if (strcmp(pCfg->dnodeEp, tsLocalEp) != 0) {
dError("dnodeEp:%s, in create mnode msg is not equal with saved dnodeEp:%s", pCfg->dnodeEp, tsLocalEp); dDebug("dnodeEp:%s, in create mnode msg is not equal with saved dnodeEp:%s", pCfg->dnodeEp, tsLocalEp);
return TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED; return TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED;
} }
......
...@@ -251,12 +251,30 @@ void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet) { ...@@ -251,12 +251,30 @@ void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet) {
mnodeMnodeRdLock(); mnodeMnodeRdLock();
*epSet = tsMnodeEpSetForPeer; *epSet = tsMnodeEpSetForPeer;
mnodeMnodeUnLock(); mnodeMnodeUnLock();
for (int32_t i = 0; i < epSet->numOfEps; ++i) {
if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort + TSDB_PORT_DNODEDNODE) {
epSet->inUse = (i + 1) % epSet->numOfEps;
mTrace("mnode:%d, for peer ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse);
} else {
mTrace("mpeer:%d, for peer ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i]));
}
}
} }
void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet) { void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet) {
mnodeMnodeRdLock(); mnodeMnodeRdLock();
*epSet = tsMnodeEpSetForShell; *epSet = tsMnodeEpSetForShell;
mnodeMnodeUnLock(); mnodeMnodeUnLock();
for (int32_t i = 0; i < epSet->numOfEps; ++i) {
if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) {
epSet->inUse = (i + 1) % epSet->numOfEps;
mTrace("mnode:%d, for shell ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse);
} else {
mTrace("mnode:%d, for shell ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i]));
}
}
} }
char* mnodeGetMnodeMasterEp() { char* mnodeGetMnodeMasterEp() {
......
...@@ -58,16 +58,8 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) { ...@@ -58,16 +58,8 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) {
rpcRsp->rsp = epSet; rpcRsp->rsp = epSet;
rpcRsp->len = sizeof(SRpcEpSet); rpcRsp->len = sizeof(SRpcEpSet);
mDebug("msg:%p, ahandle:%p type:%s in mpeer queue will be redirected, numOfEps:%d inUse:%d", pMsg, mDebug("msg:%p, ahandle:%p type:%s in mpeer queue is redirected, numOfEps:%d inUse:%d", pMsg, pMsg->rpcMsg.ahandle,
pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse); taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse);
for (int32_t i = 0; i < epSet->numOfEps; ++i) {
if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort + TSDB_PORT_DNODEDNODE) {
epSet->inUse = (i + 1) % epSet->numOfEps;
mDebug("mpeer:%d ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse);
} else {
mDebug("mpeer:%d ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i]));
}
}
return TSDB_CODE_RPC_REDIRECT; return TSDB_CODE_RPC_REDIRECT;
} }
......
...@@ -51,21 +51,12 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) { ...@@ -51,21 +51,12 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) {
SMnodeRsp *rpcRsp = &pMsg->rpcRsp; SMnodeRsp *rpcRsp = &pMsg->rpcRsp;
SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet)); SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet));
mnodeGetMnodeEpSetForShell(epSet); mnodeGetMnodeEpSetForShell(epSet);
mDebug("msg:%p, app:%p type:%s in mread queue will be redirected, numOfEps:%d inUse:%d", pMsg, pMsg->rpcMsg.ahandle,
taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse);
for (int32_t i = 0; i < epSet->numOfEps; ++i) {
if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) {
epSet->inUse = (i + 1) % epSet->numOfEps;
mDebug("mnode index:%d ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse);
} else {
mDebug("mnode index:%d ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i]));
}
}
rpcRsp->rsp = epSet; rpcRsp->rsp = epSet;
rpcRsp->len = sizeof(SRpcEpSet); rpcRsp->len = sizeof(SRpcEpSet);
mDebug("msg:%p, app:%p type:%s in mread queue is redirected, numOfEps:%d inUse:%d", pMsg, pMsg->rpcMsg.ahandle,
taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse);
return TSDB_CODE_RPC_REDIRECT; return TSDB_CODE_RPC_REDIRECT;
} }
......
...@@ -506,7 +506,7 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbRow *pRow) { ...@@ -506,7 +506,7 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbRow *pRow) {
atomic_add_fetch_32(&pTable->autoIndex, 1); atomic_add_fetch_32(&pTable->autoIndex, 1);
} }
sdbDebug("vgId:1, sdb:%s, insert key:%s to hash, rowSize:%d rows:%" PRId64 ", msg:%p", pTable->name, sdbTrace("vgId:1, sdb:%s, insert key:%s to hash, rowSize:%d rows:%" PRId64 ", msg:%p", pTable->name,
sdbGetRowStr(pTable, pRow->pObj), pRow->rowSize, pTable->numOfRows, pRow->pMsg); sdbGetRowStr(pTable, pRow->pObj), pRow->rowSize, pTable->numOfRows, pRow->pMsg);
int32_t code = (*pTable->fpInsert)(pRow); int32_t code = (*pTable->fpInsert)(pRow);
...@@ -542,7 +542,7 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbRow *pRow) { ...@@ -542,7 +542,7 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbRow *pRow) {
atomic_sub_fetch_32(&pTable->numOfRows, 1); atomic_sub_fetch_32(&pTable->numOfRows, 1);
sdbDebug("vgId:1, sdb:%s, delete key:%s from hash, numOfRows:%" PRId64 ", msg:%p", pTable->name, sdbTrace("vgId:1, sdb:%s, delete key:%s from hash, numOfRows:%" PRId64 ", msg:%p", pTable->name,
sdbGetRowStr(pTable, pRow->pObj), pTable->numOfRows, pRow->pMsg); sdbGetRowStr(pTable, pRow->pObj), pTable->numOfRows, pRow->pMsg);
sdbDecRef(pTable, pRow->pObj); sdbDecRef(pTable, pRow->pObj);
...@@ -551,7 +551,7 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbRow *pRow) { ...@@ -551,7 +551,7 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbRow *pRow) {
} }
static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbRow *pRow) { static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbRow *pRow) {
sdbDebug("vgId:1, sdb:%s, update key:%s in hash, numOfRows:%" PRId64 ", msg:%p", pTable->name, sdbTrace("vgId:1, sdb:%s, update key:%s in hash, numOfRows:%" PRId64 ", msg:%p", pTable->name,
sdbGetRowStr(pTable, pRow->pObj), pTable->numOfRows, pRow->pMsg); sdbGetRowStr(pTable, pRow->pObj), pTable->numOfRows, pRow->pMsg);
(*pTable->fpUpdate)(pRow); (*pTable->fpUpdate)(pRow);
...@@ -649,7 +649,7 @@ static int32_t sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void * ...@@ -649,7 +649,7 @@ static int32_t sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void *
return syncCode; return syncCode;
} }
sdbDebug("vgId:1, sdb:%s, record from wal/fwd is disposed, action:%s key:%s hver:%" PRIu64, pTable->name, sdbTrace("vgId:1, sdb:%s, record from %s is disposed, action:%s key:%s hver:%" PRIu64, pTable->name, qtypeStr[qtype],
actStr[action], sdbGetKeyStr(pTable, pHead->cont), pHead->version); actStr[action], sdbGetKeyStr(pTable, pHead->cont), pHead->version);
// even it is WAL/FWD, it shall be called to update version in sync // even it is WAL/FWD, it shall be called to update version in sync
...@@ -855,6 +855,7 @@ void sdbCloseTable(void *handle) { ...@@ -855,6 +855,7 @@ void sdbCloseTable(void *handle) {
taosHashCancelIterate(pTable->iHandle, pIter); taosHashCancelIterate(pTable->iHandle, pIter);
taosHashCleanup(pTable->iHandle); taosHashCleanup(pTable->iHandle);
pTable->iHandle = NULL;
pthread_mutex_destroy(&pTable->mutex); pthread_mutex_destroy(&pTable->mutex);
sdbDebug("vgId:1, sdb:%s, is closed, numOfTables:%d", pTable->name, tsSdbMgmt.numOfTables); sdbDebug("vgId:1, sdb:%s, is closed, numOfTables:%d", pTable->name, tsSdbMgmt.numOfTables);
......
...@@ -54,18 +54,8 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) { ...@@ -54,18 +54,8 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) {
rpcRsp->rsp = epSet; rpcRsp->rsp = epSet;
rpcRsp->len = sizeof(SRpcEpSet); rpcRsp->len = sizeof(SRpcEpSet);
mDebug("msg:%p, app:%p type:%s in write queue, will be redirected, numOfEps:%d inUse:%d", pMsg, pMsg->rpcMsg.ahandle, mDebug("msg:%p, app:%p type:%s in write queue, is redirected, numOfEps:%d inUse:%d", pMsg,
taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse); pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse);
for (int32_t i = 0; i < epSet->numOfEps; ++i) {
if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) {
epSet->inUse = (i + 1) % epSet->numOfEps;
mDebug("msg:%p, app:%p mnode index:%d ep:%s:%d, set inUse to %d", pMsg, pMsg->rpcMsg.ahandle, i, epSet->fqdn[i],
htons(epSet->port[i]), epSet->inUse);
} else {
mDebug("msg:%p, app:%p mnode index:%d ep:%s:%d", pMsg, pMsg->rpcMsg.ahandle, i, epSet->fqdn[i],
htons(epSet->port[i]));
}
}
return TSDB_CODE_RPC_REDIRECT; return TSDB_CODE_RPC_REDIRECT;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册