diff --git a/src/dnode/src/dnodeEps.c b/src/dnode/src/dnodeEps.c index 9c90c391813f57024ecf090136dfa86f5e5f91e6..83f294e05eb0e559638fef1259ad96ac6f162460 100644 --- a/src/dnode/src/dnodeEps.c +++ b/src/dnode/src/dnodeEps.c @@ -33,7 +33,7 @@ static void dnodePrintEps(SDnodeEps *eps); int32_t dnodeInitEps() { pthread_mutex_init(&tsEpsMutex, NULL); - tsEpsHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, true); + tsEpsHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); dnodeResetEps(NULL); int32_t ret = dnodeReadEps(); if (ret == 0) { diff --git a/src/inc/tsync.h b/src/inc/tsync.h index 967b254992f339ece749757584611bf061b9d7f4..77a3b36e73ec6a151e1f8b23e26673d3113dc012 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -64,33 +64,32 @@ typedef struct { if name is provided(name[0] is not zero), get the named file at the specified index. If not there, return zero. If it is there, set the size to file size, and return file magic number. Index shall not be updated. */ -typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion); +typedef uint32_t (*FGetFileInfo)(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion); // get the wal file from index or after // return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file -typedef int32_t (*FGetWalInfo)(void *ahandle, char *fileName, int64_t *fileId); +typedef int32_t (*FGetWalInfo)(int32_t vgId, char *fileName, int64_t *fileId); // when a forward pkt is received, call this to handle data -typedef int32_t (*FWriteToCache)(void *ahandle, void *pHead, int32_t qtype, void *pMsg); +typedef int32_t (*FWriteToCache)(int32_t vgId, void *pHead, int32_t qtype, void *pMsg); // when forward is confirmed by peer, master call this API to notify app -typedef void (*FConfirmForward)(void *ahandle, void *mhandle, int32_t code); +typedef void (*FConfirmForward)(int32_t vgId, void *mhandle, int32_t code); // when role is changed, call this to notify app -typedef void (*FNotifyRole)(void *ahandle, int8_t role); +typedef void (*FNotifyRole)(int32_t vgId, int8_t role); // if a number of retrieving data failed, call this to start flow control -typedef void (*FNotifyFlowCtrl)(void *ahandle, int32_t mseconds); +typedef void (*FNotifyFlowCtrl)(int32_t vgId, int32_t mseconds); // when data file is synced successfully, notity app -typedef int32_t (*FNotifyFileSynced)(void *ahandle, uint64_t fversion); +typedef int32_t (*FNotifyFileSynced)(int32_t vgId, uint64_t fversion); typedef struct { int32_t vgId; // vgroup ID uint64_t version; // initial version SSyncCfg syncCfg; // configuration from mgmt - char path[128]; // path to the file - void * ahandle; // handle provided by APP + char path[TSDB_FILENAME_LEN]; // path to the file FGetFileInfo getFileInfo; FGetWalInfo getWalInfo; FWriteToCache writeToCache; diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 1728bb7e3bc6587bc28f1cefd1b4e3c56c5ff4d5..26a881d4773911e3bf2166ec64645bc130203b0c 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -107,7 +107,7 @@ static taos_queue tsSdbWQueue; static SSdbWorkerPool tsSdbPool; static int32_t sdbProcessWrite(void *pRow, void *pHead, int32_t qtype, void *unused); -static int32_t sdbWriteWalToQueue(void *vparam, void *pHead, int32_t qtype, void *rparam); +static int32_t sdbWriteWalToQueue(int32_t vgId, void *pHead, int32_t qtype, void *rparam); static int32_t sdbWriteRowToQueue(SSdbRow *pRow, int32_t action); static void sdbFreeFromQueue(SSdbRow *pRow); static void * sdbWorkerFp(void *pWorker); @@ -228,16 +228,16 @@ void sdbUpdateMnodeRoles() { mnodeUpdateMnodeEpSet(); } -static uint32_t sdbGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) { +static uint32_t sdbGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) { sdbUpdateMnodeRoles(); return 0; } -static int32_t sdbGetWalInfo(void *ahandle, char *fileName, int64_t *fileId) { +static int32_t sdbGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId) { return walGetWalFile(tsSdbMgmt.wal, fileName, fileId); } -static void sdbNotifyRole(void *ahandle, int8_t role) { +static void sdbNotifyRole(int32_t vgId, int8_t role) { sdbInfo("vgId:1, mnode role changed from %s to %s", syncRole[tsSdbMgmt.role], syncRole[role]); if (role == TAOS_SYNC_ROLE_MASTER && tsSdbMgmt.role != TAOS_SYNC_ROLE_MASTER) { @@ -264,7 +264,7 @@ static void sdbHandleFailedConfirm(SSdbRow *pRow) { } FORCE_INLINE -static void sdbConfirmForward(void *ahandle, void *wparam, int32_t code) { +static void sdbConfirmForward(int32_t vgId, void *wparam, int32_t code) { if (wparam == NULL) return; SSdbRow *pRow = wparam; SMnodeMsg * pMsg = pRow->pMsg; @@ -370,7 +370,6 @@ void sdbUpdateSync(void *pMnodes) { syncInfo.version = sdbGetVersion(); syncInfo.syncCfg = syncCfg; sprintf(syncInfo.path, "%s", tsMnodeDir); - syncInfo.ahandle = NULL; syncInfo.getWalInfo = sdbGetWalInfo; syncInfo.getFileInfo = sdbGetFileInfo; syncInfo.writeToCache = sdbWriteWalToQueue; @@ -813,7 +812,7 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) { if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) { hashFp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); } - pTable->iHandle = taosHashInit(pTable->hashSessions, hashFp, true, true); + pTable->iHandle = taosHashInit(pTable->hashSessions, hashFp, true, HASH_ENTRY_LOCK); tsSdbMgmt.numOfTables++; tsSdbMgmt.tableList[pTable->id] = pTable; @@ -967,7 +966,7 @@ static void sdbFreeFromQueue(SSdbRow *pRow) { taosFreeQitem(pRow); } -static int32_t sdbWriteWalToQueue(void *vparam, void *wparam, int32_t qtype, void *rparam) { +static int32_t sdbWriteWalToQueue(int32_t vgId, void *wparam, int32_t qtype, void *rparam) { SWalHead *pHead = wparam; int32_t size = sizeof(SSdbRow) + sizeof(SWalHead) + pHead->len; @@ -1039,7 +1038,7 @@ static void *sdbWorkerFp(void *pWorker) { taosGetQitem(tsSdbWQall, &qtype, (void **)&pRow); if (qtype == TAOS_QTYPE_RPC) { - sdbConfirmForward(NULL, pRow, pRow->code); + sdbConfirmForward(1, pRow, pRow->code); } else { if (qtype == TAOS_QTYPE_FWD) { syncConfirmForward(tsSdbMgmt.sync, pRow->pHead->version, pRow->code); diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index 4d5f0808f55a3592b2ea9833f12df85ed252c9e2..d0a5402986fcc9d24d3c8dce4e07e87af9a2061a 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -394,7 +394,7 @@ static void mnodeAddTableIntoStable(SSTableObj *pStable, SCTableObj *pCtable) { atomic_add_fetch_32(&pStable->numOfTables, 1); if (pStable->vgHash == NULL) { - pStable->vgHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false); + pStable->vgHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); } if (pStable->vgHash != NULL) { @@ -413,7 +413,7 @@ static void mnodeRemoveTableFromStable(SSTableObj *pStable, SCTableObj *pCtable) SVgObj *pVgroup = mnodeGetVgroup(pCtable->vgId); if (pVgroup == NULL) { - taosHashRemove(pStable->vgHash, (char *)&pCtable->vgId, sizeof(pCtable->vgId)); + taosHashRemove(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId)); mDebug("table:%s, vgId:%d is remove from stable vgList, sizeOfVgList:%d", pStable->info.tableId, pCtable->vgId, (int32_t)taosHashGetSize(pStable->vgHash)); } diff --git a/src/sync/inc/syncInt.h b/src/sync/inc/syncInt.h index 7d846ebc80e6ccebdddfd6f4a642c8166de5b68c..079013499720407508fcbae60164ff08394fa03b 100644 --- a/src/sync/inc/syncInt.h +++ b/src/sync/inc/syncInt.h @@ -153,13 +153,12 @@ typedef struct SSyncNode { int8_t selfIndex; uint32_t vgId; int64_t rid; - void *ahandle; - SSyncPeer *peerInfo[TAOS_SYNC_MAX_REPLICA+1]; // extra one for arbitrator - SSyncPeer *pMaster; + SSyncPeer * peerInfo[TAOS_SYNC_MAX_REPLICA + 1]; // extra one for arbitrator + SSyncPeer * pMaster; SRecvBuffer *pRecv; - SSyncFwds *pSyncFwds; // saved forward info if quorum >1 - void *pFwdTimer; - void *pRoleTimer; + SSyncFwds * pSyncFwds; // saved forward info if quorum >1 + void * pFwdTimer; + void * pRoleTimer; FGetFileInfo getFileInfo; FGetWalInfo getWalInfo; FWriteToCache writeToCache; diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 843de9461fd956f0602a0d0dce9aca7fb219d42f..41d68b3149f8992289233913f59f274b9ee0c566 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -100,7 +100,7 @@ uint16_t syncGenTranId() { } int32_t syncInit() { - SPoolInfo info; + SPoolInfo info = {0}; info.numOfThreads = tsSyncTcpThreads; info.serverIp = 0; @@ -124,7 +124,7 @@ int32_t syncInit() { return -1; } - tsVgIdHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, true); + tsVgIdHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); if (tsVgIdHash == NULL) { sError("failed to init tsVgIdHash"); taosTmrCleanUp(tsSyncTmrCtrl); @@ -181,7 +181,6 @@ int64_t syncStart(const SSyncInfo *pInfo) { tstrncpy(pNode->path, pInfo->path, sizeof(pNode->path)); pthread_mutex_init(&pNode->mutex, NULL); - pNode->ahandle = pInfo->ahandle; pNode->getFileInfo = pInfo->getFileInfo; pNode->getWalInfo = pInfo->getWalInfo; pNode->writeToCache = pInfo->writeToCache; @@ -202,10 +201,10 @@ int64_t syncStart(const SSyncInfo *pInfo) { return -1; } - for (int32_t i = 0; i < pCfg->replica; ++i) { - const SNodeInfo *pNodeInfo = pCfg->nodeInfo + i; - pNode->peerInfo[i] = syncAddPeer(pNode, pNodeInfo); - if (pNode->peerInfo[i] == NULL) { + for (int32_t index = 0; index < pCfg->replica; ++index) { + const SNodeInfo *pNodeInfo = pCfg->nodeInfo + index; + pNode->peerInfo[index] = syncAddPeer(pNode, pNodeInfo); + if (pNode->peerInfo[index] == NULL) { sError("vgId:%d, node:%d fqdn:%s port:%u is not configured, stop taosd", pNode->vgId, pNodeInfo->nodeId, pNodeInfo->nodeFqdn, pNodeInfo->nodePort); syncStop(pNode->rid); @@ -213,7 +212,7 @@ int64_t syncStart(const SSyncInfo *pInfo) { } if ((strcmp(pNodeInfo->nodeFqdn, tsNodeFqdn) == 0) && (pNodeInfo->nodePort == tsSyncPort)) { - pNode->selfIndex = i; + pNode->selfIndex = index; } } @@ -252,10 +251,10 @@ int64_t syncStart(const SSyncInfo *pInfo) { } syncAddArbitrator(pNode); - taosHashPut(tsVgIdHash, (const char *)&pNode->vgId, sizeof(int32_t), (char *)(&pNode), sizeof(SSyncNode *)); + taosHashPut(tsVgIdHash, &pNode->vgId, sizeof(int32_t), &pNode, sizeof(SSyncNode *)); if (pNode->notifyRole) { - (*pNode->notifyRole)(pNode->ahandle, nodeRole); + (*pNode->notifyRole)(pNode->vgId, nodeRole); } return pNode->rid; @@ -269,14 +268,14 @@ void syncStop(int64_t rid) { sInfo("vgId:%d, cleanup sync", pNode->vgId); - pthread_mutex_lock(&(pNode->mutex)); + pthread_mutex_lock(&pNode->mutex); - if (tsVgIdHash) taosHashRemove(tsVgIdHash, (const char *)&pNode->vgId, sizeof(int32_t)); + if (tsVgIdHash) taosHashRemove(tsVgIdHash, &pNode->vgId, sizeof(int32_t)); if (pNode->pFwdTimer) taosTmrStop(pNode->pFwdTimer); if (pNode->pRoleTimer) taosTmrStop(pNode->pRoleTimer); - for (int32_t i = 0; i < pNode->replica; ++i) { - pPeer = pNode->peerInfo[i]; + for (int32_t index = 0; index < pNode->replica; ++index) { + pPeer = pNode->peerInfo[index]; if (pPeer) syncRemovePeer(pPeer); } @@ -298,7 +297,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) { sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole], pNewCfg->replica, pNode->replica); - pthread_mutex_lock(&(pNode->mutex)); + pthread_mutex_lock(&pNode->mutex); for (i = 0; i < pNode->replica; ++i) { for (j = 0; j < pNewCfg->replica; ++j) { @@ -348,7 +347,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) { if (pNewCfg->replica <= 1) { sInfo("vgId:%d, no peers are configured, work as master!", pNode->vgId); nodeRole = TAOS_SYNC_ROLE_MASTER; - (*pNode->notifyRole)(pNode->ahandle, nodeRole); + (*pNode->notifyRole)(pNode->vgId, nodeRole); } pthread_mutex_unlock(&(pNode->mutex)); @@ -412,10 +411,10 @@ void syncRecover(int64_t rid) { // if take this node to unsync state, the whole system may not work nodeRole = TAOS_SYNC_ROLE_UNSYNCED; - (*pNode->notifyRole)(pNode->ahandle, nodeRole); + (*pNode->notifyRole)(pNode->vgId, nodeRole); nodeVersion = 0; - pthread_mutex_lock(&(pNode->mutex)); + pthread_mutex_lock(&pNode->mutex); for (int32_t i = 0; i < pNode->replica; ++i) { pPeer = pNode->peerInfo[i]; @@ -568,7 +567,7 @@ static void syncResetFlowCtrl(SSyncNode *pNode) { } if (pNode->notifyFlowCtrl) { - (*pNode->notifyFlowCtrl)(pNode->ahandle, 0); + (*pNode->notifyFlowCtrl)(pNode->vgId, 0); } } @@ -631,7 +630,7 @@ static void syncChooseMaster(SSyncNode *pNode) { } #endif syncResetFlowCtrl(pNode); - (*pNode->notifyRole)(pNode->ahandle, nodeRole); + (*pNode->notifyRole)(pNode->vgId, nodeRole); } else { pPeer = pNode->peerInfo[index]; sInfo("%s, it shall work as master", pPeer->id); @@ -662,7 +661,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) { if (onlineNum <= replica * 0.5) { if (nodeRole != TAOS_SYNC_ROLE_UNSYNCED) { nodeRole = TAOS_SYNC_ROLE_UNSYNCED; - (*pNode->notifyRole)(pNode->ahandle, nodeRole); + (*pNode->notifyRole)(pNode->vgId, nodeRole); sInfo("vgId:%d, self change to unsynced state, online:%d replica:%d", pNode->vgId, onlineNum, replica); } } else { @@ -675,7 +674,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) { if (masterIndex == pNode->selfIndex) { sError("%s, peer is master, work as slave instead", pTemp->id); nodeRole = TAOS_SYNC_ROLE_SLAVE; - (*pNode->notifyRole)(pNode->ahandle, nodeRole); + (*pNode->notifyRole)(pNode->vgId, nodeRole); } } } @@ -692,7 +691,7 @@ static int32_t syncValidateMaster(SSyncPeer *pPeer) { if (nodeRole == TAOS_SYNC_ROLE_MASTER && nodeVersion < pPeer->version) { sDebug("%s, peer has higher sver:%" PRIu64 ", restart all peer connections", pPeer->id, pPeer->version); nodeRole = TAOS_SYNC_ROLE_UNSYNCED; - (*pNode->notifyRole)(pNode->ahandle, nodeRole); + (*pNode->notifyRole)(pNode->vgId, nodeRole); code = -1; for (int32_t index = 0; index < pNode->replica; ++index) { @@ -729,7 +728,7 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus* peersStatus, int8_t new } else { sInfo("%s, is master, work as slave, self sver:%" PRIu64, pMaster->id, nodeVersion); nodeRole = TAOS_SYNC_ROLE_SLAVE; - (*pNode->notifyRole)(pNode->ahandle, nodeRole); + (*pNode->notifyRole)(pNode->vgId, nodeRole); } } else if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pMaster == pPeer) { sDebug("%s, is master, continue work as slave, self sver:%" PRIu64, pMaster->id, nodeVersion); @@ -832,7 +831,7 @@ static void syncNotStarted(void *param, void *tmrId) { SSyncPeer *pPeer = param; SSyncNode *pNode = pPeer->pSyncNode; - pthread_mutex_lock(&(pNode->mutex)); + pthread_mutex_lock(&pNode->mutex); pPeer->timer = NULL; sInfo("%s, sync connection is still not up, restart", pPeer->id); syncRestartConnection(pPeer); @@ -843,7 +842,7 @@ static void syncTryRecoverFromMaster(void *param, void *tmrId) { SSyncPeer *pPeer = param; SSyncNode *pNode = pPeer->pSyncNode; - pthread_mutex_lock(&(pNode->mutex)); + pthread_mutex_lock(&pNode->mutex); syncRecoverFromMaster(pPeer); pthread_mutex_unlock(&(pNode->mutex)); } @@ -913,7 +912,7 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) { if (nodeRole == TAOS_SYNC_ROLE_SLAVE) { // nodeVersion = pHead->version; - (*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD, NULL); + (*pNode->writeToCache)(pNode->vgId, pHead, TAOS_QTYPE_FWD, NULL); } else { if (nodeSStatus != TAOS_SYNC_STATUS_INIT) { syncSaveIntoBuffer(pPeer, pHead); @@ -969,7 +968,7 @@ static int32_t syncProcessPeerMsg(void *param, void *buffer) { char * cont = buffer; SSyncNode *pNode = pPeer->pSyncNode; - pthread_mutex_lock(&(pNode->mutex)); + pthread_mutex_lock(&pNode->mutex); int32_t code = syncReadPeerMsg(pPeer, &head, cont); @@ -1067,7 +1066,7 @@ static void syncCheckPeerConnection(void *param, void *tmrId) { SSyncPeer *pPeer = param; SSyncNode *pNode = pPeer->pSyncNode; - pthread_mutex_lock(&(pNode->mutex)); + pthread_mutex_lock(&pNode->mutex); sDebug("%s, check peer connection", pPeer->id); syncSetupPeerConnection(pPeer); @@ -1119,7 +1118,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { } SSyncNode *pNode = *ppNode; - pthread_mutex_lock(&(pNode->mutex)); + pthread_mutex_lock(&pNode->mutex); SSyncPeer *pPeer; for (i = 0; i < pNode->replica; ++i) { @@ -1157,7 +1156,7 @@ static void syncProcessBrokenLink(void *param) { SSyncNode *pNode = pPeer->pSyncNode; if (taosAcquireRef(tsSyncRefId, pNode->rid) == NULL) return; - pthread_mutex_lock(&(pNode->mutex)); + pthread_mutex_lock(&pNode->mutex); sDebug("%s, TCP link is broken since %s", pPeer->id, strerror(errno)); pPeer->peerFd = -1; @@ -1228,7 +1227,7 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code if (confirm && pFwdInfo->confirmed == 0) { sTrace("vgId:%d, forward is confirmed, hver:%" PRIu64 " code:%x", pNode->vgId, pFwdInfo->version, pFwdInfo->code); - (*pNode->confirmForward)(pNode->ahandle, pFwdInfo->mhandle, pFwdInfo->code); + (*pNode->confirmForward)(pNode->vgId, pFwdInfo->mhandle, pFwdInfo->code); pFwdInfo->confirmed = 1; } } @@ -1263,7 +1262,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) { int64_t time = taosGetTimestampMs(); if (pSyncFwds->fwds > 0) { - pthread_mutex_lock(&(pNode->mutex)); + pthread_mutex_lock(&pNode->mutex); for (int32_t i = 0; i < pSyncFwds->fwds; ++i) { SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % tsMaxFwdInfo; if (ABS(time - pFwdInfo->time) < 2000) break; @@ -1321,7 +1320,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle pSyncHead->len = sizeof(SWalHead) + pWalHead->len; fwdLen = pSyncHead->len + sizeof(SSyncHead); // include the WAL and SYNC head - pthread_mutex_lock(&(pNode->mutex)); + pthread_mutex_lock(&pNode->mutex); for (int32_t i = 0; i < pNode->replica; ++i) { pPeer = pNode->peerInfo[i]; diff --git a/src/sync/src/syncRestore.c b/src/sync/src/syncRestore.c index cc2315fb15c6845a763a95928f5e66b13bbbca40..b31ec5f7c74b03258ef1a638c890b9886dee8def 100644 --- a/src/sync/src/syncRestore.c +++ b/src/sync/src/syncRestore.c @@ -38,7 +38,7 @@ static void syncRemoveExtraFile(SSyncPeer *pPeer, int32_t sindex, int32_t eindex while (1) { name[0] = 0; - magic = (*pNode->getFileInfo)(pNode->ahandle, name, &index, eindex, &size, &fversion); + magic = (*pNode->getFileInfo)(pNode->vgId, name, &index, eindex, &size, &fversion); if (magic == 0) break; snprintf(fname, sizeof(fname), "%s/%s", pNode->path, name); @@ -84,7 +84,7 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { // check the file info sinfo = minfo; sDebug("%s, get file info:%s", pPeer->id, minfo.name); - sinfo.magic = (*pNode->getFileInfo)(pNode->ahandle, sinfo.name, &sinfo.index, TAOS_SYNC_MAX_INDEX, &sinfo.size, + sinfo.magic = (*pNode->getFileInfo)(pNode->vgId, sinfo.name, &sinfo.index, TAOS_SYNC_MAX_INDEX, &sinfo.size, &sinfo.fversion); // if file not there or magic is not the same, file shall be synced @@ -164,7 +164,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) { } lastVer = pHead->version; - (*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_WAL, NULL); + (*pNode->writeToCache)(pNode->vgId, pHead, TAOS_QTYPE_WAL, NULL); } if (code < 0) { @@ -179,7 +179,7 @@ static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset) { SSyncNode *pNode = pPeer->pSyncNode; SWalHead * pHead = (SWalHead *)offset; - (*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD, NULL); + (*pNode->writeToCache)(pNode->vgId, pHead, TAOS_QTYPE_FWD, NULL); offset += pHead->len + sizeof(SWalHead); return offset; @@ -276,7 +276,7 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) { // if code > 0, data file is changed, notify app, and pass the version if (code > 0 && pNode->notifyFileSynced) { - if ((*pNode->notifyFileSynced)(pNode->ahandle, fversion) < 0) { + if ((*pNode->notifyFileSynced)(pNode->vgId, fversion) < 0) { sError("%s, app not in ready state", pPeer->id); return -1; } @@ -307,7 +307,7 @@ void *syncRestoreData(void *param) { taosBlockSIGPIPE(); __sync_fetch_and_add(&tsSyncNum, 1); - (*pNode->notifyRole)(pNode->ahandle, TAOS_SYNC_ROLE_SYNCING); + (*pNode->notifyRole)(pNode->vgId, TAOS_SYNC_ROLE_SYNCING); if (syncOpenRecvBuffer(pNode) < 0) { sError("%s, failed to allocate recv buffer, restart connection", pPeer->id); @@ -324,7 +324,7 @@ void *syncRestoreData(void *param) { } } - (*pNode->notifyRole)(pNode->ahandle, nodeRole); + (*pNode->notifyRole)(pNode->vgId, nodeRole); nodeSStatus = TAOS_SYNC_STATUS_INIT; taosClose(pPeer->syncFd); diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index 82f40854e8fc85632274f4566450963c03e40680..7c1286f419a2deb980dac5a79c9dd067295ff649 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -108,7 +108,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { while (1) { // retrieve file info fileInfo.name[0] = 0; - fileInfo.magic = (*pNode->getFileInfo)(pNode->ahandle, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX, + fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX, &fileInfo.size, &fileInfo.fversion); // fileInfo.size = htonl(size); @@ -354,7 +354,7 @@ static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) index++; wname[0] = 0; - code = (*pNode->getWalInfo)(pNode->ahandle, wname, &index); + code = (*pNode->getWalInfo)(pNode->vgId, wname, &index); if (code < 0) break; if (wname[0] == 0) { code = 0; @@ -382,7 +382,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { while (1) { // retrieve wal info wname[0] = 0; - code = (*pNode->getWalInfo)(pNode->ahandle, wname, &index); + code = (*pNode->getWalInfo)(pNode->vgId, wname, &index); if (code < 0) break; // error if (wname[0] == 0) { // no wal file sDebug("%s, no wal file", pPeer->id); @@ -487,10 +487,10 @@ void *syncRetrieveData(void *param) { // if file is changed 3 times continuously, start flow control pPeer->numOfRetrieves++; if (pPeer->numOfRetrieves >= 2 && pNode->notifyFlowCtrl) - (*pNode->notifyFlowCtrl)(pNode->ahandle, 4 << (pPeer->numOfRetrieves - 2)); + (*pNode->notifyFlowCtrl)(pNode->vgId, 4 << (pPeer->numOfRetrieves - 2)); } else { pPeer->numOfRetrieves = 0; - if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->ahandle, 0); + if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->vgId, 0); } pPeer->fileChanged = 0; diff --git a/src/sync/test/syncServer.c b/src/sync/test/syncServer.c index 9dd3feb4614105c13ba6a2d1069931345167b472..5a64a0a36a9c5e0557e78216ba41a48a23cc874b 100644 --- a/src/sync/test/syncServer.c +++ b/src/sync/test/syncServer.c @@ -70,7 +70,7 @@ int writeIntoWal(SWalHead *pHead) { return 0; } -void confirmForward(void *ahandle, void *mhandle, int32_t code) { +void confirmForward(int32_t vgId, void *mhandle, int32_t code) { SRpcMsg * pMsg = (SRpcMsg *)mhandle; SWalHead *pHead = (SWalHead *)(((char *)pMsg->pCont) - sizeof(SWalHead)); @@ -227,7 +227,7 @@ void processRequestMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { taosWriteQitem(qhandle, TAOS_QTYPE_RPC, pTemp); } -uint32_t getFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) { +uint32_t getFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) { uint32_t magic; struct stat fstat; char aname[280]; @@ -254,7 +254,7 @@ uint32_t getFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex return magic; } -int getWalInfo(void *ahandle, char *name, int64_t *index) { +int getWalInfo(int32_t vgId, char *name, int64_t *index) { struct stat fstat; char aname[280]; @@ -272,7 +272,7 @@ int getWalInfo(void *ahandle, char *name, int64_t *index) { return 1; } -int writeToCache(void *ahandle, void *data, int type) { +int writeToCache(int32_t vgId, void *data, int type) { SWalHead *pHead = data; uDebug("pkt from peer is received, ver:%" PRIu64 " len:%d type:%d", pHead->version, pHead->len, type); @@ -285,9 +285,9 @@ int writeToCache(void *ahandle, void *data, int type) { return 0; } -void confirmFwd(void *ahandle, int64_t version) { return; } +void confirmFwd(int32_t vgId, int64_t version) { return; } -void notifyRole(void *ahandle, int8_t r) { +void notifyRole(int32_t vgId, int8_t r) { role = r; printf("current role:%s\n", syncRole[role]); } @@ -296,7 +296,6 @@ void initSync() { pCfg->replica = 1; pCfg->quorum = 1; syncInfo.vgId = 1; - syncInfo.ahandle = &syncInfo; syncInfo.getFileInfo = getFileInfo; syncInfo.getWalInfo = getWalInfo; syncInfo.writeToCache = writeToCache; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 3f20efb150a8607a39fb7df11f387a46bc97f011..b94fea52bd4f8fbdd20677bf874684123f03a9d4 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -30,19 +30,21 @@ static SHashObj*tsVnodesHash; static void vnodeCleanUp(SVnodeObj *pVnode); -static int vnodeProcessTsdbStatus(void *arg, int status, int eno); -static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion); -static int vnodeGetWalInfo(void *ahandle, char *fileName, int64_t *fileId); -static void vnodeNotifyRole(void *ahandle, int8_t role); -static void vnodeCtrlFlow(void *handle, int32_t mseconds); -static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion); +static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno); +static uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion); +static int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId); +static void vnodeNotifyRole(int32_t vgId, int8_t role); +static void vnodeCtrlFlow(int32_t vgId, int32_t mseconds); +static int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion); +static void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code); +static int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam); #ifndef _SYNC int64_t syncStart(const SSyncInfo *info) { return NULL; } -int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int qtype) { return 0; } +int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int32_t qtype) { return 0; } void syncStop(int64_t rid) {} -int32_t syncReconfig(int64_t rid, const SSyncCfg * cfg) { return 0; } -int syncGetNodesRole(int64_t rid, SNodesRole * cfg) { return 0; } +int32_t syncReconfig(int64_t rid, const SSyncCfg *cfg) { return 0; } +int32_t syncGetNodesRole(int64_t rid, SNodesRole *cfg) { return 0; } void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {} #endif @@ -55,13 +57,13 @@ char* vnodeStatus[] = { }; int32_t vnodeInitResources() { - int code = syncInit(); + int32_t code = syncInit(); if (code != 0) return code; vnodeInitWriteFp(); vnodeInitReadFp(); - tsVnodesHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, true); + tsVnodesHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); if (tsVnodesHash == NULL) { vError("failed to init vnode list"); return TSDB_CODE_VND_OUT_OF_MEMORY; @@ -173,8 +175,8 @@ int32_t vnodeDrop(int32_t vgId) { return TSDB_CODE_SUCCESS; } -int32_t vnodeAlter(void *param, SCreateVnodeMsg *pVnodeCfg) { - SVnodeObj *pVnode = param; +int32_t vnodeAlter(void *vparam, SCreateVnodeMsg *pVnodeCfg) { + SVnodeObj *pVnode = vparam; // vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS // cfgVersion can be corrected by status msg @@ -325,16 +327,28 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { walRemoveAllOldFiles(pVnode->wal); walRenew(pVnode->wal); + pVnode->qMgmt = qOpenQueryMgmt(pVnode->vgId); + if (pVnode->qMgmt == NULL) { + vnodeCleanUp(pVnode); + return terrno; + } + + pVnode->events = NULL; + + vDebug("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode); + tsdbIncCommitRef(pVnode->vgId); + + taosHashPut(tsVnodesHash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *)); + SSyncInfo syncInfo; syncInfo.vgId = pVnode->vgId; syncInfo.version = pVnode->version; syncInfo.syncCfg = pVnode->syncCfg; sprintf(syncInfo.path, "%s", rootDir); - syncInfo.ahandle = pVnode; syncInfo.getWalInfo = vnodeGetWalInfo; syncInfo.getFileInfo = vnodeGetFileInfo; - syncInfo.writeToCache = vnodeWriteToWQueue; - syncInfo.confirmForward = dnodeSendRpcVWriteRsp; + syncInfo.writeToCache = vnodeWriteToCache; + syncInfo.confirmForward = vnodeConfirmForard; syncInfo.notifyRole = vnodeNotifyRole; syncInfo.notifyFlowCtrl = vnodeCtrlFlow; syncInfo.notifyFileSynced = vnodeNotifyFileSynced; @@ -346,24 +360,13 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { if (pVnode->sync <= 0) { vError("vgId:%d, failed to open sync module, replica:%d reason:%s", pVnode->vgId, pVnode->syncCfg.replica, tstrerror(terrno)); + vnodeRelease(pVnode); vnodeCleanUp(pVnode); return terrno; } -#endif - - pVnode->qMgmt = qOpenQueryMgmt(pVnode->vgId); - if (pVnode->qMgmt == NULL) { - vnodeCleanUp(pVnode); - return terrno; - } +#endif - pVnode->events = NULL; pVnode->status = TAOS_VN_STATUS_READY; - vDebug("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode); - - tsdbIncCommitRef(pVnode->vgId); - taosHashPut(tsVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t), (char *)(&pVnode), sizeof(SVnodeObj *)); - return TSDB_CODE_SUCCESS; } @@ -389,7 +392,7 @@ void vnodeRelease(void *vparam) { assert(refCount >= 0); if (refCount > 0) { - if (pVnode->status == TAOS_VN_STATUS_RESET && refCount == 2) { + if (pVnode->status == TAOS_VN_STATUS_RESET && refCount <= 3) { tsem_post(&pVnode->sem); } return; @@ -567,11 +570,11 @@ void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes) { static void vnodeCleanUp(SVnodeObj *pVnode) { // remove from hash, so new messages wont be consumed - taosHashRemove(tsVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t)); + taosHashRemove(tsVnodesHash, &pVnode->vgId, sizeof(int32_t)); if (pVnode->status != TAOS_VN_STATUS_INIT) { // it may be in updateing or reset state, then it shall wait - int i = 0; + int32_t i = 0; while (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_CLOSING) != TAOS_VN_STATUS_READY) { if (++i % 1000 == 0) { @@ -595,7 +598,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { } // TODO: this is a simple implement -static int vnodeProcessTsdbStatus(void *arg, int status, int eno) { +static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) { SVnodeObj *pVnode = arg; if (eno != TSDB_CODE_SUCCESS) { @@ -608,37 +611,59 @@ static int vnodeProcessTsdbStatus(void *arg, int status, int eno) { if (status == TSDB_STATUS_COMMIT_START) { pVnode->fversion = pVnode->version; vDebug("vgId:%d, start commit, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version); - if (pVnode->status == TAOS_VN_STATUS_INIT) { - return 0; - } else { + if (pVnode->status != TAOS_VN_STATUS_INIT) { return walRenew(pVnode->wal); } + return 0; } if (status == TSDB_STATUS_COMMIT_OVER) { vDebug("vgId:%d, commit over, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version); pVnode->isFull = 0; - walRemoveOneOldFile(pVnode->wal); + if (pVnode->status != TAOS_VN_STATUS_INIT) { + walRemoveOneOldFile(pVnode->wal); + } return vnodeSaveVersion(pVnode); } return 0; } -static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, +static uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) { - SVnodeObj *pVnode = ahandle; + SVnodeObj *pVnode = vnodeAcquire(vgId); + if (pVnode == NULL) { + vError("vgId:%d, vnode not found while get file info", vgId); + return 0; + } + *fversion = pVnode->fversion; - return tsdbGetFileInfo(pVnode->tsdb, name, index, eindex, size); + uint32_t ret = tsdbGetFileInfo(pVnode->tsdb, name, index, eindex, size); + + vnodeRelease(pVnode); + return ret; } -static int vnodeGetWalInfo(void *ahandle, char *fileName, int64_t *fileId) { - SVnodeObj *pVnode = ahandle; - return walGetWalFile(pVnode->wal, fileName, fileId); +static int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId) { + SVnodeObj *pVnode = vnodeAcquire(vgId); + if (pVnode == NULL) { + vError("vgId:%d, vnode not found while get wal info", vgId); + return -1; + } + + int32_t code = walGetWalFile(pVnode->wal, fileName, fileId); + + vnodeRelease(pVnode); + return code; } -static void vnodeNotifyRole(void *ahandle, int8_t role) { - SVnodeObj *pVnode = ahandle; +static void vnodeNotifyRole(int32_t vgId, int8_t role) { + SVnodeObj *pVnode = vnodeAcquire(vgId); + if (pVnode == NULL) { + vError("vgId:%d, vnode not found while notify role", vgId); + return; + } + vInfo("vgId:%d, sync role changed from %s to %s", pVnode->vgId, syncRole[pVnode->role], syncRole[role]); pVnode->role = role; dnodeSendStatusMsgToMnode(); @@ -648,17 +673,26 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) { } else { cqStop(pVnode->cq); } + + vnodeRelease(pVnode); } -static void vnodeCtrlFlow(void *ahandle, int32_t mseconds) { - SVnodeObj *pVnode = ahandle; +static void vnodeCtrlFlow(int32_t vgId, int32_t mseconds) { + SVnodeObj *pVnode = vnodeAcquire(vgId); + if (pVnode == NULL) { + vError("vgId:%d, vnode not found while ctrl flow", vgId); + return; + } + if (pVnode->delayMs != mseconds) { pVnode->delayMs = mseconds; vDebug("vgId:%d, sync flow control, mseconds:%d", pVnode->vgId, mseconds); } + + vnodeRelease(pVnode); } -static int vnodeResetTsdb(SVnodeObj *pVnode) { +static int32_t vnodeResetTsdb(SVnodeObj *pVnode) { char rootDir[128] = "\0"; sprintf(rootDir, "%s/tsdb", pVnode->rootDir); @@ -668,11 +702,11 @@ static int vnodeResetTsdb(SVnodeObj *pVnode) { void *tsdb = pVnode->tsdb; pVnode->tsdb = NULL; - + // acquire vnode int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); - if (refCount > 2) { + if (refCount > 3) { tsem_wait(&pVnode->sem); } @@ -692,14 +726,44 @@ static int vnodeResetTsdb(SVnodeObj *pVnode) { return 0; } -static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) { - SVnodeObj *pVnode = ahandle; +static int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion) { + SVnodeObj *pVnode = vnodeAcquire(vgId); + if (pVnode == NULL) { + vError("vgId:%d, vnode not found while notify file synced", vgId); + return 0; + } pVnode->fversion = fversion; pVnode->version = fversion; vnodeSaveVersion(pVnode); - vDebug("vgId:%d, data file is synced, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, - pVnode->version); - return vnodeResetTsdb(pVnode); + vDebug("vgId:%d, data file is synced, fver:%" PRIu64 " vver:%" PRIu64, vgId, fversion, fversion); + int32_t code = vnodeResetTsdb(pVnode); + + vnodeRelease(pVnode); + return code; +} + +void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code) { + void *pVnode = vnodeAcquire(vgId); + if (pVnode == NULL) { + vError("vgId:%d, vnode not found while confirm forward", vgId); + return; + } + + dnodeSendRpcVWriteRsp(pVnode, wparam, code); + vnodeRelease(pVnode); +} + +static int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam) { + SVnodeObj *pVnode = vnodeAcquire(vgId); + if (pVnode == NULL) { + vError("vgId:%d, vnode not found while write to cache", vgId); + return TSDB_CODE_VND_INVALID_VGROUP_ID; + } + + int32_t code = vnodeWriteToWQueue(pVnode, wparam, qtype, rparam); + + vnodeRelease(pVnode); + return code; } diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index e10d62a0c9f283eded9969547fad779468d3cdaa..5ef79cfbf01fb49ebbfd4603b8a9163b3ae9bc32 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -53,8 +53,7 @@ int32_t vnodeProcessRead(void *vparam, SVReadMsg *pRead) { return (*vnodeProcessReadMsgFp[msgType])(pVnode, pRead); } -static int32_t vnodeCheckRead(void *vparam) { - SVnodeObj *pVnode = vparam; +static int32_t vnodeCheckRead(SVnodeObj *pVnode) { if (pVnode->status != TAOS_VN_STATUS_READY) { vDebug("vgId:%d, vnode status is %s, refCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status], pVnode->refCount, pVnode); diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 2d2be602ed217df502101e423613c6631b400b9f..a0227d84babef8ee4259c7d86121770660097020 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -48,10 +48,10 @@ void vnodeInitWriteFp(void) { } int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam) { - int32_t code = 0; - SVnodeObj * pVnode = vparam; - SWalHead * pHead = wparam; - SRspRet * pRspRet = rparam; + int32_t code = 0; + SVnodeObj *pVnode = vparam; + SWalHead * pHead = wparam; + SRspRet * pRspRet = rparam; if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL) { vError("vgId:%d, msg:%s not processed since no handle, qtype:%s hver:%" PRIu64, pVnode->vgId,