提交 43063fd8 编写于 作者: H Hongze Cheng

Merge branch 'develop' into feature/TD-1413

......@@ -33,7 +33,7 @@ static void dnodePrintEps(SDnodeEps *eps);
int32_t dnodeInitEps() {
pthread_mutex_init(&tsEpsMutex, NULL);
tsEpsHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, true);
tsEpsHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
dnodeResetEps(NULL);
int32_t ret = dnodeReadEps();
if (ret == 0) {
......
......@@ -64,33 +64,32 @@ typedef struct {
if name is provided(name[0] is not zero), get the named file at the specified index. If not there, return
zero. If it is there, set the size to file size, and return file magic number. Index shall not be updated.
*/
typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion);
typedef uint32_t (*FGetFileInfo)(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion);
// get the wal file from index or after
// return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file
typedef int32_t (*FGetWalInfo)(void *ahandle, char *fileName, int64_t *fileId);
typedef int32_t (*FGetWalInfo)(int32_t vgId, char *fileName, int64_t *fileId);
// when a forward pkt is received, call this to handle data
typedef int32_t (*FWriteToCache)(void *ahandle, void *pHead, int32_t qtype, void *pMsg);
typedef int32_t (*FWriteToCache)(int32_t vgId, void *pHead, int32_t qtype, void *pMsg);
// when forward is confirmed by peer, master call this API to notify app
typedef void (*FConfirmForward)(void *ahandle, void *mhandle, int32_t code);
typedef void (*FConfirmForward)(int32_t vgId, void *mhandle, int32_t code);
// when role is changed, call this to notify app
typedef void (*FNotifyRole)(void *ahandle, int8_t role);
typedef void (*FNotifyRole)(int32_t vgId, int8_t role);
// if a number of retrieving data failed, call this to start flow control
typedef void (*FNotifyFlowCtrl)(void *ahandle, int32_t mseconds);
typedef void (*FNotifyFlowCtrl)(int32_t vgId, int32_t mseconds);
// when data file is synced successfully, notity app
typedef int32_t (*FNotifyFileSynced)(void *ahandle, uint64_t fversion);
typedef int32_t (*FNotifyFileSynced)(int32_t vgId, uint64_t fversion);
typedef struct {
int32_t vgId; // vgroup ID
uint64_t version; // initial version
SSyncCfg syncCfg; // configuration from mgmt
char path[128]; // path to the file
void * ahandle; // handle provided by APP
char path[TSDB_FILENAME_LEN]; // path to the file
FGetFileInfo getFileInfo;
FGetWalInfo getWalInfo;
FWriteToCache writeToCache;
......
......@@ -107,7 +107,7 @@ static taos_queue tsSdbWQueue;
static SSdbWorkerPool tsSdbPool;
static int32_t sdbProcessWrite(void *pRow, void *pHead, int32_t qtype, void *unused);
static int32_t sdbWriteWalToQueue(void *vparam, void *pHead, int32_t qtype, void *rparam);
static int32_t sdbWriteWalToQueue(int32_t vgId, void *pHead, int32_t qtype, void *rparam);
static int32_t sdbWriteRowToQueue(SSdbRow *pRow, int32_t action);
static void sdbFreeFromQueue(SSdbRow *pRow);
static void * sdbWorkerFp(void *pWorker);
......@@ -228,16 +228,16 @@ void sdbUpdateMnodeRoles() {
mnodeUpdateMnodeEpSet();
}
static uint32_t sdbGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) {
static uint32_t sdbGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) {
sdbUpdateMnodeRoles();
return 0;
}
static int32_t sdbGetWalInfo(void *ahandle, char *fileName, int64_t *fileId) {
static int32_t sdbGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId) {
return walGetWalFile(tsSdbMgmt.wal, fileName, fileId);
}
static void sdbNotifyRole(void *ahandle, int8_t role) {
static void sdbNotifyRole(int32_t vgId, int8_t role) {
sdbInfo("vgId:1, mnode role changed from %s to %s", syncRole[tsSdbMgmt.role], syncRole[role]);
if (role == TAOS_SYNC_ROLE_MASTER && tsSdbMgmt.role != TAOS_SYNC_ROLE_MASTER) {
......@@ -264,7 +264,7 @@ static void sdbHandleFailedConfirm(SSdbRow *pRow) {
}
FORCE_INLINE
static void sdbConfirmForward(void *ahandle, void *wparam, int32_t code) {
static void sdbConfirmForward(int32_t vgId, void *wparam, int32_t code) {
if (wparam == NULL) return;
SSdbRow *pRow = wparam;
SMnodeMsg * pMsg = pRow->pMsg;
......@@ -370,7 +370,6 @@ void sdbUpdateSync(void *pMnodes) {
syncInfo.version = sdbGetVersion();
syncInfo.syncCfg = syncCfg;
sprintf(syncInfo.path, "%s", tsMnodeDir);
syncInfo.ahandle = NULL;
syncInfo.getWalInfo = sdbGetWalInfo;
syncInfo.getFileInfo = sdbGetFileInfo;
syncInfo.writeToCache = sdbWriteWalToQueue;
......@@ -813,7 +812,7 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) {
if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) {
hashFp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
}
pTable->iHandle = taosHashInit(pTable->hashSessions, hashFp, true, true);
pTable->iHandle = taosHashInit(pTable->hashSessions, hashFp, true, HASH_ENTRY_LOCK);
tsSdbMgmt.numOfTables++;
tsSdbMgmt.tableList[pTable->id] = pTable;
......@@ -967,7 +966,7 @@ static void sdbFreeFromQueue(SSdbRow *pRow) {
taosFreeQitem(pRow);
}
static int32_t sdbWriteWalToQueue(void *vparam, void *wparam, int32_t qtype, void *rparam) {
static int32_t sdbWriteWalToQueue(int32_t vgId, void *wparam, int32_t qtype, void *rparam) {
SWalHead *pHead = wparam;
int32_t size = sizeof(SSdbRow) + sizeof(SWalHead) + pHead->len;
......@@ -1039,7 +1038,7 @@ static void *sdbWorkerFp(void *pWorker) {
taosGetQitem(tsSdbWQall, &qtype, (void **)&pRow);
if (qtype == TAOS_QTYPE_RPC) {
sdbConfirmForward(NULL, pRow, pRow->code);
sdbConfirmForward(1, pRow, pRow->code);
} else {
if (qtype == TAOS_QTYPE_FWD) {
syncConfirmForward(tsSdbMgmt.sync, pRow->pHead->version, pRow->code);
......
......@@ -394,7 +394,7 @@ static void mnodeAddTableIntoStable(SSTableObj *pStable, SCTableObj *pCtable) {
atomic_add_fetch_32(&pStable->numOfTables, 1);
if (pStable->vgHash == NULL) {
pStable->vgHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
pStable->vgHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
}
if (pStable->vgHash != NULL) {
......@@ -413,7 +413,7 @@ static void mnodeRemoveTableFromStable(SSTableObj *pStable, SCTableObj *pCtable)
SVgObj *pVgroup = mnodeGetVgroup(pCtable->vgId);
if (pVgroup == NULL) {
taosHashRemove(pStable->vgHash, (char *)&pCtable->vgId, sizeof(pCtable->vgId));
taosHashRemove(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId));
mDebug("table:%s, vgId:%d is remove from stable vgList, sizeOfVgList:%d", pStable->info.tableId, pCtable->vgId,
(int32_t)taosHashGetSize(pStable->vgHash));
}
......
......@@ -153,13 +153,12 @@ typedef struct SSyncNode {
int8_t selfIndex;
uint32_t vgId;
int64_t rid;
void *ahandle;
SSyncPeer *peerInfo[TAOS_SYNC_MAX_REPLICA+1]; // extra one for arbitrator
SSyncPeer *pMaster;
SSyncPeer * peerInfo[TAOS_SYNC_MAX_REPLICA + 1]; // extra one for arbitrator
SSyncPeer * pMaster;
SRecvBuffer *pRecv;
SSyncFwds *pSyncFwds; // saved forward info if quorum >1
void *pFwdTimer;
void *pRoleTimer;
SSyncFwds * pSyncFwds; // saved forward info if quorum >1
void * pFwdTimer;
void * pRoleTimer;
FGetFileInfo getFileInfo;
FGetWalInfo getWalInfo;
FWriteToCache writeToCache;
......
......@@ -100,7 +100,7 @@ uint16_t syncGenTranId() {
}
int32_t syncInit() {
SPoolInfo info;
SPoolInfo info = {0};
info.numOfThreads = tsSyncTcpThreads;
info.serverIp = 0;
......@@ -124,7 +124,7 @@ int32_t syncInit() {
return -1;
}
tsVgIdHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, true);
tsVgIdHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (tsVgIdHash == NULL) {
sError("failed to init tsVgIdHash");
taosTmrCleanUp(tsSyncTmrCtrl);
......@@ -181,7 +181,6 @@ int64_t syncStart(const SSyncInfo *pInfo) {
tstrncpy(pNode->path, pInfo->path, sizeof(pNode->path));
pthread_mutex_init(&pNode->mutex, NULL);
pNode->ahandle = pInfo->ahandle;
pNode->getFileInfo = pInfo->getFileInfo;
pNode->getWalInfo = pInfo->getWalInfo;
pNode->writeToCache = pInfo->writeToCache;
......@@ -202,10 +201,10 @@ int64_t syncStart(const SSyncInfo *pInfo) {
return -1;
}
for (int32_t i = 0; i < pCfg->replica; ++i) {
const SNodeInfo *pNodeInfo = pCfg->nodeInfo + i;
pNode->peerInfo[i] = syncAddPeer(pNode, pNodeInfo);
if (pNode->peerInfo[i] == NULL) {
for (int32_t index = 0; index < pCfg->replica; ++index) {
const SNodeInfo *pNodeInfo = pCfg->nodeInfo + index;
pNode->peerInfo[index] = syncAddPeer(pNode, pNodeInfo);
if (pNode->peerInfo[index] == NULL) {
sError("vgId:%d, node:%d fqdn:%s port:%u is not configured, stop taosd", pNode->vgId, pNodeInfo->nodeId,
pNodeInfo->nodeFqdn, pNodeInfo->nodePort);
syncStop(pNode->rid);
......@@ -213,7 +212,7 @@ int64_t syncStart(const SSyncInfo *pInfo) {
}
if ((strcmp(pNodeInfo->nodeFqdn, tsNodeFqdn) == 0) && (pNodeInfo->nodePort == tsSyncPort)) {
pNode->selfIndex = i;
pNode->selfIndex = index;
}
}
......@@ -252,10 +251,10 @@ int64_t syncStart(const SSyncInfo *pInfo) {
}
syncAddArbitrator(pNode);
taosHashPut(tsVgIdHash, (const char *)&pNode->vgId, sizeof(int32_t), (char *)(&pNode), sizeof(SSyncNode *));
taosHashPut(tsVgIdHash, &pNode->vgId, sizeof(int32_t), &pNode, sizeof(SSyncNode *));
if (pNode->notifyRole) {
(*pNode->notifyRole)(pNode->ahandle, nodeRole);
(*pNode->notifyRole)(pNode->vgId, nodeRole);
}
return pNode->rid;
......@@ -269,14 +268,14 @@ void syncStop(int64_t rid) {
sInfo("vgId:%d, cleanup sync", pNode->vgId);
pthread_mutex_lock(&(pNode->mutex));
pthread_mutex_lock(&pNode->mutex);
if (tsVgIdHash) taosHashRemove(tsVgIdHash, (const char *)&pNode->vgId, sizeof(int32_t));
if (tsVgIdHash) taosHashRemove(tsVgIdHash, &pNode->vgId, sizeof(int32_t));
if (pNode->pFwdTimer) taosTmrStop(pNode->pFwdTimer);
if (pNode->pRoleTimer) taosTmrStop(pNode->pRoleTimer);
for (int32_t i = 0; i < pNode->replica; ++i) {
pPeer = pNode->peerInfo[i];
for (int32_t index = 0; index < pNode->replica; ++index) {
pPeer = pNode->peerInfo[index];
if (pPeer) syncRemovePeer(pPeer);
}
......@@ -298,7 +297,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) {
sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole], pNewCfg->replica,
pNode->replica);
pthread_mutex_lock(&(pNode->mutex));
pthread_mutex_lock(&pNode->mutex);
for (i = 0; i < pNode->replica; ++i) {
for (j = 0; j < pNewCfg->replica; ++j) {
......@@ -348,7 +347,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) {
if (pNewCfg->replica <= 1) {
sInfo("vgId:%d, no peers are configured, work as master!", pNode->vgId);
nodeRole = TAOS_SYNC_ROLE_MASTER;
(*pNode->notifyRole)(pNode->ahandle, nodeRole);
(*pNode->notifyRole)(pNode->vgId, nodeRole);
}
pthread_mutex_unlock(&(pNode->mutex));
......@@ -412,10 +411,10 @@ void syncRecover(int64_t rid) {
// if take this node to unsync state, the whole system may not work
nodeRole = TAOS_SYNC_ROLE_UNSYNCED;
(*pNode->notifyRole)(pNode->ahandle, nodeRole);
(*pNode->notifyRole)(pNode->vgId, nodeRole);
nodeVersion = 0;
pthread_mutex_lock(&(pNode->mutex));
pthread_mutex_lock(&pNode->mutex);
for (int32_t i = 0; i < pNode->replica; ++i) {
pPeer = pNode->peerInfo[i];
......@@ -568,7 +567,7 @@ static void syncResetFlowCtrl(SSyncNode *pNode) {
}
if (pNode->notifyFlowCtrl) {
(*pNode->notifyFlowCtrl)(pNode->ahandle, 0);
(*pNode->notifyFlowCtrl)(pNode->vgId, 0);
}
}
......@@ -631,7 +630,7 @@ static void syncChooseMaster(SSyncNode *pNode) {
}
#endif
syncResetFlowCtrl(pNode);
(*pNode->notifyRole)(pNode->ahandle, nodeRole);
(*pNode->notifyRole)(pNode->vgId, nodeRole);
} else {
pPeer = pNode->peerInfo[index];
sInfo("%s, it shall work as master", pPeer->id);
......@@ -662,7 +661,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) {
if (onlineNum <= replica * 0.5) {
if (nodeRole != TAOS_SYNC_ROLE_UNSYNCED) {
nodeRole = TAOS_SYNC_ROLE_UNSYNCED;
(*pNode->notifyRole)(pNode->ahandle, nodeRole);
(*pNode->notifyRole)(pNode->vgId, nodeRole);
sInfo("vgId:%d, self change to unsynced state, online:%d replica:%d", pNode->vgId, onlineNum, replica);
}
} else {
......@@ -675,7 +674,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) {
if (masterIndex == pNode->selfIndex) {
sError("%s, peer is master, work as slave instead", pTemp->id);
nodeRole = TAOS_SYNC_ROLE_SLAVE;
(*pNode->notifyRole)(pNode->ahandle, nodeRole);
(*pNode->notifyRole)(pNode->vgId, nodeRole);
}
}
}
......@@ -692,7 +691,7 @@ static int32_t syncValidateMaster(SSyncPeer *pPeer) {
if (nodeRole == TAOS_SYNC_ROLE_MASTER && nodeVersion < pPeer->version) {
sDebug("%s, peer has higher sver:%" PRIu64 ", restart all peer connections", pPeer->id, pPeer->version);
nodeRole = TAOS_SYNC_ROLE_UNSYNCED;
(*pNode->notifyRole)(pNode->ahandle, nodeRole);
(*pNode->notifyRole)(pNode->vgId, nodeRole);
code = -1;
for (int32_t index = 0; index < pNode->replica; ++index) {
......@@ -729,7 +728,7 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus* peersStatus, int8_t new
} else {
sInfo("%s, is master, work as slave, self sver:%" PRIu64, pMaster->id, nodeVersion);
nodeRole = TAOS_SYNC_ROLE_SLAVE;
(*pNode->notifyRole)(pNode->ahandle, nodeRole);
(*pNode->notifyRole)(pNode->vgId, nodeRole);
}
} else if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pMaster == pPeer) {
sDebug("%s, is master, continue work as slave, self sver:%" PRIu64, pMaster->id, nodeVersion);
......@@ -832,7 +831,7 @@ static void syncNotStarted(void *param, void *tmrId) {
SSyncPeer *pPeer = param;
SSyncNode *pNode = pPeer->pSyncNode;
pthread_mutex_lock(&(pNode->mutex));
pthread_mutex_lock(&pNode->mutex);
pPeer->timer = NULL;
sInfo("%s, sync connection is still not up, restart", pPeer->id);
syncRestartConnection(pPeer);
......@@ -843,7 +842,7 @@ static void syncTryRecoverFromMaster(void *param, void *tmrId) {
SSyncPeer *pPeer = param;
SSyncNode *pNode = pPeer->pSyncNode;
pthread_mutex_lock(&(pNode->mutex));
pthread_mutex_lock(&pNode->mutex);
syncRecoverFromMaster(pPeer);
pthread_mutex_unlock(&(pNode->mutex));
}
......@@ -913,7 +912,7 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
if (nodeRole == TAOS_SYNC_ROLE_SLAVE) {
// nodeVersion = pHead->version;
(*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD, NULL);
(*pNode->writeToCache)(pNode->vgId, pHead, TAOS_QTYPE_FWD, NULL);
} else {
if (nodeSStatus != TAOS_SYNC_STATUS_INIT) {
syncSaveIntoBuffer(pPeer, pHead);
......@@ -969,7 +968,7 @@ static int32_t syncProcessPeerMsg(void *param, void *buffer) {
char * cont = buffer;
SSyncNode *pNode = pPeer->pSyncNode;
pthread_mutex_lock(&(pNode->mutex));
pthread_mutex_lock(&pNode->mutex);
int32_t code = syncReadPeerMsg(pPeer, &head, cont);
......@@ -1067,7 +1066,7 @@ static void syncCheckPeerConnection(void *param, void *tmrId) {
SSyncPeer *pPeer = param;
SSyncNode *pNode = pPeer->pSyncNode;
pthread_mutex_lock(&(pNode->mutex));
pthread_mutex_lock(&pNode->mutex);
sDebug("%s, check peer connection", pPeer->id);
syncSetupPeerConnection(pPeer);
......@@ -1119,7 +1118,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
}
SSyncNode *pNode = *ppNode;
pthread_mutex_lock(&(pNode->mutex));
pthread_mutex_lock(&pNode->mutex);
SSyncPeer *pPeer;
for (i = 0; i < pNode->replica; ++i) {
......@@ -1157,7 +1156,7 @@ static void syncProcessBrokenLink(void *param) {
SSyncNode *pNode = pPeer->pSyncNode;
if (taosAcquireRef(tsSyncRefId, pNode->rid) == NULL) return;
pthread_mutex_lock(&(pNode->mutex));
pthread_mutex_lock(&pNode->mutex);
sDebug("%s, TCP link is broken since %s", pPeer->id, strerror(errno));
pPeer->peerFd = -1;
......@@ -1228,7 +1227,7 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code
if (confirm && pFwdInfo->confirmed == 0) {
sTrace("vgId:%d, forward is confirmed, hver:%" PRIu64 " code:%x", pNode->vgId, pFwdInfo->version, pFwdInfo->code);
(*pNode->confirmForward)(pNode->ahandle, pFwdInfo->mhandle, pFwdInfo->code);
(*pNode->confirmForward)(pNode->vgId, pFwdInfo->mhandle, pFwdInfo->code);
pFwdInfo->confirmed = 1;
}
}
......@@ -1263,7 +1262,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
int64_t time = taosGetTimestampMs();
if (pSyncFwds->fwds > 0) {
pthread_mutex_lock(&(pNode->mutex));
pthread_mutex_lock(&pNode->mutex);
for (int32_t i = 0; i < pSyncFwds->fwds; ++i) {
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % tsMaxFwdInfo;
if (ABS(time - pFwdInfo->time) < 2000) break;
......@@ -1321,7 +1320,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
pSyncHead->len = sizeof(SWalHead) + pWalHead->len;
fwdLen = pSyncHead->len + sizeof(SSyncHead); // include the WAL and SYNC head
pthread_mutex_lock(&(pNode->mutex));
pthread_mutex_lock(&pNode->mutex);
for (int32_t i = 0; i < pNode->replica; ++i) {
pPeer = pNode->peerInfo[i];
......
......@@ -38,7 +38,7 @@ static void syncRemoveExtraFile(SSyncPeer *pPeer, int32_t sindex, int32_t eindex
while (1) {
name[0] = 0;
magic = (*pNode->getFileInfo)(pNode->ahandle, name, &index, eindex, &size, &fversion);
magic = (*pNode->getFileInfo)(pNode->vgId, name, &index, eindex, &size, &fversion);
if (magic == 0) break;
snprintf(fname, sizeof(fname), "%s/%s", pNode->path, name);
......@@ -84,7 +84,7 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
// check the file info
sinfo = minfo;
sDebug("%s, get file info:%s", pPeer->id, minfo.name);
sinfo.magic = (*pNode->getFileInfo)(pNode->ahandle, sinfo.name, &sinfo.index, TAOS_SYNC_MAX_INDEX, &sinfo.size,
sinfo.magic = (*pNode->getFileInfo)(pNode->vgId, sinfo.name, &sinfo.index, TAOS_SYNC_MAX_INDEX, &sinfo.size,
&sinfo.fversion);
// if file not there or magic is not the same, file shall be synced
......@@ -164,7 +164,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) {
}
lastVer = pHead->version;
(*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_WAL, NULL);
(*pNode->writeToCache)(pNode->vgId, pHead, TAOS_QTYPE_WAL, NULL);
}
if (code < 0) {
......@@ -179,7 +179,7 @@ static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset) {
SSyncNode *pNode = pPeer->pSyncNode;
SWalHead * pHead = (SWalHead *)offset;
(*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD, NULL);
(*pNode->writeToCache)(pNode->vgId, pHead, TAOS_QTYPE_FWD, NULL);
offset += pHead->len + sizeof(SWalHead);
return offset;
......@@ -276,7 +276,7 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
// if code > 0, data file is changed, notify app, and pass the version
if (code > 0 && pNode->notifyFileSynced) {
if ((*pNode->notifyFileSynced)(pNode->ahandle, fversion) < 0) {
if ((*pNode->notifyFileSynced)(pNode->vgId, fversion) < 0) {
sError("%s, app not in ready state", pPeer->id);
return -1;
}
......@@ -307,7 +307,7 @@ void *syncRestoreData(void *param) {
taosBlockSIGPIPE();
__sync_fetch_and_add(&tsSyncNum, 1);
(*pNode->notifyRole)(pNode->ahandle, TAOS_SYNC_ROLE_SYNCING);
(*pNode->notifyRole)(pNode->vgId, TAOS_SYNC_ROLE_SYNCING);
if (syncOpenRecvBuffer(pNode) < 0) {
sError("%s, failed to allocate recv buffer, restart connection", pPeer->id);
......@@ -324,7 +324,7 @@ void *syncRestoreData(void *param) {
}
}
(*pNode->notifyRole)(pNode->ahandle, nodeRole);
(*pNode->notifyRole)(pNode->vgId, nodeRole);
nodeSStatus = TAOS_SYNC_STATUS_INIT;
taosClose(pPeer->syncFd);
......
......@@ -108,7 +108,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
while (1) {
// retrieve file info
fileInfo.name[0] = 0;
fileInfo.magic = (*pNode->getFileInfo)(pNode->ahandle, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX,
fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX,
&fileInfo.size, &fileInfo.fversion);
// fileInfo.size = htonl(size);
......@@ -354,7 +354,7 @@ static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index)
index++;
wname[0] = 0;
code = (*pNode->getWalInfo)(pNode->ahandle, wname, &index);
code = (*pNode->getWalInfo)(pNode->vgId, wname, &index);
if (code < 0) break;
if (wname[0] == 0) {
code = 0;
......@@ -382,7 +382,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
while (1) {
// retrieve wal info
wname[0] = 0;
code = (*pNode->getWalInfo)(pNode->ahandle, wname, &index);
code = (*pNode->getWalInfo)(pNode->vgId, wname, &index);
if (code < 0) break; // error
if (wname[0] == 0) { // no wal file
sDebug("%s, no wal file", pPeer->id);
......@@ -487,10 +487,10 @@ void *syncRetrieveData(void *param) {
// if file is changed 3 times continuously, start flow control
pPeer->numOfRetrieves++;
if (pPeer->numOfRetrieves >= 2 && pNode->notifyFlowCtrl)
(*pNode->notifyFlowCtrl)(pNode->ahandle, 4 << (pPeer->numOfRetrieves - 2));
(*pNode->notifyFlowCtrl)(pNode->vgId, 4 << (pPeer->numOfRetrieves - 2));
} else {
pPeer->numOfRetrieves = 0;
if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->ahandle, 0);
if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->vgId, 0);
}
pPeer->fileChanged = 0;
......
......@@ -70,7 +70,7 @@ int writeIntoWal(SWalHead *pHead) {
return 0;
}
void confirmForward(void *ahandle, void *mhandle, int32_t code) {
void confirmForward(int32_t vgId, void *mhandle, int32_t code) {
SRpcMsg * pMsg = (SRpcMsg *)mhandle;
SWalHead *pHead = (SWalHead *)(((char *)pMsg->pCont) - sizeof(SWalHead));
......@@ -227,7 +227,7 @@ void processRequestMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
taosWriteQitem(qhandle, TAOS_QTYPE_RPC, pTemp);
}
uint32_t getFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) {
uint32_t getFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) {
uint32_t magic;
struct stat fstat;
char aname[280];
......@@ -254,7 +254,7 @@ uint32_t getFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex
return magic;
}
int getWalInfo(void *ahandle, char *name, int64_t *index) {
int getWalInfo(int32_t vgId, char *name, int64_t *index) {
struct stat fstat;
char aname[280];
......@@ -272,7 +272,7 @@ int getWalInfo(void *ahandle, char *name, int64_t *index) {
return 1;
}
int writeToCache(void *ahandle, void *data, int type) {
int writeToCache(int32_t vgId, void *data, int type) {
SWalHead *pHead = data;
uDebug("pkt from peer is received, ver:%" PRIu64 " len:%d type:%d", pHead->version, pHead->len, type);
......@@ -285,9 +285,9 @@ int writeToCache(void *ahandle, void *data, int type) {
return 0;
}
void confirmFwd(void *ahandle, int64_t version) { return; }
void confirmFwd(int32_t vgId, int64_t version) { return; }
void notifyRole(void *ahandle, int8_t r) {
void notifyRole(int32_t vgId, int8_t r) {
role = r;
printf("current role:%s\n", syncRole[role]);
}
......@@ -296,7 +296,6 @@ void initSync() {
pCfg->replica = 1;
pCfg->quorum = 1;
syncInfo.vgId = 1;
syncInfo.ahandle = &syncInfo;
syncInfo.getFileInfo = getFileInfo;
syncInfo.getWalInfo = getWalInfo;
syncInfo.writeToCache = writeToCache;
......
......@@ -33,19 +33,21 @@
static SHashObj*tsVnodesHash;
static void vnodeCleanUp(SVnodeObj *pVnode);
static int vnodeProcessTsdbStatus(void *arg, int status, int eno);
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion);
static int vnodeGetWalInfo(void *ahandle, char *fileName, int64_t *fileId);
static void vnodeNotifyRole(void *ahandle, int8_t role);
static void vnodeCtrlFlow(void *handle, int32_t mseconds);
static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion);
static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno);
static uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion);
static int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId);
static void vnodeNotifyRole(int32_t vgId, int8_t role);
static void vnodeCtrlFlow(int32_t vgId, int32_t mseconds);
static int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion);
static void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code);
static int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam);
#ifndef _SYNC
int64_t syncStart(const SSyncInfo *info) { return NULL; }
int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int qtype) { return 0; }
int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int32_t qtype) { return 0; }
void syncStop(int64_t rid) {}
int32_t syncReconfig(int64_t rid, const SSyncCfg * cfg) { return 0; }
int syncGetNodesRole(int64_t rid, SNodesRole * cfg) { return 0; }
int32_t syncReconfig(int64_t rid, const SSyncCfg *cfg) { return 0; }
int32_t syncGetNodesRole(int64_t rid, SNodesRole *cfg) { return 0; }
void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {}
#endif
......@@ -58,13 +60,13 @@ char* vnodeStatus[] = {
};
int32_t vnodeInitResources() {
int code = syncInit();
int32_t code = syncInit();
if (code != 0) return code;
vnodeInitWriteFp();
vnodeInitReadFp();
tsVnodesHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, true);
tsVnodesHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (tsVnodesHash == NULL) {
vError("failed to init vnode list");
return TSDB_CODE_VND_OUT_OF_MEMORY;
......@@ -163,8 +165,8 @@ int32_t vnodeDrop(int32_t vgId) {
return TSDB_CODE_SUCCESS;
}
int32_t vnodeAlter(void *param, SCreateVnodeMsg *pVnodeCfg) {
SVnodeObj *pVnode = param;
int32_t vnodeAlter(void *vparam, SCreateVnodeMsg *pVnodeCfg) {
SVnodeObj *pVnode = vparam;
// vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS
// cfgVersion can be corrected by status msg
......@@ -315,16 +317,28 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
walRemoveAllOldFiles(pVnode->wal);
walRenew(pVnode->wal);
pVnode->qMgmt = qOpenQueryMgmt(pVnode->vgId);
if (pVnode->qMgmt == NULL) {
vnodeCleanUp(pVnode);
return terrno;
}
pVnode->events = NULL;
vDebug("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode);
tsdbIncCommitRef(pVnode->vgId);
taosHashPut(tsVnodesHash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
SSyncInfo syncInfo;
syncInfo.vgId = pVnode->vgId;
syncInfo.version = pVnode->version;
syncInfo.syncCfg = pVnode->syncCfg;
sprintf(syncInfo.path, "%s", rootDir);
syncInfo.ahandle = pVnode;
syncInfo.getWalInfo = vnodeGetWalInfo;
syncInfo.getFileInfo = vnodeGetFileInfo;
syncInfo.writeToCache = vnodeWriteToWQueue;
syncInfo.confirmForward = dnodeSendRpcVWriteRsp;
syncInfo.writeToCache = vnodeWriteToCache;
syncInfo.confirmForward = vnodeConfirmForard;
syncInfo.notifyRole = vnodeNotifyRole;
syncInfo.notifyFlowCtrl = vnodeCtrlFlow;
syncInfo.notifyFileSynced = vnodeNotifyFileSynced;
......@@ -336,24 +350,13 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
if (pVnode->sync <= 0) {
vError("vgId:%d, failed to open sync module, replica:%d reason:%s", pVnode->vgId, pVnode->syncCfg.replica,
tstrerror(terrno));
vnodeRelease(pVnode);
vnodeCleanUp(pVnode);
return terrno;
}
#endif
pVnode->qMgmt = qOpenQueryMgmt(pVnode->vgId);
if (pVnode->qMgmt == NULL) {
vnodeCleanUp(pVnode);
return terrno;
}
#endif
pVnode->events = NULL;
pVnode->status = TAOS_VN_STATUS_READY;
vDebug("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode);
tsdbIncCommitRef(pVnode->vgId);
taosHashPut(tsVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t), (char *)(&pVnode), sizeof(SVnodeObj *));
return TSDB_CODE_SUCCESS;
}
......@@ -379,7 +382,7 @@ void vnodeRelease(void *vparam) {
assert(refCount >= 0);
if (refCount > 0) {
if (pVnode->status == TAOS_VN_STATUS_RESET && refCount == 2) {
if (pVnode->status == TAOS_VN_STATUS_RESET && refCount <= 3) {
tsem_post(&pVnode->sem);
}
return;
......@@ -557,11 +560,11 @@ void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes) {
static void vnodeCleanUp(SVnodeObj *pVnode) {
// remove from hash, so new messages wont be consumed
taosHashRemove(tsVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t));
taosHashRemove(tsVnodesHash, &pVnode->vgId, sizeof(int32_t));
if (pVnode->status != TAOS_VN_STATUS_INIT) {
// it may be in updateing or reset state, then it shall wait
int i = 0;
int32_t i = 0;
while (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_CLOSING) !=
TAOS_VN_STATUS_READY) {
if (++i % 1000 == 0) {
......@@ -585,7 +588,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
}
// TODO: this is a simple implement
static int vnodeProcessTsdbStatus(void *arg, int status, int eno) {
static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) {
SVnodeObj *pVnode = arg;
if (eno != TSDB_CODE_SUCCESS) {
......@@ -598,37 +601,59 @@ static int vnodeProcessTsdbStatus(void *arg, int status, int eno) {
if (status == TSDB_STATUS_COMMIT_START) {
pVnode->fversion = pVnode->version;
vDebug("vgId:%d, start commit, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version);
if (pVnode->status == TAOS_VN_STATUS_INIT) {
return 0;
} else {
if (pVnode->status != TAOS_VN_STATUS_INIT) {
return walRenew(pVnode->wal);
}
return 0;
}
if (status == TSDB_STATUS_COMMIT_OVER) {
vDebug("vgId:%d, commit over, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version);
pVnode->isFull = 0;
walRemoveOneOldFile(pVnode->wal);
if (pVnode->status != TAOS_VN_STATUS_INIT) {
walRemoveOneOldFile(pVnode->wal);
}
return vnodeSaveVersion(pVnode);
}
return 0;
}
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size,
static uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size,
uint64_t *fversion) {
SVnodeObj *pVnode = ahandle;
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vError("vgId:%d, vnode not found while get file info", vgId);
return 0;
}
*fversion = pVnode->fversion;
return tsdbGetFileInfo(pVnode->tsdb, name, index, eindex, size);
uint32_t ret = tsdbGetFileInfo(pVnode->tsdb, name, index, eindex, size);
vnodeRelease(pVnode);
return ret;
}
static int vnodeGetWalInfo(void *ahandle, char *fileName, int64_t *fileId) {
SVnodeObj *pVnode = ahandle;
return walGetWalFile(pVnode->wal, fileName, fileId);
static int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vError("vgId:%d, vnode not found while get wal info", vgId);
return -1;
}
int32_t code = walGetWalFile(pVnode->wal, fileName, fileId);
vnodeRelease(pVnode);
return code;
}
static void vnodeNotifyRole(void *ahandle, int8_t role) {
SVnodeObj *pVnode = ahandle;
static void vnodeNotifyRole(int32_t vgId, int8_t role) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vError("vgId:%d, vnode not found while notify role", vgId);
return;
}
vInfo("vgId:%d, sync role changed from %s to %s", pVnode->vgId, syncRole[pVnode->role], syncRole[role]);
pVnode->role = role;
dnodeSendStatusMsgToMnode();
......@@ -638,17 +663,26 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) {
} else {
cqStop(pVnode->cq);
}
vnodeRelease(pVnode);
}
static void vnodeCtrlFlow(void *ahandle, int32_t mseconds) {
SVnodeObj *pVnode = ahandle;
static void vnodeCtrlFlow(int32_t vgId, int32_t mseconds) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vError("vgId:%d, vnode not found while ctrl flow", vgId);
return;
}
if (pVnode->delayMs != mseconds) {
pVnode->delayMs = mseconds;
vDebug("vgId:%d, sync flow control, mseconds:%d", pVnode->vgId, mseconds);
}
vnodeRelease(pVnode);
}
static int vnodeResetTsdb(SVnodeObj *pVnode) {
static int32_t vnodeResetTsdb(SVnodeObj *pVnode) {
char rootDir[128] = "\0";
sprintf(rootDir, "%s/tsdb", pVnode->rootDir);
......@@ -658,11 +692,11 @@ static int vnodeResetTsdb(SVnodeObj *pVnode) {
void *tsdb = pVnode->tsdb;
pVnode->tsdb = NULL;
// acquire vnode
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
if (refCount > 2) {
if (refCount > 3) {
tsem_wait(&pVnode->sem);
}
......@@ -682,14 +716,44 @@ static int vnodeResetTsdb(SVnodeObj *pVnode) {
return 0;
}
static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) {
SVnodeObj *pVnode = ahandle;
static int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vError("vgId:%d, vnode not found while notify file synced", vgId);
return 0;
}
pVnode->fversion = fversion;
pVnode->version = fversion;
vnodeSaveVersion(pVnode);
vDebug("vgId:%d, data file is synced, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion,
pVnode->version);
return vnodeResetTsdb(pVnode);
vDebug("vgId:%d, data file is synced, fver:%" PRIu64 " vver:%" PRIu64, vgId, fversion, fversion);
int32_t code = vnodeResetTsdb(pVnode);
vnodeRelease(pVnode);
return code;
}
void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code) {
void *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vError("vgId:%d, vnode not found while confirm forward", vgId);
return;
}
dnodeSendRpcVWriteRsp(pVnode, wparam, code);
vnodeRelease(pVnode);
}
static int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vError("vgId:%d, vnode not found while write to cache", vgId);
return TSDB_CODE_VND_INVALID_VGROUP_ID;
}
int32_t code = vnodeWriteToWQueue(pVnode, wparam, qtype, rparam);
vnodeRelease(pVnode);
return code;
}
......@@ -53,8 +53,7 @@ int32_t vnodeProcessRead(void *vparam, SVReadMsg *pRead) {
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pRead);
}
static int32_t vnodeCheckRead(void *vparam) {
SVnodeObj *pVnode = vparam;
static int32_t vnodeCheckRead(SVnodeObj *pVnode) {
if (pVnode->status != TAOS_VN_STATUS_READY) {
vDebug("vgId:%d, vnode status is %s, refCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status],
pVnode->refCount, pVnode);
......
......@@ -48,10 +48,10 @@ void vnodeInitWriteFp(void) {
}
int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam) {
int32_t code = 0;
SVnodeObj * pVnode = vparam;
SWalHead * pHead = wparam;
SRspRet * pRspRet = rparam;
int32_t code = 0;
SVnodeObj *pVnode = vparam;
SWalHead * pHead = wparam;
SRspRet * pRspRet = rparam;
if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL) {
vError("vgId:%d, msg:%s not processed since no handle, qtype:%s hver:%" PRIu64, pVnode->vgId,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册