提交 94e662c9 编写于 作者: S Shengliang Guan

minor changes

上级 38af2df4
...@@ -413,7 +413,7 @@ static void mnodeRemoveTableFromStable(SSTableObj *pStable, SCTableObj *pCtable) ...@@ -413,7 +413,7 @@ static void mnodeRemoveTableFromStable(SSTableObj *pStable, SCTableObj *pCtable)
SVgObj *pVgroup = mnodeGetVgroup(pCtable->vgId); SVgObj *pVgroup = mnodeGetVgroup(pCtable->vgId);
if (pVgroup == NULL) { if (pVgroup == NULL) {
taosHashRemove(pStable->vgHash, (char *)&pCtable->vgId, sizeof(pCtable->vgId)); taosHashRemove(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId));
mDebug("table:%s, vgId:%d is remove from stable vgList, sizeOfVgList:%d", pStable->info.tableId, pCtable->vgId, mDebug("table:%s, vgId:%d is remove from stable vgList, sizeOfVgList:%d", pStable->info.tableId, pCtable->vgId,
(int32_t)taosHashGetSize(pStable->vgHash)); (int32_t)taosHashGetSize(pStable->vgHash));
} }
......
...@@ -201,10 +201,10 @@ int64_t syncStart(const SSyncInfo *pInfo) { ...@@ -201,10 +201,10 @@ int64_t syncStart(const SSyncInfo *pInfo) {
return -1; return -1;
} }
for (int32_t i = 0; i < pCfg->replica; ++i) { for (int32_t index = 0; index < pCfg->replica; ++index) {
const SNodeInfo *pNodeInfo = pCfg->nodeInfo + i; const SNodeInfo *pNodeInfo = pCfg->nodeInfo + index;
pNode->peerInfo[i] = syncAddPeer(pNode, pNodeInfo); pNode->peerInfo[index] = syncAddPeer(pNode, pNodeInfo);
if (pNode->peerInfo[i] == NULL) { if (pNode->peerInfo[index] == NULL) {
sError("vgId:%d, node:%d fqdn:%s port:%u is not configured, stop taosd", pNode->vgId, pNodeInfo->nodeId, sError("vgId:%d, node:%d fqdn:%s port:%u is not configured, stop taosd", pNode->vgId, pNodeInfo->nodeId,
pNodeInfo->nodeFqdn, pNodeInfo->nodePort); pNodeInfo->nodeFqdn, pNodeInfo->nodePort);
syncStop(pNode->rid); syncStop(pNode->rid);
...@@ -212,7 +212,7 @@ int64_t syncStart(const SSyncInfo *pInfo) { ...@@ -212,7 +212,7 @@ int64_t syncStart(const SSyncInfo *pInfo) {
} }
if ((strcmp(pNodeInfo->nodeFqdn, tsNodeFqdn) == 0) && (pNodeInfo->nodePort == tsSyncPort)) { if ((strcmp(pNodeInfo->nodeFqdn, tsNodeFqdn) == 0) && (pNodeInfo->nodePort == tsSyncPort)) {
pNode->selfIndex = i; pNode->selfIndex = index;
} }
} }
...@@ -251,7 +251,7 @@ int64_t syncStart(const SSyncInfo *pInfo) { ...@@ -251,7 +251,7 @@ int64_t syncStart(const SSyncInfo *pInfo) {
} }
syncAddArbitrator(pNode); syncAddArbitrator(pNode);
taosHashPut(tsVgIdHash, (const char *)&pNode->vgId, sizeof(int32_t), (char *)(&pNode), sizeof(SSyncNode *)); taosHashPut(tsVgIdHash, &pNode->vgId, sizeof(int32_t), &pNode, sizeof(SSyncNode *));
if (pNode->notifyRole) { if (pNode->notifyRole) {
(*pNode->notifyRole)(pNode->vgId, nodeRole); (*pNode->notifyRole)(pNode->vgId, nodeRole);
...@@ -268,14 +268,14 @@ void syncStop(int64_t rid) { ...@@ -268,14 +268,14 @@ void syncStop(int64_t rid) {
sInfo("vgId:%d, cleanup sync", pNode->vgId); sInfo("vgId:%d, cleanup sync", pNode->vgId);
pthread_mutex_lock(&(pNode->mutex)); pthread_mutex_lock(&pNode->mutex);
if (tsVgIdHash) taosHashRemove(tsVgIdHash, (const char *)&pNode->vgId, sizeof(int32_t)); if (tsVgIdHash) taosHashRemove(tsVgIdHash, &pNode->vgId, sizeof(int32_t));
if (pNode->pFwdTimer) taosTmrStop(pNode->pFwdTimer); if (pNode->pFwdTimer) taosTmrStop(pNode->pFwdTimer);
if (pNode->pRoleTimer) taosTmrStop(pNode->pRoleTimer); if (pNode->pRoleTimer) taosTmrStop(pNode->pRoleTimer);
for (int32_t i = 0; i < pNode->replica; ++i) { for (int32_t index = 0; index < pNode->replica; ++index) {
pPeer = pNode->peerInfo[i]; pPeer = pNode->peerInfo[index];
if (pPeer) syncRemovePeer(pPeer); if (pPeer) syncRemovePeer(pPeer);
} }
...@@ -297,7 +297,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) { ...@@ -297,7 +297,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) {
sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole], pNewCfg->replica, sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole], pNewCfg->replica,
pNode->replica); pNode->replica);
pthread_mutex_lock(&(pNode->mutex)); pthread_mutex_lock(&pNode->mutex);
for (i = 0; i < pNode->replica; ++i) { for (i = 0; i < pNode->replica; ++i) {
for (j = 0; j < pNewCfg->replica; ++j) { for (j = 0; j < pNewCfg->replica; ++j) {
...@@ -414,7 +414,7 @@ void syncRecover(int64_t rid) { ...@@ -414,7 +414,7 @@ void syncRecover(int64_t rid) {
(*pNode->notifyRole)(pNode->vgId, nodeRole); (*pNode->notifyRole)(pNode->vgId, nodeRole);
nodeVersion = 0; nodeVersion = 0;
pthread_mutex_lock(&(pNode->mutex)); pthread_mutex_lock(&pNode->mutex);
for (int32_t i = 0; i < pNode->replica; ++i) { for (int32_t i = 0; i < pNode->replica; ++i) {
pPeer = pNode->peerInfo[i]; pPeer = pNode->peerInfo[i];
...@@ -831,7 +831,7 @@ static void syncNotStarted(void *param, void *tmrId) { ...@@ -831,7 +831,7 @@ static void syncNotStarted(void *param, void *tmrId) {
SSyncPeer *pPeer = param; SSyncPeer *pPeer = param;
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
pthread_mutex_lock(&(pNode->mutex)); pthread_mutex_lock(&pNode->mutex);
pPeer->timer = NULL; pPeer->timer = NULL;
sInfo("%s, sync connection is still not up, restart", pPeer->id); sInfo("%s, sync connection is still not up, restart", pPeer->id);
syncRestartConnection(pPeer); syncRestartConnection(pPeer);
...@@ -842,7 +842,7 @@ static void syncTryRecoverFromMaster(void *param, void *tmrId) { ...@@ -842,7 +842,7 @@ static void syncTryRecoverFromMaster(void *param, void *tmrId) {
SSyncPeer *pPeer = param; SSyncPeer *pPeer = param;
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
pthread_mutex_lock(&(pNode->mutex)); pthread_mutex_lock(&pNode->mutex);
syncRecoverFromMaster(pPeer); syncRecoverFromMaster(pPeer);
pthread_mutex_unlock(&(pNode->mutex)); pthread_mutex_unlock(&(pNode->mutex));
} }
...@@ -968,7 +968,7 @@ static int32_t syncProcessPeerMsg(void *param, void *buffer) { ...@@ -968,7 +968,7 @@ static int32_t syncProcessPeerMsg(void *param, void *buffer) {
char * cont = buffer; char * cont = buffer;
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
pthread_mutex_lock(&(pNode->mutex)); pthread_mutex_lock(&pNode->mutex);
int32_t code = syncReadPeerMsg(pPeer, &head, cont); int32_t code = syncReadPeerMsg(pPeer, &head, cont);
...@@ -1066,7 +1066,7 @@ static void syncCheckPeerConnection(void *param, void *tmrId) { ...@@ -1066,7 +1066,7 @@ static void syncCheckPeerConnection(void *param, void *tmrId) {
SSyncPeer *pPeer = param; SSyncPeer *pPeer = param;
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
pthread_mutex_lock(&(pNode->mutex)); pthread_mutex_lock(&pNode->mutex);
sDebug("%s, check peer connection", pPeer->id); sDebug("%s, check peer connection", pPeer->id);
syncSetupPeerConnection(pPeer); syncSetupPeerConnection(pPeer);
...@@ -1118,7 +1118,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { ...@@ -1118,7 +1118,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
} }
SSyncNode *pNode = *ppNode; SSyncNode *pNode = *ppNode;
pthread_mutex_lock(&(pNode->mutex)); pthread_mutex_lock(&pNode->mutex);
SSyncPeer *pPeer; SSyncPeer *pPeer;
for (i = 0; i < pNode->replica; ++i) { for (i = 0; i < pNode->replica; ++i) {
...@@ -1156,7 +1156,7 @@ static void syncProcessBrokenLink(void *param) { ...@@ -1156,7 +1156,7 @@ static void syncProcessBrokenLink(void *param) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
if (taosAcquireRef(tsSyncRefId, pNode->rid) == NULL) return; if (taosAcquireRef(tsSyncRefId, pNode->rid) == NULL) return;
pthread_mutex_lock(&(pNode->mutex)); pthread_mutex_lock(&pNode->mutex);
sDebug("%s, TCP link is broken since %s", pPeer->id, strerror(errno)); sDebug("%s, TCP link is broken since %s", pPeer->id, strerror(errno));
pPeer->peerFd = -1; pPeer->peerFd = -1;
...@@ -1262,7 +1262,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) { ...@@ -1262,7 +1262,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
int64_t time = taosGetTimestampMs(); int64_t time = taosGetTimestampMs();
if (pSyncFwds->fwds > 0) { if (pSyncFwds->fwds > 0) {
pthread_mutex_lock(&(pNode->mutex)); pthread_mutex_lock(&pNode->mutex);
for (int32_t i = 0; i < pSyncFwds->fwds; ++i) { for (int32_t i = 0; i < pSyncFwds->fwds; ++i) {
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % tsMaxFwdInfo; SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % tsMaxFwdInfo;
if (ABS(time - pFwdInfo->time) < 2000) break; if (ABS(time - pFwdInfo->time) < 2000) break;
...@@ -1320,7 +1320,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle ...@@ -1320,7 +1320,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
pSyncHead->len = sizeof(SWalHead) + pWalHead->len; pSyncHead->len = sizeof(SWalHead) + pWalHead->len;
fwdLen = pSyncHead->len + sizeof(SSyncHead); // include the WAL and SYNC head fwdLen = pSyncHead->len + sizeof(SSyncHead); // include the WAL and SYNC head
pthread_mutex_lock(&(pNode->mutex)); pthread_mutex_lock(&pNode->mutex);
for (int32_t i = 0; i < pNode->replica; ++i) { for (int32_t i = 0; i < pNode->replica; ++i) {
pPeer = pNode->peerInfo[i]; pPeer = pNode->peerInfo[i];
......
...@@ -338,7 +338,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -338,7 +338,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
vDebug("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode); vDebug("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode);
tsdbIncCommitRef(pVnode->vgId); tsdbIncCommitRef(pVnode->vgId);
taosHashPut(tsVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t), (char *)(&pVnode), sizeof(SVnodeObj *)); taosHashPut(tsVnodesHash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
SSyncInfo syncInfo; SSyncInfo syncInfo;
syncInfo.vgId = pVnode->vgId; syncInfo.vgId = pVnode->vgId;
...@@ -570,7 +570,7 @@ void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes) { ...@@ -570,7 +570,7 @@ void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes) {
static void vnodeCleanUp(SVnodeObj *pVnode) { static void vnodeCleanUp(SVnodeObj *pVnode) {
// remove from hash, so new messages wont be consumed // remove from hash, so new messages wont be consumed
taosHashRemove(tsVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t)); taosHashRemove(tsVnodesHash, &pVnode->vgId, sizeof(int32_t));
if (pVnode->status != TAOS_VN_STATUS_INIT) { if (pVnode->status != TAOS_VN_STATUS_INIT) {
// it may be in updateing or reset state, then it shall wait // it may be in updateing or reset state, then it shall wait
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册