提交 58bbdc32 编写于 作者: S Shengliang Guan

TD-2166 add refcount while perform sync

上级 a1d247af
...@@ -64,33 +64,32 @@ typedef struct { ...@@ -64,33 +64,32 @@ typedef struct {
if name is provided(name[0] is not zero), get the named file at the specified index. If not there, return if name is provided(name[0] is not zero), get the named file at the specified index. If not there, return
zero. If it is there, set the size to file size, and return file magic number. Index shall not be updated. zero. If it is there, set the size to file size, and return file magic number. Index shall not be updated.
*/ */
typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion); typedef uint32_t (*FGetFileInfo)(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion);
// get the wal file from index or after // get the wal file from index or after
// return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file // return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file
typedef int32_t (*FGetWalInfo)(void *ahandle, char *fileName, int64_t *fileId); typedef int32_t (*FGetWalInfo)(int32_t vgId, char *fileName, int64_t *fileId);
// when a forward pkt is received, call this to handle data // when a forward pkt is received, call this to handle data
typedef int32_t (*FWriteToCache)(void *ahandle, void *pHead, int32_t qtype, void *pMsg); typedef int32_t (*FWriteToCache)(int32_t vgId, void *pHead, int32_t qtype, void *pMsg);
// when forward is confirmed by peer, master call this API to notify app // when forward is confirmed by peer, master call this API to notify app
typedef void (*FConfirmForward)(void *ahandle, void *mhandle, int32_t code); typedef void (*FConfirmForward)(int32_t vgId, void *mhandle, int32_t code);
// when role is changed, call this to notify app // when role is changed, call this to notify app
typedef void (*FNotifyRole)(void *ahandle, int8_t role); typedef void (*FNotifyRole)(int32_t vgId, int8_t role);
// if a number of retrieving data failed, call this to start flow control // if a number of retrieving data failed, call this to start flow control
typedef void (*FNotifyFlowCtrl)(void *ahandle, int32_t mseconds); typedef void (*FNotifyFlowCtrl)(int32_t vgId, int32_t mseconds);
// when data file is synced successfully, notity app // when data file is synced successfully, notity app
typedef int32_t (*FNotifyFileSynced)(void *ahandle, uint64_t fversion); typedef int32_t (*FNotifyFileSynced)(int32_t vgId, uint64_t fversion);
typedef struct { typedef struct {
int32_t vgId; // vgroup ID int32_t vgId; // vgroup ID
uint64_t version; // initial version uint64_t version; // initial version
SSyncCfg syncCfg; // configuration from mgmt SSyncCfg syncCfg; // configuration from mgmt
char path[128]; // path to the file char path[128]; // path to the file
void * ahandle; // handle provided by APP
FGetFileInfo getFileInfo; FGetFileInfo getFileInfo;
FGetWalInfo getWalInfo; FGetWalInfo getWalInfo;
FWriteToCache writeToCache; FWriteToCache writeToCache;
......
...@@ -107,7 +107,7 @@ static taos_queue tsSdbWQueue; ...@@ -107,7 +107,7 @@ static taos_queue tsSdbWQueue;
static SSdbWorkerPool tsSdbPool; static SSdbWorkerPool tsSdbPool;
static int32_t sdbProcessWrite(void *pRow, void *pHead, int32_t qtype, void *unused); static int32_t sdbProcessWrite(void *pRow, void *pHead, int32_t qtype, void *unused);
static int32_t sdbWriteWalToQueue(void *vparam, void *pHead, int32_t qtype, void *rparam); static int32_t sdbWriteWalToQueue(int32_t vgId, void *pHead, int32_t qtype, void *rparam);
static int32_t sdbWriteRowToQueue(SSdbRow *pRow, int32_t action); static int32_t sdbWriteRowToQueue(SSdbRow *pRow, int32_t action);
static void sdbFreeFromQueue(SSdbRow *pRow); static void sdbFreeFromQueue(SSdbRow *pRow);
static void * sdbWorkerFp(void *pWorker); static void * sdbWorkerFp(void *pWorker);
...@@ -228,16 +228,16 @@ void sdbUpdateMnodeRoles() { ...@@ -228,16 +228,16 @@ void sdbUpdateMnodeRoles() {
mnodeUpdateMnodeEpSet(); mnodeUpdateMnodeEpSet();
} }
static uint32_t sdbGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) { static uint32_t sdbGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) {
sdbUpdateMnodeRoles(); sdbUpdateMnodeRoles();
return 0; return 0;
} }
static int32_t sdbGetWalInfo(void *ahandle, char *fileName, int64_t *fileId) { static int32_t sdbGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId) {
return walGetWalFile(tsSdbMgmt.wal, fileName, fileId); return walGetWalFile(tsSdbMgmt.wal, fileName, fileId);
} }
static void sdbNotifyRole(void *ahandle, int8_t role) { static void sdbNotifyRole(int32_t vgId, int8_t role) {
sdbInfo("vgId:1, mnode role changed from %s to %s", syncRole[tsSdbMgmt.role], syncRole[role]); sdbInfo("vgId:1, mnode role changed from %s to %s", syncRole[tsSdbMgmt.role], syncRole[role]);
if (role == TAOS_SYNC_ROLE_MASTER && tsSdbMgmt.role != TAOS_SYNC_ROLE_MASTER) { if (role == TAOS_SYNC_ROLE_MASTER && tsSdbMgmt.role != TAOS_SYNC_ROLE_MASTER) {
...@@ -264,7 +264,7 @@ static void sdbHandleFailedConfirm(SSdbRow *pRow) { ...@@ -264,7 +264,7 @@ static void sdbHandleFailedConfirm(SSdbRow *pRow) {
} }
FORCE_INLINE FORCE_INLINE
static void sdbConfirmForward(void *ahandle, void *wparam, int32_t code) { static void sdbConfirmForward(int32_t vgId, void *wparam, int32_t code) {
if (wparam == NULL) return; if (wparam == NULL) return;
SSdbRow *pRow = wparam; SSdbRow *pRow = wparam;
SMnodeMsg * pMsg = pRow->pMsg; SMnodeMsg * pMsg = pRow->pMsg;
...@@ -370,7 +370,6 @@ void sdbUpdateSync(void *pMnodes) { ...@@ -370,7 +370,6 @@ void sdbUpdateSync(void *pMnodes) {
syncInfo.version = sdbGetVersion(); syncInfo.version = sdbGetVersion();
syncInfo.syncCfg = syncCfg; syncInfo.syncCfg = syncCfg;
sprintf(syncInfo.path, "%s", tsMnodeDir); sprintf(syncInfo.path, "%s", tsMnodeDir);
syncInfo.ahandle = NULL;
syncInfo.getWalInfo = sdbGetWalInfo; syncInfo.getWalInfo = sdbGetWalInfo;
syncInfo.getFileInfo = sdbGetFileInfo; syncInfo.getFileInfo = sdbGetFileInfo;
syncInfo.writeToCache = sdbWriteWalToQueue; syncInfo.writeToCache = sdbWriteWalToQueue;
...@@ -967,7 +966,7 @@ static void sdbFreeFromQueue(SSdbRow *pRow) { ...@@ -967,7 +966,7 @@ static void sdbFreeFromQueue(SSdbRow *pRow) {
taosFreeQitem(pRow); taosFreeQitem(pRow);
} }
static int32_t sdbWriteWalToQueue(void *vparam, void *wparam, int32_t qtype, void *rparam) { static int32_t sdbWriteWalToQueue(int32_t vgId, void *wparam, int32_t qtype, void *rparam) {
SWalHead *pHead = wparam; SWalHead *pHead = wparam;
int32_t size = sizeof(SSdbRow) + sizeof(SWalHead) + pHead->len; int32_t size = sizeof(SSdbRow) + sizeof(SWalHead) + pHead->len;
...@@ -1039,7 +1038,7 @@ static void *sdbWorkerFp(void *pWorker) { ...@@ -1039,7 +1038,7 @@ static void *sdbWorkerFp(void *pWorker) {
taosGetQitem(tsSdbWQall, &qtype, (void **)&pRow); taosGetQitem(tsSdbWQall, &qtype, (void **)&pRow);
if (qtype == TAOS_QTYPE_RPC) { if (qtype == TAOS_QTYPE_RPC) {
sdbConfirmForward(NULL, pRow, pRow->code); sdbConfirmForward(1, pRow, pRow->code);
} else { } else {
if (qtype == TAOS_QTYPE_FWD) { if (qtype == TAOS_QTYPE_FWD) {
syncConfirmForward(tsSdbMgmt.sync, pRow->pHead->version, pRow->code); syncConfirmForward(tsSdbMgmt.sync, pRow->pHead->version, pRow->code);
......
...@@ -153,7 +153,6 @@ typedef struct SSyncNode { ...@@ -153,7 +153,6 @@ typedef struct SSyncNode {
int8_t selfIndex; int8_t selfIndex;
uint32_t vgId; uint32_t vgId;
int64_t rid; int64_t rid;
void *ahandle;
SSyncPeer *peerInfo[TAOS_SYNC_MAX_REPLICA+1]; // extra one for arbitrator SSyncPeer *peerInfo[TAOS_SYNC_MAX_REPLICA+1]; // extra one for arbitrator
SSyncPeer *pMaster; SSyncPeer *pMaster;
SRecvBuffer *pRecv; SRecvBuffer *pRecv;
......
...@@ -181,7 +181,6 @@ int64_t syncStart(const SSyncInfo *pInfo) { ...@@ -181,7 +181,6 @@ int64_t syncStart(const SSyncInfo *pInfo) {
tstrncpy(pNode->path, pInfo->path, sizeof(pNode->path)); tstrncpy(pNode->path, pInfo->path, sizeof(pNode->path));
pthread_mutex_init(&pNode->mutex, NULL); pthread_mutex_init(&pNode->mutex, NULL);
pNode->ahandle = pInfo->ahandle;
pNode->getFileInfo = pInfo->getFileInfo; pNode->getFileInfo = pInfo->getFileInfo;
pNode->getWalInfo = pInfo->getWalInfo; pNode->getWalInfo = pInfo->getWalInfo;
pNode->writeToCache = pInfo->writeToCache; pNode->writeToCache = pInfo->writeToCache;
...@@ -255,7 +254,7 @@ int64_t syncStart(const SSyncInfo *pInfo) { ...@@ -255,7 +254,7 @@ int64_t syncStart(const SSyncInfo *pInfo) {
taosHashPut(tsVgIdHash, (const char *)&pNode->vgId, sizeof(int32_t), (char *)(&pNode), sizeof(SSyncNode *)); taosHashPut(tsVgIdHash, (const char *)&pNode->vgId, sizeof(int32_t), (char *)(&pNode), sizeof(SSyncNode *));
if (pNode->notifyRole) { if (pNode->notifyRole) {
(*pNode->notifyRole)(pNode->ahandle, nodeRole); (*pNode->notifyRole)(pNode->vgId, nodeRole);
} }
return pNode->rid; return pNode->rid;
...@@ -348,7 +347,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) { ...@@ -348,7 +347,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) {
if (pNewCfg->replica <= 1) { if (pNewCfg->replica <= 1) {
sInfo("vgId:%d, no peers are configured, work as master!", pNode->vgId); sInfo("vgId:%d, no peers are configured, work as master!", pNode->vgId);
nodeRole = TAOS_SYNC_ROLE_MASTER; nodeRole = TAOS_SYNC_ROLE_MASTER;
(*pNode->notifyRole)(pNode->ahandle, nodeRole); (*pNode->notifyRole)(pNode->vgId, nodeRole);
} }
pthread_mutex_unlock(&(pNode->mutex)); pthread_mutex_unlock(&(pNode->mutex));
...@@ -412,7 +411,7 @@ void syncRecover(int64_t rid) { ...@@ -412,7 +411,7 @@ void syncRecover(int64_t rid) {
// if take this node to unsync state, the whole system may not work // if take this node to unsync state, the whole system may not work
nodeRole = TAOS_SYNC_ROLE_UNSYNCED; nodeRole = TAOS_SYNC_ROLE_UNSYNCED;
(*pNode->notifyRole)(pNode->ahandle, nodeRole); (*pNode->notifyRole)(pNode->vgId, nodeRole);
nodeVersion = 0; nodeVersion = 0;
pthread_mutex_lock(&(pNode->mutex)); pthread_mutex_lock(&(pNode->mutex));
...@@ -568,7 +567,7 @@ static void syncResetFlowCtrl(SSyncNode *pNode) { ...@@ -568,7 +567,7 @@ static void syncResetFlowCtrl(SSyncNode *pNode) {
} }
if (pNode->notifyFlowCtrl) { if (pNode->notifyFlowCtrl) {
(*pNode->notifyFlowCtrl)(pNode->ahandle, 0); (*pNode->notifyFlowCtrl)(pNode->vgId, 0);
} }
} }
...@@ -631,7 +630,7 @@ static void syncChooseMaster(SSyncNode *pNode) { ...@@ -631,7 +630,7 @@ static void syncChooseMaster(SSyncNode *pNode) {
} }
#endif #endif
syncResetFlowCtrl(pNode); syncResetFlowCtrl(pNode);
(*pNode->notifyRole)(pNode->ahandle, nodeRole); (*pNode->notifyRole)(pNode->vgId, nodeRole);
} else { } else {
pPeer = pNode->peerInfo[index]; pPeer = pNode->peerInfo[index];
sInfo("%s, it shall work as master", pPeer->id); sInfo("%s, it shall work as master", pPeer->id);
...@@ -662,7 +661,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) { ...@@ -662,7 +661,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) {
if (onlineNum <= replica * 0.5) { if (onlineNum <= replica * 0.5) {
if (nodeRole != TAOS_SYNC_ROLE_UNSYNCED) { if (nodeRole != TAOS_SYNC_ROLE_UNSYNCED) {
nodeRole = TAOS_SYNC_ROLE_UNSYNCED; nodeRole = TAOS_SYNC_ROLE_UNSYNCED;
(*pNode->notifyRole)(pNode->ahandle, nodeRole); (*pNode->notifyRole)(pNode->vgId, nodeRole);
sInfo("vgId:%d, self change to unsynced state, online:%d replica:%d", pNode->vgId, onlineNum, replica); sInfo("vgId:%d, self change to unsynced state, online:%d replica:%d", pNode->vgId, onlineNum, replica);
} }
} else { } else {
...@@ -675,7 +674,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) { ...@@ -675,7 +674,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) {
if (masterIndex == pNode->selfIndex) { if (masterIndex == pNode->selfIndex) {
sError("%s, peer is master, work as slave instead", pTemp->id); sError("%s, peer is master, work as slave instead", pTemp->id);
nodeRole = TAOS_SYNC_ROLE_SLAVE; nodeRole = TAOS_SYNC_ROLE_SLAVE;
(*pNode->notifyRole)(pNode->ahandle, nodeRole); (*pNode->notifyRole)(pNode->vgId, nodeRole);
} }
} }
} }
...@@ -692,7 +691,7 @@ static int32_t syncValidateMaster(SSyncPeer *pPeer) { ...@@ -692,7 +691,7 @@ static int32_t syncValidateMaster(SSyncPeer *pPeer) {
if (nodeRole == TAOS_SYNC_ROLE_MASTER && nodeVersion < pPeer->version) { if (nodeRole == TAOS_SYNC_ROLE_MASTER && nodeVersion < pPeer->version) {
sDebug("%s, peer has higher sver:%" PRIu64 ", restart all peer connections", pPeer->id, pPeer->version); sDebug("%s, peer has higher sver:%" PRIu64 ", restart all peer connections", pPeer->id, pPeer->version);
nodeRole = TAOS_SYNC_ROLE_UNSYNCED; nodeRole = TAOS_SYNC_ROLE_UNSYNCED;
(*pNode->notifyRole)(pNode->ahandle, nodeRole); (*pNode->notifyRole)(pNode->vgId, nodeRole);
code = -1; code = -1;
for (int32_t index = 0; index < pNode->replica; ++index) { for (int32_t index = 0; index < pNode->replica; ++index) {
...@@ -729,7 +728,7 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus* peersStatus, int8_t new ...@@ -729,7 +728,7 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus* peersStatus, int8_t new
} else { } else {
sInfo("%s, is master, work as slave, self sver:%" PRIu64, pMaster->id, nodeVersion); sInfo("%s, is master, work as slave, self sver:%" PRIu64, pMaster->id, nodeVersion);
nodeRole = TAOS_SYNC_ROLE_SLAVE; nodeRole = TAOS_SYNC_ROLE_SLAVE;
(*pNode->notifyRole)(pNode->ahandle, nodeRole); (*pNode->notifyRole)(pNode->vgId, nodeRole);
} }
} else if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pMaster == pPeer) { } else if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pMaster == pPeer) {
sDebug("%s, is master, continue work as slave, self sver:%" PRIu64, pMaster->id, nodeVersion); sDebug("%s, is master, continue work as slave, self sver:%" PRIu64, pMaster->id, nodeVersion);
...@@ -913,7 +912,7 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) { ...@@ -913,7 +912,7 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
if (nodeRole == TAOS_SYNC_ROLE_SLAVE) { if (nodeRole == TAOS_SYNC_ROLE_SLAVE) {
// nodeVersion = pHead->version; // nodeVersion = pHead->version;
(*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD, NULL); (*pNode->writeToCache)(pNode->vgId, pHead, TAOS_QTYPE_FWD, NULL);
} else { } else {
if (nodeSStatus != TAOS_SYNC_STATUS_INIT) { if (nodeSStatus != TAOS_SYNC_STATUS_INIT) {
syncSaveIntoBuffer(pPeer, pHead); syncSaveIntoBuffer(pPeer, pHead);
...@@ -1228,7 +1227,7 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code ...@@ -1228,7 +1227,7 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code
if (confirm && pFwdInfo->confirmed == 0) { if (confirm && pFwdInfo->confirmed == 0) {
sTrace("vgId:%d, forward is confirmed, hver:%" PRIu64 " code:%x", pNode->vgId, pFwdInfo->version, pFwdInfo->code); sTrace("vgId:%d, forward is confirmed, hver:%" PRIu64 " code:%x", pNode->vgId, pFwdInfo->version, pFwdInfo->code);
(*pNode->confirmForward)(pNode->ahandle, pFwdInfo->mhandle, pFwdInfo->code); (*pNode->confirmForward)(pNode->vgId, pFwdInfo->mhandle, pFwdInfo->code);
pFwdInfo->confirmed = 1; pFwdInfo->confirmed = 1;
} }
} }
......
...@@ -38,7 +38,7 @@ static void syncRemoveExtraFile(SSyncPeer *pPeer, int32_t sindex, int32_t eindex ...@@ -38,7 +38,7 @@ static void syncRemoveExtraFile(SSyncPeer *pPeer, int32_t sindex, int32_t eindex
while (1) { while (1) {
name[0] = 0; name[0] = 0;
magic = (*pNode->getFileInfo)(pNode->ahandle, name, &index, eindex, &size, &fversion); magic = (*pNode->getFileInfo)(pNode->vgId, name, &index, eindex, &size, &fversion);
if (magic == 0) break; if (magic == 0) break;
snprintf(fname, sizeof(fname), "%s/%s", pNode->path, name); snprintf(fname, sizeof(fname), "%s/%s", pNode->path, name);
...@@ -84,7 +84,7 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { ...@@ -84,7 +84,7 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
// check the file info // check the file info
sinfo = minfo; sinfo = minfo;
sDebug("%s, get file info:%s", pPeer->id, minfo.name); sDebug("%s, get file info:%s", pPeer->id, minfo.name);
sinfo.magic = (*pNode->getFileInfo)(pNode->ahandle, sinfo.name, &sinfo.index, TAOS_SYNC_MAX_INDEX, &sinfo.size, sinfo.magic = (*pNode->getFileInfo)(pNode->vgId, sinfo.name, &sinfo.index, TAOS_SYNC_MAX_INDEX, &sinfo.size,
&sinfo.fversion); &sinfo.fversion);
// if file not there or magic is not the same, file shall be synced // if file not there or magic is not the same, file shall be synced
...@@ -164,7 +164,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) { ...@@ -164,7 +164,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) {
} }
lastVer = pHead->version; lastVer = pHead->version;
(*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_WAL, NULL); (*pNode->writeToCache)(pNode->vgId, pHead, TAOS_QTYPE_WAL, NULL);
} }
if (code < 0) { if (code < 0) {
...@@ -179,7 +179,7 @@ static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset) { ...@@ -179,7 +179,7 @@ static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
SWalHead * pHead = (SWalHead *)offset; SWalHead * pHead = (SWalHead *)offset;
(*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD, NULL); (*pNode->writeToCache)(pNode->vgId, pHead, TAOS_QTYPE_FWD, NULL);
offset += pHead->len + sizeof(SWalHead); offset += pHead->len + sizeof(SWalHead);
return offset; return offset;
...@@ -276,7 +276,7 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) { ...@@ -276,7 +276,7 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
// if code > 0, data file is changed, notify app, and pass the version // if code > 0, data file is changed, notify app, and pass the version
if (code > 0 && pNode->notifyFileSynced) { if (code > 0 && pNode->notifyFileSynced) {
if ((*pNode->notifyFileSynced)(pNode->ahandle, fversion) < 0) { if ((*pNode->notifyFileSynced)(pNode->vgId, fversion) < 0) {
sError("%s, app not in ready state", pPeer->id); sError("%s, app not in ready state", pPeer->id);
return -1; return -1;
} }
...@@ -307,7 +307,7 @@ void *syncRestoreData(void *param) { ...@@ -307,7 +307,7 @@ void *syncRestoreData(void *param) {
taosBlockSIGPIPE(); taosBlockSIGPIPE();
__sync_fetch_and_add(&tsSyncNum, 1); __sync_fetch_and_add(&tsSyncNum, 1);
(*pNode->notifyRole)(pNode->ahandle, TAOS_SYNC_ROLE_SYNCING); (*pNode->notifyRole)(pNode->vgId, TAOS_SYNC_ROLE_SYNCING);
if (syncOpenRecvBuffer(pNode) < 0) { if (syncOpenRecvBuffer(pNode) < 0) {
sError("%s, failed to allocate recv buffer, restart connection", pPeer->id); sError("%s, failed to allocate recv buffer, restart connection", pPeer->id);
...@@ -324,7 +324,7 @@ void *syncRestoreData(void *param) { ...@@ -324,7 +324,7 @@ void *syncRestoreData(void *param) {
} }
} }
(*pNode->notifyRole)(pNode->ahandle, nodeRole); (*pNode->notifyRole)(pNode->vgId, nodeRole);
nodeSStatus = TAOS_SYNC_STATUS_INIT; nodeSStatus = TAOS_SYNC_STATUS_INIT;
taosClose(pPeer->syncFd); taosClose(pPeer->syncFd);
......
...@@ -108,7 +108,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { ...@@ -108,7 +108,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
while (1) { while (1) {
// retrieve file info // retrieve file info
fileInfo.name[0] = 0; fileInfo.name[0] = 0;
fileInfo.magic = (*pNode->getFileInfo)(pNode->ahandle, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX, fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX,
&fileInfo.size, &fileInfo.fversion); &fileInfo.size, &fileInfo.fversion);
// fileInfo.size = htonl(size); // fileInfo.size = htonl(size);
...@@ -354,7 +354,7 @@ static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) ...@@ -354,7 +354,7 @@ static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index)
index++; index++;
wname[0] = 0; wname[0] = 0;
code = (*pNode->getWalInfo)(pNode->ahandle, wname, &index); code = (*pNode->getWalInfo)(pNode->vgId, wname, &index);
if (code < 0) break; if (code < 0) break;
if (wname[0] == 0) { if (wname[0] == 0) {
code = 0; code = 0;
...@@ -382,7 +382,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { ...@@ -382,7 +382,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
while (1) { while (1) {
// retrieve wal info // retrieve wal info
wname[0] = 0; wname[0] = 0;
code = (*pNode->getWalInfo)(pNode->ahandle, wname, &index); code = (*pNode->getWalInfo)(pNode->vgId, wname, &index);
if (code < 0) break; // error if (code < 0) break; // error
if (wname[0] == 0) { // no wal file if (wname[0] == 0) { // no wal file
sDebug("%s, no wal file", pPeer->id); sDebug("%s, no wal file", pPeer->id);
...@@ -487,10 +487,10 @@ void *syncRetrieveData(void *param) { ...@@ -487,10 +487,10 @@ void *syncRetrieveData(void *param) {
// if file is changed 3 times continuously, start flow control // if file is changed 3 times continuously, start flow control
pPeer->numOfRetrieves++; pPeer->numOfRetrieves++;
if (pPeer->numOfRetrieves >= 2 && pNode->notifyFlowCtrl) if (pPeer->numOfRetrieves >= 2 && pNode->notifyFlowCtrl)
(*pNode->notifyFlowCtrl)(pNode->ahandle, 4 << (pPeer->numOfRetrieves - 2)); (*pNode->notifyFlowCtrl)(pNode->vgId, 4 << (pPeer->numOfRetrieves - 2));
} else { } else {
pPeer->numOfRetrieves = 0; pPeer->numOfRetrieves = 0;
if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->ahandle, 0); if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->vgId, 0);
} }
pPeer->fileChanged = 0; pPeer->fileChanged = 0;
......
...@@ -70,7 +70,7 @@ int writeIntoWal(SWalHead *pHead) { ...@@ -70,7 +70,7 @@ int writeIntoWal(SWalHead *pHead) {
return 0; return 0;
} }
void confirmForward(void *ahandle, void *mhandle, int32_t code) { void confirmForward(int32_t vgId, void *mhandle, int32_t code) {
SRpcMsg * pMsg = (SRpcMsg *)mhandle; SRpcMsg * pMsg = (SRpcMsg *)mhandle;
SWalHead *pHead = (SWalHead *)(((char *)pMsg->pCont) - sizeof(SWalHead)); SWalHead *pHead = (SWalHead *)(((char *)pMsg->pCont) - sizeof(SWalHead));
...@@ -227,7 +227,7 @@ void processRequestMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { ...@@ -227,7 +227,7 @@ void processRequestMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
taosWriteQitem(qhandle, TAOS_QTYPE_RPC, pTemp); taosWriteQitem(qhandle, TAOS_QTYPE_RPC, pTemp);
} }
uint32_t getFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) { uint32_t getFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) {
uint32_t magic; uint32_t magic;
struct stat fstat; struct stat fstat;
char aname[280]; char aname[280];
...@@ -254,7 +254,7 @@ uint32_t getFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex ...@@ -254,7 +254,7 @@ uint32_t getFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex
return magic; return magic;
} }
int getWalInfo(void *ahandle, char *name, int64_t *index) { int getWalInfo(int32_t vgId, char *name, int64_t *index) {
struct stat fstat; struct stat fstat;
char aname[280]; char aname[280];
...@@ -272,7 +272,7 @@ int getWalInfo(void *ahandle, char *name, int64_t *index) { ...@@ -272,7 +272,7 @@ int getWalInfo(void *ahandle, char *name, int64_t *index) {
return 1; return 1;
} }
int writeToCache(void *ahandle, void *data, int type) { int writeToCache(int32_t vgId, void *data, int type) {
SWalHead *pHead = data; SWalHead *pHead = data;
uDebug("pkt from peer is received, ver:%" PRIu64 " len:%d type:%d", pHead->version, pHead->len, type); uDebug("pkt from peer is received, ver:%" PRIu64 " len:%d type:%d", pHead->version, pHead->len, type);
...@@ -285,9 +285,9 @@ int writeToCache(void *ahandle, void *data, int type) { ...@@ -285,9 +285,9 @@ int writeToCache(void *ahandle, void *data, int type) {
return 0; return 0;
} }
void confirmFwd(void *ahandle, int64_t version) { return; } void confirmFwd(int32_t vgId, int64_t version) { return; }
void notifyRole(void *ahandle, int8_t r) { void notifyRole(int32_t vgId, int8_t r) {
role = r; role = r;
printf("current role:%s\n", syncRole[role]); printf("current role:%s\n", syncRole[role]);
} }
...@@ -296,7 +296,6 @@ void initSync() { ...@@ -296,7 +296,6 @@ void initSync() {
pCfg->replica = 1; pCfg->replica = 1;
pCfg->quorum = 1; pCfg->quorum = 1;
syncInfo.vgId = 1; syncInfo.vgId = 1;
syncInfo.ahandle = &syncInfo;
syncInfo.getFileInfo = getFileInfo; syncInfo.getFileInfo = getFileInfo;
syncInfo.getWalInfo = getWalInfo; syncInfo.getWalInfo = getWalInfo;
syncInfo.writeToCache = writeToCache; syncInfo.writeToCache = writeToCache;
......
...@@ -30,19 +30,21 @@ ...@@ -30,19 +30,21 @@
static SHashObj*tsVnodesHash; static SHashObj*tsVnodesHash;
static void vnodeCleanUp(SVnodeObj *pVnode); static void vnodeCleanUp(SVnodeObj *pVnode);
static int vnodeProcessTsdbStatus(void *arg, int status, int eno); static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno);
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion); static uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion);
static int vnodeGetWalInfo(void *ahandle, char *fileName, int64_t *fileId); static int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId);
static void vnodeNotifyRole(void *ahandle, int8_t role); static void vnodeNotifyRole(int32_t vgId, int8_t role);
static void vnodeCtrlFlow(void *handle, int32_t mseconds); static void vnodeCtrlFlow(int32_t vgId, int32_t mseconds);
static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion); static int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion);
static void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code);
static int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam);
#ifndef _SYNC #ifndef _SYNC
int64_t syncStart(const SSyncInfo *info) { return NULL; } int64_t syncStart(const SSyncInfo *info) { return NULL; }
int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int qtype) { return 0; } int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int32_t qtype) { return 0; }
void syncStop(int64_t rid) {} void syncStop(int64_t rid) {}
int32_t syncReconfig(int64_t rid, const SSyncCfg * cfg) { return 0; } int32_t syncReconfig(int64_t rid, const SSyncCfg *cfg) { return 0; }
int syncGetNodesRole(int64_t rid, SNodesRole * cfg) { return 0; } int32_t syncGetNodesRole(int64_t rid, SNodesRole *cfg) { return 0; }
void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {} void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {}
#endif #endif
...@@ -55,7 +57,7 @@ char* vnodeStatus[] = { ...@@ -55,7 +57,7 @@ char* vnodeStatus[] = {
}; };
int32_t vnodeInitResources() { int32_t vnodeInitResources() {
int code = syncInit(); int32_t code = syncInit();
if (code != 0) return code; if (code != 0) return code;
vnodeInitWriteFp(); vnodeInitWriteFp();
...@@ -325,16 +327,29 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -325,16 +327,29 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
walRemoveAllOldFiles(pVnode->wal); walRemoveAllOldFiles(pVnode->wal);
walRenew(pVnode->wal); walRenew(pVnode->wal);
pVnode->qMgmt = qOpenQueryMgmt(pVnode->vgId);
if (pVnode->qMgmt == NULL) {
vnodeCleanUp(pVnode);
return terrno;
}
pVnode->events = NULL;
pVnode->status = TAOS_VN_STATUS_READY;
vDebug("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode);
tsdbIncCommitRef(pVnode->vgId);
taosHashPut(tsVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t), (char *)(&pVnode), sizeof(SVnodeObj *));
SSyncInfo syncInfo; SSyncInfo syncInfo;
syncInfo.vgId = pVnode->vgId; syncInfo.vgId = pVnode->vgId;
syncInfo.version = pVnode->version; syncInfo.version = pVnode->version;
syncInfo.syncCfg = pVnode->syncCfg; syncInfo.syncCfg = pVnode->syncCfg;
sprintf(syncInfo.path, "%s", rootDir); sprintf(syncInfo.path, "%s", rootDir);
syncInfo.ahandle = pVnode;
syncInfo.getWalInfo = vnodeGetWalInfo; syncInfo.getWalInfo = vnodeGetWalInfo;
syncInfo.getFileInfo = vnodeGetFileInfo; syncInfo.getFileInfo = vnodeGetFileInfo;
syncInfo.writeToCache = vnodeWriteToWQueue; syncInfo.writeToCache = vnodeWriteToCache;
syncInfo.confirmForward = dnodeSendRpcVWriteRsp; syncInfo.confirmForward = vnodeConfirmForard;
syncInfo.notifyRole = vnodeNotifyRole; syncInfo.notifyRole = vnodeNotifyRole;
syncInfo.notifyFlowCtrl = vnodeCtrlFlow; syncInfo.notifyFlowCtrl = vnodeCtrlFlow;
syncInfo.notifyFileSynced = vnodeNotifyFileSynced; syncInfo.notifyFileSynced = vnodeNotifyFileSynced;
...@@ -346,24 +361,12 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -346,24 +361,12 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
if (pVnode->sync <= 0) { if (pVnode->sync <= 0) {
vError("vgId:%d, failed to open sync module, replica:%d reason:%s", pVnode->vgId, pVnode->syncCfg.replica, vError("vgId:%d, failed to open sync module, replica:%d reason:%s", pVnode->vgId, pVnode->syncCfg.replica,
tstrerror(terrno)); tstrerror(terrno));
vnodeRelease(pVnode);
vnodeCleanUp(pVnode); vnodeCleanUp(pVnode);
return terrno; return terrno;
} }
#endif #endif
pVnode->qMgmt = qOpenQueryMgmt(pVnode->vgId);
if (pVnode->qMgmt == NULL) {
vnodeCleanUp(pVnode);
return terrno;
}
pVnode->events = NULL;
pVnode->status = TAOS_VN_STATUS_READY;
vDebug("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode);
tsdbIncCommitRef(pVnode->vgId);
taosHashPut(tsVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t), (char *)(&pVnode), sizeof(SVnodeObj *));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -389,7 +392,7 @@ void vnodeRelease(void *vparam) { ...@@ -389,7 +392,7 @@ void vnodeRelease(void *vparam) {
assert(refCount >= 0); assert(refCount >= 0);
if (refCount > 0) { if (refCount > 0) {
if (pVnode->status == TAOS_VN_STATUS_RESET && refCount == 2) { if (pVnode->status == TAOS_VN_STATUS_RESET && refCount <= 3) {
tsem_post(&pVnode->sem); tsem_post(&pVnode->sem);
} }
return; return;
...@@ -571,7 +574,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { ...@@ -571,7 +574,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
if (pVnode->status != TAOS_VN_STATUS_INIT) { if (pVnode->status != TAOS_VN_STATUS_INIT) {
// it may be in updateing or reset state, then it shall wait // it may be in updateing or reset state, then it shall wait
int i = 0; int32_t i = 0;
while (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_CLOSING) != while (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_CLOSING) !=
TAOS_VN_STATUS_READY) { TAOS_VN_STATUS_READY) {
if (++i % 1000 == 0) { if (++i % 1000 == 0) {
...@@ -595,7 +598,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { ...@@ -595,7 +598,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
} }
// TODO: this is a simple implement // TODO: this is a simple implement
static int vnodeProcessTsdbStatus(void *arg, int status, int eno) { static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) {
SVnodeObj *pVnode = arg; SVnodeObj *pVnode = arg;
if (eno != TSDB_CODE_SUCCESS) { if (eno != TSDB_CODE_SUCCESS) {
...@@ -625,20 +628,41 @@ static int vnodeProcessTsdbStatus(void *arg, int status, int eno) { ...@@ -625,20 +628,41 @@ static int vnodeProcessTsdbStatus(void *arg, int status, int eno) {
return 0; return 0;
} }
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, static uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size,
uint64_t *fversion) { uint64_t *fversion) {
SVnodeObj *pVnode = ahandle; SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vError("vgId:%d, vnode not found while get file info", vgId);
return 0;
}
*fversion = pVnode->fversion; *fversion = pVnode->fversion;
return tsdbGetFileInfo(pVnode->tsdb, name, index, eindex, size); uint32_t ret = tsdbGetFileInfo(pVnode->tsdb, name, index, eindex, size);
vnodeRelease(pVnode);
return ret;
} }
static int vnodeGetWalInfo(void *ahandle, char *fileName, int64_t *fileId) { static int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId) {
SVnodeObj *pVnode = ahandle; SVnodeObj *pVnode = vnodeAcquire(vgId);
return walGetWalFile(pVnode->wal, fileName, fileId); if (pVnode == NULL) {
vError("vgId:%d, vnode not found while get wal info", vgId);
return -1;
}
int32_t code = walGetWalFile(pVnode->wal, fileName, fileId);
vnodeRelease(pVnode);
return code;
} }
static void vnodeNotifyRole(void *ahandle, int8_t role) { static void vnodeNotifyRole(int32_t vgId, int8_t role) {
SVnodeObj *pVnode = ahandle; SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vError("vgId:%d, vnode not found while notify role", vgId);
return;
}
vInfo("vgId:%d, sync role changed from %s to %s", pVnode->vgId, syncRole[pVnode->role], syncRole[role]); vInfo("vgId:%d, sync role changed from %s to %s", pVnode->vgId, syncRole[pVnode->role], syncRole[role]);
pVnode->role = role; pVnode->role = role;
dnodeSendStatusMsgToMnode(); dnodeSendStatusMsgToMnode();
...@@ -648,17 +672,26 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) { ...@@ -648,17 +672,26 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) {
} else { } else {
cqStop(pVnode->cq); cqStop(pVnode->cq);
} }
vnodeRelease(pVnode);
} }
static void vnodeCtrlFlow(void *ahandle, int32_t mseconds) { static void vnodeCtrlFlow(int32_t vgId, int32_t mseconds) {
SVnodeObj *pVnode = ahandle; SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vError("vgId:%d, vnode not found while ctrl flow", vgId);
return;
}
if (pVnode->delayMs != mseconds) { if (pVnode->delayMs != mseconds) {
pVnode->delayMs = mseconds; pVnode->delayMs = mseconds;
vDebug("vgId:%d, sync flow control, mseconds:%d", pVnode->vgId, mseconds); vDebug("vgId:%d, sync flow control, mseconds:%d", pVnode->vgId, mseconds);
} }
vnodeRelease(pVnode);
} }
static int vnodeResetTsdb(SVnodeObj *pVnode) { static int32_t vnodeResetTsdb(SVnodeObj *pVnode) {
char rootDir[128] = "\0"; char rootDir[128] = "\0";
sprintf(rootDir, "%s/tsdb", pVnode->rootDir); sprintf(rootDir, "%s/tsdb", pVnode->rootDir);
...@@ -672,7 +705,7 @@ static int vnodeResetTsdb(SVnodeObj *pVnode) { ...@@ -672,7 +705,7 @@ static int vnodeResetTsdb(SVnodeObj *pVnode) {
// acquire vnode // acquire vnode
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
if (refCount > 2) { if (refCount > 3) {
tsem_wait(&pVnode->sem); tsem_wait(&pVnode->sem);
} }
...@@ -692,14 +725,44 @@ static int vnodeResetTsdb(SVnodeObj *pVnode) { ...@@ -692,14 +725,44 @@ static int vnodeResetTsdb(SVnodeObj *pVnode) {
return 0; return 0;
} }
static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) { static int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion) {
SVnodeObj *pVnode = ahandle; SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vError("vgId:%d, vnode not found while notify file synced", vgId);
return 0;
}
pVnode->fversion = fversion; pVnode->fversion = fversion;
pVnode->version = fversion; pVnode->version = fversion;
vnodeSaveVersion(pVnode); vnodeSaveVersion(pVnode);
vDebug("vgId:%d, data file is synced, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, vDebug("vgId:%d, data file is synced, fver:%" PRIu64 " vver:%" PRIu64, vgId, fversion, fversion);
pVnode->version); int32_t code = vnodeResetTsdb(pVnode);
return vnodeResetTsdb(pVnode);
vnodeRelease(pVnode);
return code;
}
void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code) {
void *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vError("vgId:%d, vnode not found while confirm forward", vgId);
return;
}
dnodeSendRpcVWriteRsp(pVnode, wparam, code);
vnodeRelease(pVnode);
}
static int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vError("vgId:%d, vnode not found while write to cache", vgId);
return TSDB_CODE_VND_INVALID_VGROUP_ID;
}
int32_t code = vnodeWriteToWQueue(pVnode, wparam, qtype, rparam);
vnodeRelease(pVnode);
return code;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册