diff --git a/src/inc/tsync.h b/src/inc/tsync.h index 967b254992f339ece749757584611bf061b9d7f4..10b2e3f66ad71f3b1fb9d0de39497b41f2bb1977 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -64,33 +64,32 @@ typedef struct { if name is provided(name[0] is not zero), get the named file at the specified index. If not there, return zero. If it is there, set the size to file size, and return file magic number. Index shall not be updated. */ -typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion); +typedef uint32_t (*FGetFileInfo)(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion); // get the wal file from index or after // return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file -typedef int32_t (*FGetWalInfo)(void *ahandle, char *fileName, int64_t *fileId); +typedef int32_t (*FGetWalInfo)(int32_t vgId, char *fileName, int64_t *fileId); // when a forward pkt is received, call this to handle data -typedef int32_t (*FWriteToCache)(void *ahandle, void *pHead, int32_t qtype, void *pMsg); +typedef int32_t (*FWriteToCache)(int32_t vgId, void *pHead, int32_t qtype, void *pMsg); // when forward is confirmed by peer, master call this API to notify app -typedef void (*FConfirmForward)(void *ahandle, void *mhandle, int32_t code); +typedef void (*FConfirmForward)(int32_t vgId, void *mhandle, int32_t code); // when role is changed, call this to notify app -typedef void (*FNotifyRole)(void *ahandle, int8_t role); +typedef void (*FNotifyRole)(int32_t vgId, int8_t role); // if a number of retrieving data failed, call this to start flow control -typedef void (*FNotifyFlowCtrl)(void *ahandle, int32_t mseconds); +typedef void (*FNotifyFlowCtrl)(int32_t vgId, int32_t mseconds); // when data file is synced successfully, notity app -typedef int32_t (*FNotifyFileSynced)(void *ahandle, uint64_t fversion); +typedef int32_t (*FNotifyFileSynced)(int32_t vgId, uint64_t fversion); typedef struct { int32_t vgId; // vgroup ID uint64_t version; // initial version SSyncCfg syncCfg; // configuration from mgmt char path[128]; // path to the file - void * ahandle; // handle provided by APP FGetFileInfo getFileInfo; FGetWalInfo getWalInfo; FWriteToCache writeToCache; diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 1728bb7e3bc6587bc28f1cefd1b4e3c56c5ff4d5..e5df1d732a5f655a737480675ca70c9d8a022e3a 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -107,7 +107,7 @@ static taos_queue tsSdbWQueue; static SSdbWorkerPool tsSdbPool; static int32_t sdbProcessWrite(void *pRow, void *pHead, int32_t qtype, void *unused); -static int32_t sdbWriteWalToQueue(void *vparam, void *pHead, int32_t qtype, void *rparam); +static int32_t sdbWriteWalToQueue(int32_t vgId, void *pHead, int32_t qtype, void *rparam); static int32_t sdbWriteRowToQueue(SSdbRow *pRow, int32_t action); static void sdbFreeFromQueue(SSdbRow *pRow); static void * sdbWorkerFp(void *pWorker); @@ -228,16 +228,16 @@ void sdbUpdateMnodeRoles() { mnodeUpdateMnodeEpSet(); } -static uint32_t sdbGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) { +static uint32_t sdbGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) { sdbUpdateMnodeRoles(); return 0; } -static int32_t sdbGetWalInfo(void *ahandle, char *fileName, int64_t *fileId) { +static int32_t sdbGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId) { return walGetWalFile(tsSdbMgmt.wal, fileName, fileId); } -static void sdbNotifyRole(void *ahandle, int8_t role) { +static void sdbNotifyRole(int32_t vgId, int8_t role) { sdbInfo("vgId:1, mnode role changed from %s to %s", syncRole[tsSdbMgmt.role], syncRole[role]); if (role == TAOS_SYNC_ROLE_MASTER && tsSdbMgmt.role != TAOS_SYNC_ROLE_MASTER) { @@ -264,7 +264,7 @@ static void sdbHandleFailedConfirm(SSdbRow *pRow) { } FORCE_INLINE -static void sdbConfirmForward(void *ahandle, void *wparam, int32_t code) { +static void sdbConfirmForward(int32_t vgId, void *wparam, int32_t code) { if (wparam == NULL) return; SSdbRow *pRow = wparam; SMnodeMsg * pMsg = pRow->pMsg; @@ -370,7 +370,6 @@ void sdbUpdateSync(void *pMnodes) { syncInfo.version = sdbGetVersion(); syncInfo.syncCfg = syncCfg; sprintf(syncInfo.path, "%s", tsMnodeDir); - syncInfo.ahandle = NULL; syncInfo.getWalInfo = sdbGetWalInfo; syncInfo.getFileInfo = sdbGetFileInfo; syncInfo.writeToCache = sdbWriteWalToQueue; @@ -967,7 +966,7 @@ static void sdbFreeFromQueue(SSdbRow *pRow) { taosFreeQitem(pRow); } -static int32_t sdbWriteWalToQueue(void *vparam, void *wparam, int32_t qtype, void *rparam) { +static int32_t sdbWriteWalToQueue(int32_t vgId, void *wparam, int32_t qtype, void *rparam) { SWalHead *pHead = wparam; int32_t size = sizeof(SSdbRow) + sizeof(SWalHead) + pHead->len; @@ -1039,7 +1038,7 @@ static void *sdbWorkerFp(void *pWorker) { taosGetQitem(tsSdbWQall, &qtype, (void **)&pRow); if (qtype == TAOS_QTYPE_RPC) { - sdbConfirmForward(NULL, pRow, pRow->code); + sdbConfirmForward(1, pRow, pRow->code); } else { if (qtype == TAOS_QTYPE_FWD) { syncConfirmForward(tsSdbMgmt.sync, pRow->pHead->version, pRow->code); diff --git a/src/sync/inc/syncInt.h b/src/sync/inc/syncInt.h index 7d846ebc80e6ccebdddfd6f4a642c8166de5b68c..f1371bdc59ba2b1e5fb52c11eac5d325d88373c0 100644 --- a/src/sync/inc/syncInt.h +++ b/src/sync/inc/syncInt.h @@ -153,7 +153,6 @@ typedef struct SSyncNode { int8_t selfIndex; uint32_t vgId; int64_t rid; - void *ahandle; SSyncPeer *peerInfo[TAOS_SYNC_MAX_REPLICA+1]; // extra one for arbitrator SSyncPeer *pMaster; SRecvBuffer *pRecv; diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 843de9461fd956f0602a0d0dce9aca7fb219d42f..de0fc8fadc7979b6864ebd68322406fec01402e7 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -181,7 +181,6 @@ int64_t syncStart(const SSyncInfo *pInfo) { tstrncpy(pNode->path, pInfo->path, sizeof(pNode->path)); pthread_mutex_init(&pNode->mutex, NULL); - pNode->ahandle = pInfo->ahandle; pNode->getFileInfo = pInfo->getFileInfo; pNode->getWalInfo = pInfo->getWalInfo; pNode->writeToCache = pInfo->writeToCache; @@ -255,7 +254,7 @@ int64_t syncStart(const SSyncInfo *pInfo) { taosHashPut(tsVgIdHash, (const char *)&pNode->vgId, sizeof(int32_t), (char *)(&pNode), sizeof(SSyncNode *)); if (pNode->notifyRole) { - (*pNode->notifyRole)(pNode->ahandle, nodeRole); + (*pNode->notifyRole)(pNode->vgId, nodeRole); } return pNode->rid; @@ -348,7 +347,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) { if (pNewCfg->replica <= 1) { sInfo("vgId:%d, no peers are configured, work as master!", pNode->vgId); nodeRole = TAOS_SYNC_ROLE_MASTER; - (*pNode->notifyRole)(pNode->ahandle, nodeRole); + (*pNode->notifyRole)(pNode->vgId, nodeRole); } pthread_mutex_unlock(&(pNode->mutex)); @@ -412,7 +411,7 @@ void syncRecover(int64_t rid) { // if take this node to unsync state, the whole system may not work nodeRole = TAOS_SYNC_ROLE_UNSYNCED; - (*pNode->notifyRole)(pNode->ahandle, nodeRole); + (*pNode->notifyRole)(pNode->vgId, nodeRole); nodeVersion = 0; pthread_mutex_lock(&(pNode->mutex)); @@ -568,7 +567,7 @@ static void syncResetFlowCtrl(SSyncNode *pNode) { } if (pNode->notifyFlowCtrl) { - (*pNode->notifyFlowCtrl)(pNode->ahandle, 0); + (*pNode->notifyFlowCtrl)(pNode->vgId, 0); } } @@ -631,7 +630,7 @@ static void syncChooseMaster(SSyncNode *pNode) { } #endif syncResetFlowCtrl(pNode); - (*pNode->notifyRole)(pNode->ahandle, nodeRole); + (*pNode->notifyRole)(pNode->vgId, nodeRole); } else { pPeer = pNode->peerInfo[index]; sInfo("%s, it shall work as master", pPeer->id); @@ -662,7 +661,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) { if (onlineNum <= replica * 0.5) { if (nodeRole != TAOS_SYNC_ROLE_UNSYNCED) { nodeRole = TAOS_SYNC_ROLE_UNSYNCED; - (*pNode->notifyRole)(pNode->ahandle, nodeRole); + (*pNode->notifyRole)(pNode->vgId, nodeRole); sInfo("vgId:%d, self change to unsynced state, online:%d replica:%d", pNode->vgId, onlineNum, replica); } } else { @@ -675,7 +674,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) { if (masterIndex == pNode->selfIndex) { sError("%s, peer is master, work as slave instead", pTemp->id); nodeRole = TAOS_SYNC_ROLE_SLAVE; - (*pNode->notifyRole)(pNode->ahandle, nodeRole); + (*pNode->notifyRole)(pNode->vgId, nodeRole); } } } @@ -692,7 +691,7 @@ static int32_t syncValidateMaster(SSyncPeer *pPeer) { if (nodeRole == TAOS_SYNC_ROLE_MASTER && nodeVersion < pPeer->version) { sDebug("%s, peer has higher sver:%" PRIu64 ", restart all peer connections", pPeer->id, pPeer->version); nodeRole = TAOS_SYNC_ROLE_UNSYNCED; - (*pNode->notifyRole)(pNode->ahandle, nodeRole); + (*pNode->notifyRole)(pNode->vgId, nodeRole); code = -1; for (int32_t index = 0; index < pNode->replica; ++index) { @@ -729,7 +728,7 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus* peersStatus, int8_t new } else { sInfo("%s, is master, work as slave, self sver:%" PRIu64, pMaster->id, nodeVersion); nodeRole = TAOS_SYNC_ROLE_SLAVE; - (*pNode->notifyRole)(pNode->ahandle, nodeRole); + (*pNode->notifyRole)(pNode->vgId, nodeRole); } } else if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pMaster == pPeer) { sDebug("%s, is master, continue work as slave, self sver:%" PRIu64, pMaster->id, nodeVersion); @@ -913,7 +912,7 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) { if (nodeRole == TAOS_SYNC_ROLE_SLAVE) { // nodeVersion = pHead->version; - (*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD, NULL); + (*pNode->writeToCache)(pNode->vgId, pHead, TAOS_QTYPE_FWD, NULL); } else { if (nodeSStatus != TAOS_SYNC_STATUS_INIT) { syncSaveIntoBuffer(pPeer, pHead); @@ -1228,7 +1227,7 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code if (confirm && pFwdInfo->confirmed == 0) { sTrace("vgId:%d, forward is confirmed, hver:%" PRIu64 " code:%x", pNode->vgId, pFwdInfo->version, pFwdInfo->code); - (*pNode->confirmForward)(pNode->ahandle, pFwdInfo->mhandle, pFwdInfo->code); + (*pNode->confirmForward)(pNode->vgId, pFwdInfo->mhandle, pFwdInfo->code); pFwdInfo->confirmed = 1; } } diff --git a/src/sync/src/syncRestore.c b/src/sync/src/syncRestore.c index cc2315fb15c6845a763a95928f5e66b13bbbca40..b31ec5f7c74b03258ef1a638c890b9886dee8def 100644 --- a/src/sync/src/syncRestore.c +++ b/src/sync/src/syncRestore.c @@ -38,7 +38,7 @@ static void syncRemoveExtraFile(SSyncPeer *pPeer, int32_t sindex, int32_t eindex while (1) { name[0] = 0; - magic = (*pNode->getFileInfo)(pNode->ahandle, name, &index, eindex, &size, &fversion); + magic = (*pNode->getFileInfo)(pNode->vgId, name, &index, eindex, &size, &fversion); if (magic == 0) break; snprintf(fname, sizeof(fname), "%s/%s", pNode->path, name); @@ -84,7 +84,7 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { // check the file info sinfo = minfo; sDebug("%s, get file info:%s", pPeer->id, minfo.name); - sinfo.magic = (*pNode->getFileInfo)(pNode->ahandle, sinfo.name, &sinfo.index, TAOS_SYNC_MAX_INDEX, &sinfo.size, + sinfo.magic = (*pNode->getFileInfo)(pNode->vgId, sinfo.name, &sinfo.index, TAOS_SYNC_MAX_INDEX, &sinfo.size, &sinfo.fversion); // if file not there or magic is not the same, file shall be synced @@ -164,7 +164,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) { } lastVer = pHead->version; - (*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_WAL, NULL); + (*pNode->writeToCache)(pNode->vgId, pHead, TAOS_QTYPE_WAL, NULL); } if (code < 0) { @@ -179,7 +179,7 @@ static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset) { SSyncNode *pNode = pPeer->pSyncNode; SWalHead * pHead = (SWalHead *)offset; - (*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD, NULL); + (*pNode->writeToCache)(pNode->vgId, pHead, TAOS_QTYPE_FWD, NULL); offset += pHead->len + sizeof(SWalHead); return offset; @@ -276,7 +276,7 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) { // if code > 0, data file is changed, notify app, and pass the version if (code > 0 && pNode->notifyFileSynced) { - if ((*pNode->notifyFileSynced)(pNode->ahandle, fversion) < 0) { + if ((*pNode->notifyFileSynced)(pNode->vgId, fversion) < 0) { sError("%s, app not in ready state", pPeer->id); return -1; } @@ -307,7 +307,7 @@ void *syncRestoreData(void *param) { taosBlockSIGPIPE(); __sync_fetch_and_add(&tsSyncNum, 1); - (*pNode->notifyRole)(pNode->ahandle, TAOS_SYNC_ROLE_SYNCING); + (*pNode->notifyRole)(pNode->vgId, TAOS_SYNC_ROLE_SYNCING); if (syncOpenRecvBuffer(pNode) < 0) { sError("%s, failed to allocate recv buffer, restart connection", pPeer->id); @@ -324,7 +324,7 @@ void *syncRestoreData(void *param) { } } - (*pNode->notifyRole)(pNode->ahandle, nodeRole); + (*pNode->notifyRole)(pNode->vgId, nodeRole); nodeSStatus = TAOS_SYNC_STATUS_INIT; taosClose(pPeer->syncFd); diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index 82f40854e8fc85632274f4566450963c03e40680..7c1286f419a2deb980dac5a79c9dd067295ff649 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -108,7 +108,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { while (1) { // retrieve file info fileInfo.name[0] = 0; - fileInfo.magic = (*pNode->getFileInfo)(pNode->ahandle, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX, + fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX, &fileInfo.size, &fileInfo.fversion); // fileInfo.size = htonl(size); @@ -354,7 +354,7 @@ static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) index++; wname[0] = 0; - code = (*pNode->getWalInfo)(pNode->ahandle, wname, &index); + code = (*pNode->getWalInfo)(pNode->vgId, wname, &index); if (code < 0) break; if (wname[0] == 0) { code = 0; @@ -382,7 +382,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { while (1) { // retrieve wal info wname[0] = 0; - code = (*pNode->getWalInfo)(pNode->ahandle, wname, &index); + code = (*pNode->getWalInfo)(pNode->vgId, wname, &index); if (code < 0) break; // error if (wname[0] == 0) { // no wal file sDebug("%s, no wal file", pPeer->id); @@ -487,10 +487,10 @@ void *syncRetrieveData(void *param) { // if file is changed 3 times continuously, start flow control pPeer->numOfRetrieves++; if (pPeer->numOfRetrieves >= 2 && pNode->notifyFlowCtrl) - (*pNode->notifyFlowCtrl)(pNode->ahandle, 4 << (pPeer->numOfRetrieves - 2)); + (*pNode->notifyFlowCtrl)(pNode->vgId, 4 << (pPeer->numOfRetrieves - 2)); } else { pPeer->numOfRetrieves = 0; - if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->ahandle, 0); + if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->vgId, 0); } pPeer->fileChanged = 0; diff --git a/src/sync/test/syncServer.c b/src/sync/test/syncServer.c index 9dd3feb4614105c13ba6a2d1069931345167b472..5a64a0a36a9c5e0557e78216ba41a48a23cc874b 100644 --- a/src/sync/test/syncServer.c +++ b/src/sync/test/syncServer.c @@ -70,7 +70,7 @@ int writeIntoWal(SWalHead *pHead) { return 0; } -void confirmForward(void *ahandle, void *mhandle, int32_t code) { +void confirmForward(int32_t vgId, void *mhandle, int32_t code) { SRpcMsg * pMsg = (SRpcMsg *)mhandle; SWalHead *pHead = (SWalHead *)(((char *)pMsg->pCont) - sizeof(SWalHead)); @@ -227,7 +227,7 @@ void processRequestMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { taosWriteQitem(qhandle, TAOS_QTYPE_RPC, pTemp); } -uint32_t getFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) { +uint32_t getFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) { uint32_t magic; struct stat fstat; char aname[280]; @@ -254,7 +254,7 @@ uint32_t getFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex return magic; } -int getWalInfo(void *ahandle, char *name, int64_t *index) { +int getWalInfo(int32_t vgId, char *name, int64_t *index) { struct stat fstat; char aname[280]; @@ -272,7 +272,7 @@ int getWalInfo(void *ahandle, char *name, int64_t *index) { return 1; } -int writeToCache(void *ahandle, void *data, int type) { +int writeToCache(int32_t vgId, void *data, int type) { SWalHead *pHead = data; uDebug("pkt from peer is received, ver:%" PRIu64 " len:%d type:%d", pHead->version, pHead->len, type); @@ -285,9 +285,9 @@ int writeToCache(void *ahandle, void *data, int type) { return 0; } -void confirmFwd(void *ahandle, int64_t version) { return; } +void confirmFwd(int32_t vgId, int64_t version) { return; } -void notifyRole(void *ahandle, int8_t r) { +void notifyRole(int32_t vgId, int8_t r) { role = r; printf("current role:%s\n", syncRole[role]); } @@ -296,7 +296,6 @@ void initSync() { pCfg->replica = 1; pCfg->quorum = 1; syncInfo.vgId = 1; - syncInfo.ahandle = &syncInfo; syncInfo.getFileInfo = getFileInfo; syncInfo.getWalInfo = getWalInfo; syncInfo.writeToCache = writeToCache; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 3f20efb150a8607a39fb7df11f387a46bc97f011..50744d59bc51326988b50e38ba1bd52997846f47 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -30,19 +30,21 @@ static SHashObj*tsVnodesHash; static void vnodeCleanUp(SVnodeObj *pVnode); -static int vnodeProcessTsdbStatus(void *arg, int status, int eno); -static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion); -static int vnodeGetWalInfo(void *ahandle, char *fileName, int64_t *fileId); -static void vnodeNotifyRole(void *ahandle, int8_t role); -static void vnodeCtrlFlow(void *handle, int32_t mseconds); -static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion); +static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno); +static uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion); +static int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId); +static void vnodeNotifyRole(int32_t vgId, int8_t role); +static void vnodeCtrlFlow(int32_t vgId, int32_t mseconds); +static int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion); +static void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code); +static int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam); #ifndef _SYNC int64_t syncStart(const SSyncInfo *info) { return NULL; } -int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int qtype) { return 0; } +int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int32_t qtype) { return 0; } void syncStop(int64_t rid) {} -int32_t syncReconfig(int64_t rid, const SSyncCfg * cfg) { return 0; } -int syncGetNodesRole(int64_t rid, SNodesRole * cfg) { return 0; } +int32_t syncReconfig(int64_t rid, const SSyncCfg *cfg) { return 0; } +int32_t syncGetNodesRole(int64_t rid, SNodesRole *cfg) { return 0; } void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {} #endif @@ -55,7 +57,7 @@ char* vnodeStatus[] = { }; int32_t vnodeInitResources() { - int code = syncInit(); + int32_t code = syncInit(); if (code != 0) return code; vnodeInitWriteFp(); @@ -325,16 +327,29 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { walRemoveAllOldFiles(pVnode->wal); walRenew(pVnode->wal); + pVnode->qMgmt = qOpenQueryMgmt(pVnode->vgId); + if (pVnode->qMgmt == NULL) { + vnodeCleanUp(pVnode); + return terrno; + } + + pVnode->events = NULL; + pVnode->status = TAOS_VN_STATUS_READY; + + vDebug("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode); + tsdbIncCommitRef(pVnode->vgId); + + taosHashPut(tsVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t), (char *)(&pVnode), sizeof(SVnodeObj *)); + SSyncInfo syncInfo; syncInfo.vgId = pVnode->vgId; syncInfo.version = pVnode->version; syncInfo.syncCfg = pVnode->syncCfg; sprintf(syncInfo.path, "%s", rootDir); - syncInfo.ahandle = pVnode; syncInfo.getWalInfo = vnodeGetWalInfo; syncInfo.getFileInfo = vnodeGetFileInfo; - syncInfo.writeToCache = vnodeWriteToWQueue; - syncInfo.confirmForward = dnodeSendRpcVWriteRsp; + syncInfo.writeToCache = vnodeWriteToCache; + syncInfo.confirmForward = vnodeConfirmForard; syncInfo.notifyRole = vnodeNotifyRole; syncInfo.notifyFlowCtrl = vnodeCtrlFlow; syncInfo.notifyFileSynced = vnodeNotifyFileSynced; @@ -346,23 +361,11 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { if (pVnode->sync <= 0) { vError("vgId:%d, failed to open sync module, replica:%d reason:%s", pVnode->vgId, pVnode->syncCfg.replica, tstrerror(terrno)); + vnodeRelease(pVnode); vnodeCleanUp(pVnode); return terrno; } -#endif - - pVnode->qMgmt = qOpenQueryMgmt(pVnode->vgId); - if (pVnode->qMgmt == NULL) { - vnodeCleanUp(pVnode); - return terrno; - } - - pVnode->events = NULL; - pVnode->status = TAOS_VN_STATUS_READY; - vDebug("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode); - - tsdbIncCommitRef(pVnode->vgId); - taosHashPut(tsVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t), (char *)(&pVnode), sizeof(SVnodeObj *)); +#endif return TSDB_CODE_SUCCESS; } @@ -389,7 +392,7 @@ void vnodeRelease(void *vparam) { assert(refCount >= 0); if (refCount > 0) { - if (pVnode->status == TAOS_VN_STATUS_RESET && refCount == 2) { + if (pVnode->status == TAOS_VN_STATUS_RESET && refCount <= 3) { tsem_post(&pVnode->sem); } return; @@ -571,7 +574,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { if (pVnode->status != TAOS_VN_STATUS_INIT) { // it may be in updateing or reset state, then it shall wait - int i = 0; + int32_t i = 0; while (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_CLOSING) != TAOS_VN_STATUS_READY) { if (++i % 1000 == 0) { @@ -595,7 +598,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { } // TODO: this is a simple implement -static int vnodeProcessTsdbStatus(void *arg, int status, int eno) { +static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) { SVnodeObj *pVnode = arg; if (eno != TSDB_CODE_SUCCESS) { @@ -625,20 +628,41 @@ static int vnodeProcessTsdbStatus(void *arg, int status, int eno) { return 0; } -static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, +static uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) { - SVnodeObj *pVnode = ahandle; + SVnodeObj *pVnode = vnodeAcquire(vgId); + if (pVnode == NULL) { + vError("vgId:%d, vnode not found while get file info", vgId); + return 0; + } + *fversion = pVnode->fversion; - return tsdbGetFileInfo(pVnode->tsdb, name, index, eindex, size); + uint32_t ret = tsdbGetFileInfo(pVnode->tsdb, name, index, eindex, size); + + vnodeRelease(pVnode); + return ret; } -static int vnodeGetWalInfo(void *ahandle, char *fileName, int64_t *fileId) { - SVnodeObj *pVnode = ahandle; - return walGetWalFile(pVnode->wal, fileName, fileId); +static int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId) { + SVnodeObj *pVnode = vnodeAcquire(vgId); + if (pVnode == NULL) { + vError("vgId:%d, vnode not found while get wal info", vgId); + return -1; + } + + int32_t code = walGetWalFile(pVnode->wal, fileName, fileId); + + vnodeRelease(pVnode); + return code; } -static void vnodeNotifyRole(void *ahandle, int8_t role) { - SVnodeObj *pVnode = ahandle; +static void vnodeNotifyRole(int32_t vgId, int8_t role) { + SVnodeObj *pVnode = vnodeAcquire(vgId); + if (pVnode == NULL) { + vError("vgId:%d, vnode not found while notify role", vgId); + return; + } + vInfo("vgId:%d, sync role changed from %s to %s", pVnode->vgId, syncRole[pVnode->role], syncRole[role]); pVnode->role = role; dnodeSendStatusMsgToMnode(); @@ -648,17 +672,26 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) { } else { cqStop(pVnode->cq); } + + vnodeRelease(pVnode); } -static void vnodeCtrlFlow(void *ahandle, int32_t mseconds) { - SVnodeObj *pVnode = ahandle; +static void vnodeCtrlFlow(int32_t vgId, int32_t mseconds) { + SVnodeObj *pVnode = vnodeAcquire(vgId); + if (pVnode == NULL) { + vError("vgId:%d, vnode not found while ctrl flow", vgId); + return; + } + if (pVnode->delayMs != mseconds) { pVnode->delayMs = mseconds; vDebug("vgId:%d, sync flow control, mseconds:%d", pVnode->vgId, mseconds); } + + vnodeRelease(pVnode); } -static int vnodeResetTsdb(SVnodeObj *pVnode) { +static int32_t vnodeResetTsdb(SVnodeObj *pVnode) { char rootDir[128] = "\0"; sprintf(rootDir, "%s/tsdb", pVnode->rootDir); @@ -672,7 +705,7 @@ static int vnodeResetTsdb(SVnodeObj *pVnode) { // acquire vnode int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); - if (refCount > 2) { + if (refCount > 3) { tsem_wait(&pVnode->sem); } @@ -692,14 +725,44 @@ static int vnodeResetTsdb(SVnodeObj *pVnode) { return 0; } -static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) { - SVnodeObj *pVnode = ahandle; +static int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion) { + SVnodeObj *pVnode = vnodeAcquire(vgId); + if (pVnode == NULL) { + vError("vgId:%d, vnode not found while notify file synced", vgId); + return 0; + } pVnode->fversion = fversion; pVnode->version = fversion; vnodeSaveVersion(pVnode); - vDebug("vgId:%d, data file is synced, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, - pVnode->version); - return vnodeResetTsdb(pVnode); + vDebug("vgId:%d, data file is synced, fver:%" PRIu64 " vver:%" PRIu64, vgId, fversion, fversion); + int32_t code = vnodeResetTsdb(pVnode); + + vnodeRelease(pVnode); + return code; +} + +void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code) { + void *pVnode = vnodeAcquire(vgId); + if (pVnode == NULL) { + vError("vgId:%d, vnode not found while confirm forward", vgId); + return; + } + + dnodeSendRpcVWriteRsp(pVnode, wparam, code); + vnodeRelease(pVnode); +} + +static int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam) { + SVnodeObj *pVnode = vnodeAcquire(vgId); + if (pVnode == NULL) { + vError("vgId:%d, vnode not found while write to cache", vgId); + return TSDB_CODE_VND_INVALID_VGROUP_ID; + } + + int32_t code = vnodeWriteToWQueue(pVnode, wparam, qtype, rparam); + + vnodeRelease(pVnode); + return code; }