提交 a82d594a 编写于 作者: S Shengliang Guan

TD-2157

上级 2171bf7a
...@@ -24,18 +24,18 @@ extern "C" { ...@@ -24,18 +24,18 @@ extern "C" {
#define TAOS_SYNC_MAX_INDEX 0x7FFFFFFF #define TAOS_SYNC_MAX_INDEX 0x7FFFFFFF
typedef enum _TAOS_SYNC_ROLE { typedef enum _TAOS_SYNC_ROLE {
TAOS_SYNC_ROLE_OFFLINE, TAOS_SYNC_ROLE_OFFLINE = 0,
TAOS_SYNC_ROLE_UNSYNCED, TAOS_SYNC_ROLE_UNSYNCED = 1,
TAOS_SYNC_ROLE_SYNCING, TAOS_SYNC_ROLE_SYNCING = 2,
TAOS_SYNC_ROLE_SLAVE, TAOS_SYNC_ROLE_SLAVE = 3,
TAOS_SYNC_ROLE_MASTER, TAOS_SYNC_ROLE_MASTER = 4
} ESyncRole; } ESyncRole;
typedef enum _TAOS_SYNC_STATUS { typedef enum _TAOS_SYNC_STATUS {
TAOS_SYNC_STATUS_INIT, TAOS_SYNC_STATUS_INIT = 0,
TAOS_SYNC_STATUS_START, TAOS_SYNC_STATUS_START = 1,
TAOS_SYNC_STATUS_FILE, TAOS_SYNC_STATUS_FILE = 2,
TAOS_SYNC_STATUS_CACHE, TAOS_SYNC_STATUS_CACHE = 3
} ESyncStatus; } ESyncStatus;
typedef struct { typedef struct {
......
...@@ -27,16 +27,20 @@ extern "C" { ...@@ -27,16 +27,20 @@ extern "C" {
#define sDebug(...) { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }} #define sDebug(...) { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }}
#define sTrace(...) { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }} #define sTrace(...) { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }}
#define TAOS_SMSG_SYNC_DATA 1 typedef enum {
#define TAOS_SMSG_FORWARD 2 TAOS_SMSG_SYNC_DATA = 1,
#define TAOS_SMSG_FORWARD_RSP 3 TAOS_SMSG_FORWARD = 2,
#define TAOS_SMSG_SYNC_REQ 4 TAOS_SMSG_FORWARD_RSP = 3,
#define TAOS_SMSG_SYNC_RSP 5 TAOS_SMSG_SYNC_REQ = 4,
#define TAOS_SMSG_SYNC_MUST 6 TAOS_SMSG_SYNC_RSP = 5,
#define TAOS_SMSG_STATUS 7 TAOS_SMSG_SYNC_MUST = 6,
TAOS_SMSG_STATUS = 7
} ESyncMsgType;
#define SYNC_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + sizeof(SSyncHead) + 16) #define SYNC_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + sizeof(SSyncHead) + 16)
#define SYNC_RECV_BUFFER_SIZE (5*1024*1024) #define SYNC_RECV_BUFFER_SIZE (5*1024*1024)
#define SYNC_FWD_TIMER 300
#define SYNC_ROLE_TIMER 10000
#define nodeRole pNode->peerInfo[pNode->selfIndex]->role #define nodeRole pNode->peerInfo[pNode->selfIndex]->role
#define nodeVersion pNode->peerInfo[pNode->selfIndex]->version #define nodeVersion pNode->peerInfo[pNode->selfIndex]->version
...@@ -123,12 +127,12 @@ typedef struct SsyncPeer { ...@@ -123,12 +127,12 @@ typedef struct SsyncPeer {
int32_t nodeId; int32_t nodeId;
uint32_t ip; uint32_t ip;
uint16_t port; uint16_t port;
int8_t role;
int8_t sstatus; // sync status
char fqdn[TSDB_FQDN_LEN]; // peer ip string char fqdn[TSDB_FQDN_LEN]; // peer ip string
char id[TSDB_EP_LEN + 32]; // peer vgId + end point char id[TSDB_EP_LEN + 32]; // peer vgId + end point
int8_t role;
int8_t sstatus; // sync status
uint64_t version; uint64_t version;
uint64_t sversion; // track the peer version in retrieve process uint64_t sversion; // track the peer version in retrieve process
int32_t syncFd; int32_t syncFd;
int32_t peerFd; // forward FD int32_t peerFd; // forward FD
int32_t numOfRetrieves; // number of retrieves tried int32_t numOfRetrieves; // number of retrieves tried
...@@ -138,7 +142,7 @@ typedef struct SsyncPeer { ...@@ -138,7 +142,7 @@ typedef struct SsyncPeer {
int32_t notifyFd; int32_t notifyFd;
int32_t watchNum; int32_t watchNum;
int32_t *watchFd; int32_t *watchFd;
int8_t refCount; // reference count int32_t refCount; // reference count
struct SSyncNode *pSyncNode; struct SSyncNode *pSyncNode;
} SSyncPeer; } SSyncPeer;
...@@ -146,16 +150,16 @@ typedef struct SSyncNode { ...@@ -146,16 +150,16 @@ typedef struct SSyncNode {
char path[TSDB_FILENAME_LEN]; char path[TSDB_FILENAME_LEN];
int8_t replica; int8_t replica;
int8_t quorum; int8_t quorum;
int8_t selfIndex;
uint32_t vgId; uint32_t vgId;
int64_t rid; int64_t rid;
void *ahandle; void *ahandle;
int8_t selfIndex;
SSyncPeer *peerInfo[TAOS_SYNC_MAX_REPLICA+1]; // extra one for arbitrator SSyncPeer *peerInfo[TAOS_SYNC_MAX_REPLICA+1]; // extra one for arbitrator
SSyncPeer *pMaster; SSyncPeer *pMaster;
int8_t refCount;
SRecvBuffer *pRecv; SRecvBuffer *pRecv;
SSyncFwds *pSyncFwds; // saved forward info if quorum >1 SSyncFwds *pSyncFwds; // saved forward info if quorum >1
void *pFwdTimer; void *pFwdTimer;
void *pRoleTimer;
FGetFileInfo getFileInfo; FGetFileInfo getFileInfo;
FGetWalInfo getWalInfo; FGetWalInfo getWalInfo;
FWriteToCache writeToCache; FWriteToCache writeToCache;
......
...@@ -59,6 +59,7 @@ static void syncAddArbitrator(SSyncNode *pNode); ...@@ -59,6 +59,7 @@ static void syncAddArbitrator(SSyncNode *pNode);
static void syncFreeNode(void *); static void syncFreeNode(void *);
static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode); static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode);
static void syncMonitorFwdInfos(void *param, void *tmrId); static void syncMonitorFwdInfos(void *param, void *tmrId);
static void syncMonitorNodeRole(void *param, void *tmrId);
static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code); static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code);
static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle); static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle);
static void syncRestartPeer(SSyncPeer *pPeer); static void syncRestartPeer(SSyncPeer *pPeer);
...@@ -79,7 +80,9 @@ typedef enum { ...@@ -79,7 +80,9 @@ typedef enum {
SYNC_STATUS_SETUP_CONN, SYNC_STATUS_SETUP_CONN,
SYNC_STATUS_SETUP_CONN_RSP, SYNC_STATUS_SETUP_CONN_RSP,
SYNC_STATUS_EXCHANGE_DATA, SYNC_STATUS_EXCHANGE_DATA,
SYNC_STATUS_EXCHANGE_DATA_RSP SYNC_STATUS_EXCHANGE_DATA_RSP,
SYNC_STATUS_CHECK_ROLE,
SYNC_STATUS_CHECK_ROLE_RSP
} ESyncStatusType; } ESyncStatusType;
char *statusType[] = { char *statusType[] = {
...@@ -88,7 +91,9 @@ char *statusType[] = { ...@@ -88,7 +91,9 @@ char *statusType[] = {
"setup-conn", "setup-conn",
"setup-conn-rsp", "setup-conn-rsp",
"exchange-data", "exchange-data",
"exchange-data-rsp" "exchange-data-rsp",
"check-role",
"check-role-rsp"
}; };
uint16_t syncGenTranId() { uint16_t syncGenTranId() {
...@@ -233,9 +238,16 @@ int64_t syncStart(const SSyncInfo *pInfo) { ...@@ -233,9 +238,16 @@ int64_t syncStart(const SSyncInfo *pInfo) {
return -1; return -1;
} }
pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, (void *)pNode->rid, tsSyncTmrCtrl); pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, SYNC_FWD_TIMER, (void *)pNode->rid, tsSyncTmrCtrl);
if (pNode->pFwdTimer == NULL) { if (pNode->pFwdTimer == NULL) {
sError("vgId:%d, failed to allocate timer", pNode->vgId); sError("vgId:%d, failed to allocate fwd timer", pNode->vgId);
syncStop(pNode->rid);
return -1;
}
pNode->pRoleTimer = taosTmrStart(syncMonitorNodeRole, SYNC_ROLE_TIMER, (void *)pNode->rid, tsSyncTmrCtrl);
if (pNode->pRoleTimer == NULL) {
sError("vgId:%d, failed to allocate role timer", pNode->vgId);
syncStop(pNode->rid); syncStop(pNode->rid);
return -1; return -1;
} }
...@@ -262,6 +274,7 @@ void syncStop(int64_t rid) { ...@@ -262,6 +274,7 @@ void syncStop(int64_t rid) {
if (tsVgIdHash) taosHashRemove(tsVgIdHash, (const char *)&pNode->vgId, sizeof(int32_t)); if (tsVgIdHash) taosHashRemove(tsVgIdHash, (const char *)&pNode->vgId, sizeof(int32_t));
if (pNode->pFwdTimer) taosTmrStop(pNode->pFwdTimer); if (pNode->pFwdTimer) taosTmrStop(pNode->pFwdTimer);
if (pNode->pRoleTimer) taosTmrStop(pNode->pRoleTimer);
for (int32_t i = 0; i < pNode->replica; ++i) { for (int32_t i = 0; i < pNode->replica; ++i) {
pPeer = pNode->peerInfo[i]; pPeer = pNode->peerInfo[i];
...@@ -471,10 +484,10 @@ static void syncFreeNode(void *param) { ...@@ -471,10 +484,10 @@ static void syncFreeNode(void *param) {
tfree(pNode); tfree(pNode);
} }
void syncAddPeerRef(SSyncPeer *pPeer) { atomic_add_fetch_8(&pPeer->refCount, 1); } void syncAddPeerRef(SSyncPeer *pPeer) { atomic_add_fetch_32(&pPeer->refCount, 1); }
int32_t syncDecPeerRef(SSyncPeer *pPeer) { int32_t syncDecPeerRef(SSyncPeer *pPeer) {
if (atomic_sub_fetch_8(&pPeer->refCount, 1) == 0) { if (atomic_sub_fetch_32(&pPeer->refCount, 1) == 0) {
taosReleaseRef(tsSyncRefId, pPeer->pSyncNode->rid); taosReleaseRef(tsSyncRefId, pPeer->pSyncNode->rid);
sDebug("%s, resource is freed", pPeer->id); sDebug("%s, resource is freed", pPeer->id);
...@@ -699,20 +712,20 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus* peersStatus, int8_t new ...@@ -699,20 +712,20 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus* peersStatus, int8_t new
int8_t syncRequired = 0; int8_t syncRequired = 0;
pPeer->role = newPeerRole; pPeer->role = newPeerRole;
sTrace("%s, peer role:%s change to %s", pPeer->id, syncRole[oldPeerRole], syncRole[newPeerRole]); sDebug("%s, peer role:%s change to %s", pPeer->id, syncRole[oldPeerRole], syncRole[newPeerRole]);
SSyncPeer *pMaster = syncCheckMaster(pNode); SSyncPeer *pMaster = syncCheckMaster(pNode);
if (pMaster) { if (pMaster) {
// master is there // master is there
pNode->pMaster = pMaster; pNode->pMaster = pMaster;
sTrace("%s, it is the master, sver:%" PRIu64, pMaster->id, pMaster->version); sDebug("%s, it is the master, sver:%" PRIu64, pMaster->id, pMaster->version);
if (syncValidateMaster(pPeer) < 0) return; if (syncValidateMaster(pPeer) < 0) return;
if (nodeRole == TAOS_SYNC_ROLE_UNSYNCED) { if (nodeRole == TAOS_SYNC_ROLE_UNSYNCED) {
if (nodeVersion < pMaster->version) { if (nodeVersion < pMaster->version) {
sTrace("%s, is master, sync required, self sver:%" PRIu64, pMaster->id, nodeVersion); sDebug("%s, is master, sync required, self sver:%" PRIu64, pMaster->id, nodeVersion);
syncRequired = 1; syncRequired = 1;
} else { } else {
sInfo("%s, is master, work as slave, self sver:%" PRIu64, pMaster->id, nodeVersion); sInfo("%s, is master, work as slave, self sver:%" PRIu64, pMaster->id, nodeVersion);
...@@ -720,7 +733,7 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus* peersStatus, int8_t new ...@@ -720,7 +733,7 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus* peersStatus, int8_t new
(*pNode->notifyRole)(pNode->ahandle, nodeRole); (*pNode->notifyRole)(pNode->ahandle, nodeRole);
} }
} else if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pMaster == pPeer) { } else if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pMaster == pPeer) {
sTrace("%s, is master, continue work as slave, self sver:%" PRIu64, pMaster->id, nodeVersion); sDebug("%s, is master, continue work as slave, self sver:%" PRIu64, pMaster->id, nodeVersion);
} }
} else { } else {
// master not there, if all peer's state and version are consistent, choose the master // master not there, if all peer's state and version are consistent, choose the master
...@@ -739,10 +752,10 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus* peersStatus, int8_t new ...@@ -739,10 +752,10 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus* peersStatus, int8_t new
} }
if (consistent) { if (consistent) {
sTrace("vgId:%d, choose master", pNode->vgId); sDebug("vgId:%d, choose master", pNode->vgId);
syncChooseMaster(pNode); syncChooseMaster(pNode);
} else { } else {
sTrace("vgId:%d, version inconsistent, cannot choose master", pNode->vgId); sDebug("vgId:%d, version inconsistent, cannot choose master", pNode->vgId);
} }
} }
...@@ -1221,8 +1234,26 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code ...@@ -1221,8 +1234,26 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code
} }
} }
/*
 * Periodic role-check timer callback.
 *
 * param carries the sync node's reference id (rid) cast to a pointer; tmrId is
 * not used. If the node can no longer be acquired from tsSyncRefId, the
 * callback returns without re-arming the timer. Otherwise every peer other
 * than this node is sent a SYNC_STATUS_CHECK_ROLE status message whenever the
 * peer's role or this node's own role is at or below TAOS_SYNC_ROLE_UNSYNCED.
 * Afterwards the role timer is re-armed with SYNC_ROLE_TIMER and the node
 * reference is released.
 */
static void syncMonitorNodeRole(void *param, void *tmrId) {
  int64_t nodeRid = (int64_t)param;

  // This local must stay named pNode: the nodeRole macro expands to an expression on pNode.
  SSyncNode *pNode = taosAcquireRef(tsSyncRefId, nodeRid);
  if (pNode == NULL) {
    return;
  }

  int32_t peerIdx = 0;
  while (peerIdx < pNode->replica) {
    // Skip the slot that describes this node itself.
    if (peerIdx != pNode->selfIndex) {
      SSyncPeer *pTarget = pNode->peerInfo[peerIdx];

      // Peer role is tested first and the own role only if needed, so the reads happen in the same order as before.
      int32_t bothAboveUnsynced =
          (pTarget->role > TAOS_SYNC_ROLE_UNSYNCED) && (nodeRole > TAOS_SYNC_ROLE_UNSYNCED);
      if (!bothAboveUnsynced) {
        syncSendPeersStatusMsgToPeer(pTarget, 1, SYNC_STATUS_CHECK_ROLE, syncGenTranId());
      }
    }
    peerIdx++;
  }

  // Re-arm the timer for the next role check before dropping the reference.
  pNode->pRoleTimer = taosTmrStart(syncMonitorNodeRole, SYNC_ROLE_TIMER, (void *)pNode->rid, tsSyncTmrCtrl);
  taosReleaseRef(tsSyncRefId, nodeRid);
}
static void syncMonitorFwdInfos(void *param, void *tmrId) { static void syncMonitorFwdInfos(void *param, void *tmrId) {
int64_t rid = (int64_t) param; int64_t rid = (int64_t)param;
SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
if (pNode == NULL) return; if (pNode == NULL) return;
...@@ -1246,7 +1277,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) { ...@@ -1246,7 +1277,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
pthread_mutex_unlock(&(pNode->mutex)); pthread_mutex_unlock(&(pNode->mutex));
} }
pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, (void *)pNode->rid, tsSyncTmrCtrl); pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, SYNC_FWD_TIMER, (void *)pNode->rid, tsSyncTmrCtrl);
} }
taosReleaseRef(tsSyncRefId, rid); taosReleaseRef(tsSyncRefId, rid);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册