提交 c9bcfe5f 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

first draft for sync flow control

上级 d3d80f59
......@@ -78,6 +78,9 @@ typedef void (*FConfirmForward)(void *ahandle, void *mhandle, int32_t code);
// when role is changed, call this to notify app
typedef void (*FNotifyRole)(void *ahandle, int8_t role);
// if a number of retrieving data failed, call this to start flow control
typedef void (*FNotifyFlowCtrl)(void *ahandle, int32_t mseconds);
// when data file is synced successfully, notity app
typedef int (*FNotifyFileSynced)(void *ahandle, uint64_t fversion);
......@@ -93,6 +96,7 @@ typedef struct {
FWriteToCache writeToCache;
FConfirmForward confirmForward;
FNotifyRole notifyRole;
FNotifyFlowCtrl notifyFlowCtrl;
FNotifyFileSynced notifyFileSynced;
} SSyncInfo;
......
......@@ -125,6 +125,8 @@ typedef struct SsyncPeer {
uint64_t sversion; // track the peer version in retrieve process
int syncFd;
int peerFd; // forward FD
int numOfRetrieves; // number of retrieves tried
int fileChanged; // a flag to indicate file is changed during retrieving process
void *timer;
void *pConn;
int notifyFd;
......@@ -152,6 +154,7 @@ typedef struct SSyncNode {
FWriteToCache writeToCache;
FConfirmForward confirmForward;
FNotifyRole notifyRole;
FNotifyFlowCtrl notifyFlowCtrl;
FNotifyFileSynced notifyFileSynced;
pthread_mutex_t mutex;
} SSyncNode;
......
......@@ -137,6 +137,7 @@ void *syncStart(const SSyncInfo *pInfo)
pNode->writeToCache = pInfo->writeToCache;
pNode->notifyRole = pInfo->notifyRole;
pNode->confirmForward = pInfo->confirmForward;
pNode->notifyFlowCtrl = pInfo->notifyFlowCtrl;
pNode->notifyFileSynced = pInfo->notifyFileSynced;
pNode->selfIndex = -1;
......@@ -530,6 +531,15 @@ void syncBroadcastStatus(SSyncNode *pNode)
}
}
static void syncResetFlowCtrl(SSyncNode *pNode) {
for (int i = 0; i < pNode->replica; ++i) {
pNode->peerInfo[i]->numOfRetrieves = 0;
}
(*pNode->notifyFlowCtrl)(pNode->ahandle, 0);
}
static void syncChooseMaster(SSyncNode *pNode) {
SSyncPeer *pPeer;
int onlineNum = 0;
......@@ -575,6 +585,7 @@ static void syncChooseMaster(SSyncNode *pNode) {
if (index == pNode->selfIndex) {
sInfo("vgId:%d, start to work as master", pNode->vgId);
nodeRole = TAOS_SYNC_ROLE_MASTER;
syncResetFlowCtrl(pNode);
(*pNode->notifyRole)(pNode->ahandle, nodeRole);
} else {
pPeer = pNode->peerInfo[index];
......@@ -706,6 +717,9 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne
if (peerOldRole != newRole || nodeRole != selfOldRole)
syncBroadcastStatus(pNode);
if (nodeRole != TAOS_SYNC_ROLE_MASTER)
syncResetFlowCtrl(pNode);
}
static void syncRestartPeer(SSyncPeer *pPeer) {
......
......@@ -83,6 +83,7 @@ static int syncAreFilesModified(SSyncPeer *pPeer)
int code = 0;
if (len >0) {
sDebug("%s, processed file is changed", pPeer->id);
pPeer->fileChanged = 1;
code = 1;
}
......@@ -454,9 +455,11 @@ static int syncRetrieveDataStepByStep(SSyncPeer *pPeer)
void *syncRetrieveData(void *param)
{
SSyncPeer *pPeer = (SSyncPeer *)param;
SSyncPeer *pPeer = (SSyncPeer *)param;
SSyncNode *pNode = pPeer->pSyncNode;
taosBlockSIGPIPE();
pPeer->fileChanged = 0;
pPeer->syncFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0);
if (pPeer->syncFd < 0) {
sError("%s, failed to open socket to sync", pPeer->id);
......@@ -471,6 +474,17 @@ void *syncRetrieveData(void *param)
}
}
if (pPeer->fileChanged) {
// if file is changed 3 times continuously, start flow control
pPeer->numOfRetrieves++;
if (pPeer->numOfRetrieves >= 3)
(*pNode->notifyFlowCtrl)(pNode->ahandle, pPeer->numOfRetrieves - 2);
} else {
pPeer->numOfRetrieves = 0;
(*pNode->notifyFlowCtrl)(pNode->ahandle, 0);
}
pPeer->fileChanged = 0;
tclose(pPeer->notifyFd);
tclose(pPeer->syncFd);
syncDecPeerRef(pPeer);
......
......@@ -37,6 +37,7 @@ extern int32_t vDebugFlag;
typedef struct {
int32_t vgId; // global vnode group ID
int32_t refCount; // reference count
int32_t delay;
int8_t status;
int8_t role;
int8_t accessState;
......
......@@ -44,6 +44,7 @@ static int vnodeProcessTsdbStatus(void *arg, int status);
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion);
static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index);
static void vnodeNotifyRole(void *ahandle, int8_t role);
static void vnodeCtrlFlow(void *handle, int32_t mseconds);
static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion);
int32_t vnodeInitResources() {
......@@ -277,6 +278,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
syncInfo.writeToCache = vnodeWriteToQueue;
syncInfo.confirmForward = dnodeSendRpcVnodeWriteRsp;
syncInfo.notifyRole = vnodeNotifyRole;
syncInfo.notifyFlowCtrl = vnodeCtrlFlow;
syncInfo.notifyFileSynced = vnodeNotifyFileSynced;
pVnode->sync = syncStart(&syncInfo);
......@@ -549,6 +551,13 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) {
cqStop(pVnode->cq);
}
static void vnodeCtrlFlow(void *ahandle, int32_t mseconds) {
SVnodeObj *pVnode = ahandle;
if (pVnode->delay != mseconds)
vInfo("vgId:%d, sync flow control, mseconds:%d", pVnode->vgId, mseconds);
pVnode->delay = mseconds;
}
static int vnodeResetTsdb(SVnodeObj *pVnode)
{
char rootDir[128] = "\0";
......
......@@ -78,6 +78,8 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
// assign version
pVnode->version++;
pHead->version = pVnode->version;
if (pVnode->delay) usleep(pVnode->delay*1000);
} else { // from wal or forward
// for data from WAL or forward, version may be smaller
if (pHead->version <= pVnode->version) return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册