未验证 提交 5b42f5d5 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #3271 from taosdata/master

Merge from master into develop
...@@ -115,7 +115,7 @@ int tsdbDropTable(TSDB_REPO_T *pRepo, STableId tableId); ...@@ -115,7 +115,7 @@ int tsdbDropTable(TSDB_REPO_T *pRepo, STableId tableId);
int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg); int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg);
TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid); TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid);
uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int32_t *size); uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size);
// the TSDB repository info // the TSDB repository info
typedef struct STsdbRepoInfo { typedef struct STsdbRepoInfo {
......
...@@ -64,7 +64,7 @@ typedef struct { ...@@ -64,7 +64,7 @@ typedef struct {
if name is provided(name[0] is not zero), get the named file at the specified index. If not there, return if name is provided(name[0] is not zero), get the named file at the specified index. If not there, return
zero. If it is there, set the size to file size, and return file magic number. Index shall not be updated. zero. If it is there, set the size to file size, and return file magic number. Index shall not be updated.
*/ */
typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion); typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion);
// get the wal file from index or after // get the wal file from index or after
// return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file // return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file
......
...@@ -224,7 +224,7 @@ void sdbUpdateMnodeRoles() { ...@@ -224,7 +224,7 @@ void sdbUpdateMnodeRoles() {
mnodeUpdateMnodeEpSet(); mnodeUpdateMnodeEpSet();
} }
static uint32_t sdbGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion) { static uint32_t sdbGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) {
sdbUpdateMnodeRoles(); sdbUpdateMnodeRoles();
return 0; return 0;
} }
......
...@@ -74,7 +74,7 @@ typedef struct { ...@@ -74,7 +74,7 @@ typedef struct {
uint32_t magic; uint32_t magic;
uint32_t index; uint32_t index;
uint64_t fversion; uint64_t fversion;
int32_t size; int64_t size;
} SFileInfo; } SFileInfo;
typedef struct { typedef struct {
......
...@@ -108,8 +108,7 @@ static void syncModuleInitFunc() { ...@@ -108,8 +108,7 @@ static void syncModuleInitFunc() {
tstrncpy(tsNodeFqdn, tsLocalFqdn, sizeof(tsNodeFqdn)); tstrncpy(tsNodeFqdn, tsLocalFqdn, sizeof(tsNodeFqdn));
} }
void *syncStart(const SSyncInfo *pInfo) void *syncStart(const SSyncInfo *pInfo) {
{
const SSyncCfg *pCfg = &pInfo->syncCfg; const SSyncCfg *pCfg = &pInfo->syncCfg;
SSyncNode *pNode = (SSyncNode *) calloc(sizeof(SSyncNode), 1); SSyncNode *pNode = (SSyncNode *) calloc(sizeof(SSyncNode), 1);
...@@ -189,9 +188,8 @@ void *syncStart(const SSyncInfo *pInfo) ...@@ -189,9 +188,8 @@ void *syncStart(const SSyncInfo *pInfo)
return pNode; return pNode;
} }
void syncStop(void *param) void syncStop(void *param) {
{ SSyncNode * pNode = param;
SSyncNode *pNode = param;
SSyncPeer *pPeer; SSyncPeer *pPeer;
if (pNode == NULL) return; if (pNode == NULL) return;
...@@ -215,9 +213,8 @@ void syncStop(void *param) ...@@ -215,9 +213,8 @@ void syncStop(void *param)
syncDecNodeRef(pNode); syncDecNodeRef(pNode);
} }
int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) {
{ SSyncNode * pNode = param;
SSyncNode *pNode = param;
int i, j; int i, j;
if (pNode == NULL) return TSDB_CODE_SYN_INVALID_CONFIG; if (pNode == NULL) return TSDB_CODE_SYN_INVALID_CONFIG;
...@@ -283,10 +280,9 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) ...@@ -283,10 +280,9 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg)
return 0; return 0;
} }
int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) {
{ SSyncNode * pNode = param;
SSyncNode *pNode = param; SSyncPeer * pPeer;
SSyncPeer *pPeer;
SSyncHead *pSyncHead; SSyncHead *pSyncHead;
SWalHead *pWalHead = data; SWalHead *pWalHead = data;
int fwdLen; int fwdLen;
...@@ -334,9 +330,8 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) ...@@ -334,9 +330,8 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype)
return code; return code;
} }
void syncConfirmForward(void *param, uint64_t version, int32_t code) void syncConfirmForward(void *param, uint64_t version, int32_t code) {
{ SSyncNode *pNode = param;
SSyncNode *pNode = param;
if (pNode == NULL) return; if (pNode == NULL) return;
if (pNode->quorum <= 1) return; if (pNode->quorum <= 1) return;
...@@ -387,10 +382,9 @@ void syncRecover(void *param) { ...@@ -387,10 +382,9 @@ void syncRecover(void *param) {
pthread_mutex_unlock(&(pNode->mutex)); pthread_mutex_unlock(&(pNode->mutex));
} }
int syncGetNodesRole(void *param, SNodesRole *pNodesRole) int syncGetNodesRole(void *param, SNodesRole *pNodesRole) {
{
SSyncNode *pNode = param; SSyncNode *pNode = param;
pNodesRole->selfIndex = pNode->selfIndex; pNodesRole->selfIndex = pNode->selfIndex;
for (int i=0; i<pNode->replica; ++i) { for (int i=0; i<pNode->replica; ++i) {
pNodesRole->nodeId[i] = pNode->peerInfo[i]->nodeId; pNodesRole->nodeId[i] = pNode->peerInfo[i]->nodeId;
...@@ -400,8 +394,7 @@ int syncGetNodesRole(void *param, SNodesRole *pNodesRole) ...@@ -400,8 +394,7 @@ int syncGetNodesRole(void *param, SNodesRole *pNodesRole)
return 0; return 0;
} }
static void syncAddArbitrator(SSyncNode *pNode) static void syncAddArbitrator(SSyncNode *pNode) {
{
SSyncPeer *pPeer = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA]; SSyncPeer *pPeer = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA];
// if not configured, return right away // if not configured, return right away
...@@ -456,13 +449,11 @@ static void syncDecNodeRef(SSyncNode *pNode) ...@@ -456,13 +449,11 @@ static void syncDecNodeRef(SSyncNode *pNode)
} }
} }
void syncAddPeerRef(SSyncPeer *pPeer) void syncAddPeerRef(SSyncPeer *pPeer) {
{
atomic_add_fetch_8(&pPeer->refCount, 1); atomic_add_fetch_8(&pPeer->refCount, 1);
} }
int syncDecPeerRef(SSyncPeer *pPeer) int syncDecPeerRef(SSyncPeer *pPeer) {
{
if (atomic_sub_fetch_8(&pPeer->refCount, 1) == 0) { if (atomic_sub_fetch_8(&pPeer->refCount, 1) == 0) {
syncDecNodeRef(pPeer->pSyncNode); syncDecNodeRef(pPeer->pSyncNode);
...@@ -475,18 +466,16 @@ int syncDecPeerRef(SSyncPeer *pPeer) ...@@ -475,18 +466,16 @@ int syncDecPeerRef(SSyncPeer *pPeer)
return 1; return 1;
} }
static void syncClosePeerConn(SSyncPeer *pPeer) static void syncClosePeerConn(SSyncPeer *pPeer) {
{
taosTmrStopA(&pPeer->timer); taosTmrStopA(&pPeer->timer);
taosClose(pPeer->syncFd); taosClose(pPeer->syncFd);
if (pPeer->peerFd >=0) { if (pPeer->peerFd >= 0) {
pPeer->peerFd = -1; pPeer->peerFd = -1;
taosFreeTcpConn(pPeer->pConn); taosFreeTcpConn(pPeer->pConn);
} }
} }
static void syncRemovePeer(SSyncPeer *pPeer) static void syncRemovePeer(SSyncPeer *pPeer) {
{
sInfo("%s, it is removed", pPeer->id); sInfo("%s, it is removed", pPeer->id);
pPeer->ip = 0; pPeer->ip = 0;
...@@ -494,8 +483,7 @@ static void syncRemovePeer(SSyncPeer *pPeer) ...@@ -494,8 +483,7 @@ static void syncRemovePeer(SSyncPeer *pPeer)
syncDecPeerRef(pPeer); syncDecPeerRef(pPeer);
} }
static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
{
uint32_t ip = taosGetIpFromFqdn(pInfo->nodeFqdn); uint32_t ip = taosGetIpFromFqdn(pInfo->nodeFqdn);
if (ip == -1) return NULL; if (ip == -1) return NULL;
...@@ -525,25 +513,24 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) ...@@ -525,25 +513,24 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo)
return pPeer; return pPeer;
} }
void syncBroadcastStatus(SSyncNode *pNode) void syncBroadcastStatus(SSyncNode *pNode) {
{
SSyncPeer *pPeer; SSyncPeer *pPeer;
for (int i = 0; i < pNode->replica; ++i) { for (int i = 0; i < pNode->replica; ++i) {
if ( i == pNode->selfIndex ) continue; if (i == pNode->selfIndex) continue;
pPeer = pNode->peerInfo[i]; pPeer = pNode->peerInfo[i];
syncSendPeersStatusMsgToPeer(pPeer, 1); syncSendPeersStatusMsgToPeer(pPeer, 1);
} }
} }
static void syncResetFlowCtrl(SSyncNode *pNode) { static void syncResetFlowCtrl(SSyncNode *pNode) {
for (int i = 0; i < pNode->replica; ++i) { for (int i = 0; i < pNode->replica; ++i) {
pNode->peerInfo[i]->numOfRetrieves = 0; pNode->peerInfo[i]->numOfRetrieves = 0;
} }
if (pNode->notifyFlowCtrl) if (pNode->notifyFlowCtrl) {
(*pNode->notifyFlowCtrl)(pNode->ahandle, 0); (*pNode->notifyFlowCtrl)(pNode->ahandle, 0);
}
} }
static void syncChooseMaster(SSyncNode *pNode) { static void syncChooseMaster(SSyncNode *pNode) {
...@@ -600,9 +587,9 @@ static void syncChooseMaster(SSyncNode *pNode) { ...@@ -600,9 +587,9 @@ static void syncChooseMaster(SSyncNode *pNode) {
} else { } else {
sDebug("vgId:%d, failed to choose master", pNode->vgId); sDebug("vgId:%d, failed to choose master", pNode->vgId);
} }
} }
static SSyncPeer *syncCheckMaster(SSyncNode *pNode ) { static SSyncPeer *syncCheckMaster(SSyncNode *pNode) {
int onlineNum = 0; int onlineNum = 0;
int index = -1; int index = -1;
int replica = pNode->replica; int replica = pNode->replica;
...@@ -619,7 +606,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode ) { ...@@ -619,7 +606,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode ) {
replica = pNode->replica + 1; replica = pNode->replica + 1;
} }
if (onlineNum <= replica*0.5) { if (onlineNum <= replica * 0.5) {
if (nodeRole != TAOS_SYNC_ROLE_UNSYNCED) { if (nodeRole != TAOS_SYNC_ROLE_UNSYNCED) {
nodeRole = TAOS_SYNC_ROLE_UNSYNCED; nodeRole = TAOS_SYNC_ROLE_UNSYNCED;
pNode->peerInfo[pNode->selfIndex]->role = nodeRole; pNode->peerInfo[pNode->selfIndex]->role = nodeRole;
...@@ -627,13 +614,13 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode ) { ...@@ -627,13 +614,13 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode ) {
sInfo("vgId:%d, change to unsynced state, online:%d replica:%d", pNode->vgId, onlineNum, replica); sInfo("vgId:%d, change to unsynced state, online:%d replica:%d", pNode->vgId, onlineNum, replica);
} }
} else { } else {
for (int i=0; i<pNode->replica; ++i) { for (int i = 0; i < pNode->replica; ++i) {
SSyncPeer *pTemp = pNode->peerInfo[i]; SSyncPeer *pTemp = pNode->peerInfo[i];
if ( pTemp->role != TAOS_SYNC_ROLE_MASTER ) continue; if (pTemp->role != TAOS_SYNC_ROLE_MASTER) continue;
if ( index < 0 ) { if (index < 0) {
index = i; index = i;
} else { // multiple masters, it shall not happen } else { // multiple masters, it shall not happen
if ( i == pNode->selfIndex ) { if (i == pNode->selfIndex) {
sError("%s, peer is master, work as slave instead", pTemp->id); sError("%s, peer is master, work as slave instead", pTemp->id);
nodeRole = TAOS_SYNC_ROLE_SLAVE; nodeRole = TAOS_SYNC_ROLE_SLAVE;
(*pNode->notifyRole)(pNode->ahandle, nodeRole); (*pNode->notifyRole)(pNode->ahandle, nodeRole);
...@@ -642,7 +629,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode ) { ...@@ -642,7 +629,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode ) {
} }
} }
SSyncPeer *pMaster = (index>=0) ? pNode->peerInfo[index]:NULL; SSyncPeer *pMaster = (index >= 0) ? pNode->peerInfo[index] : NULL;
return pMaster; return pMaster;
} }
...@@ -651,7 +638,7 @@ static int syncValidateMaster(SSyncPeer *pPeer) { ...@@ -651,7 +638,7 @@ static int syncValidateMaster(SSyncPeer *pPeer) {
int code = 0; int code = 0;
if (nodeRole == TAOS_SYNC_ROLE_MASTER && nodeVersion < pPeer->version) { if (nodeRole == TAOS_SYNC_ROLE_MASTER && nodeVersion < pPeer->version) {
sDebug("%s, slave has higher version, restart all connections!!!", pPeer->id); sDebug("%s, slave has higher version, restart all connections!!!", pPeer->id);
nodeRole = TAOS_SYNC_ROLE_UNSYNCED; nodeRole = TAOS_SYNC_ROLE_UNSYNCED;
(*pNode->notifyRole)(pNode->ahandle, nodeRole); (*pNode->notifyRole)(pNode->ahandle, nodeRole);
code = -1; code = -1;
...@@ -660,13 +647,12 @@ static int syncValidateMaster(SSyncPeer *pPeer) { ...@@ -660,13 +647,12 @@ static int syncValidateMaster(SSyncPeer *pPeer) {
if ( i == pNode->selfIndex ) continue; if ( i == pNode->selfIndex ) continue;
syncRestartPeer(pNode->peerInfo[i]); syncRestartPeer(pNode->peerInfo[i]);
} }
} }
return code; return code;
} }
static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t newRole) static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t newRole) {
{
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
int8_t peerOldRole = pPeer->role; int8_t peerOldRole = pPeer->role;
int8_t selfOldRole = nodeRole; int8_t selfOldRole = nodeRole;
...@@ -688,14 +674,14 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne ...@@ -688,14 +674,14 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne
if (syncValidateMaster(pPeer) < 0) return; if (syncValidateMaster(pPeer) < 0) return;
if (nodeRole == TAOS_SYNC_ROLE_UNSYNCED) { if (nodeRole == TAOS_SYNC_ROLE_UNSYNCED) {
if ( nodeVersion < pMaster->version) { if (nodeVersion < pMaster->version) {
syncRequired = 1; syncRequired = 1;
} else { } else {
sInfo("%s is master, work as slave, ver:%" PRIu64, pMaster->id, pMaster->version); sInfo("%s is master, work as slave, ver:%" PRIu64, pMaster->id, pMaster->version);
nodeRole = TAOS_SYNC_ROLE_SLAVE; nodeRole = TAOS_SYNC_ROLE_SLAVE;
(*pNode->notifyRole)(pNode->ahandle, nodeRole); (*pNode->notifyRole)(pNode->ahandle, nodeRole);
} }
} else if ( nodeRole == TAOS_SYNC_ROLE_SLAVE && pMaster == pPeer) { } else if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pMaster == pPeer) {
// nodeVersion = pMaster->version; // nodeVersion = pMaster->version;
} }
} else { } else {
...@@ -736,20 +722,18 @@ static void syncRestartPeer(SSyncPeer *pPeer) { ...@@ -736,20 +722,18 @@ static void syncRestartPeer(SSyncPeer *pPeer) {
pPeer->sstatus = TAOS_SYNC_STATUS_INIT; pPeer->sstatus = TAOS_SYNC_STATUS_INIT;
int ret = strcmp(pPeer->fqdn, tsNodeFqdn); int ret = strcmp(pPeer->fqdn, tsNodeFqdn);
if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort) ) if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort))
taosTmrReset(syncCheckPeerConnection, tsSyncTimer*1000, pPeer, syncTmrCtrl, &pPeer->timer); taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, syncTmrCtrl, &pPeer->timer);
} }
void syncRestartConnection(SSyncPeer *pPeer) void syncRestartConnection(SSyncPeer *pPeer) {
{
if (pPeer->ip == 0) return; if (pPeer->ip == 0) return;
syncRestartPeer(pPeer); syncRestartPeer(pPeer);
syncCheckRole(pPeer, NULL, TAOS_SYNC_ROLE_OFFLINE); syncCheckRole(pPeer, NULL, TAOS_SYNC_ROLE_OFFLINE);
} }
static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) {
{
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
sDebug("%s, sync-req is received", pPeer->id); sDebug("%s, sync-req is received", pPeer->id);
...@@ -784,8 +768,7 @@ static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) ...@@ -784,8 +768,7 @@ static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer)
} }
} }
static void syncNotStarted(void *param, void *tmrId) static void syncNotStarted(void *param, void *tmrId) {
{
SSyncPeer *pPeer = param; SSyncPeer *pPeer = param;
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
...@@ -805,14 +788,13 @@ static void syncTryRecoverFromMaster(void *param, void *tmrId) { ...@@ -805,14 +788,13 @@ static void syncTryRecoverFromMaster(void *param, void *tmrId) {
pthread_mutex_unlock(&(pNode->mutex)); pthread_mutex_unlock(&(pNode->mutex));
} }
static void syncRecoverFromMaster(SSyncPeer *pPeer) static void syncRecoverFromMaster(SSyncPeer *pPeer) {
{ SSyncNode *pNode = pPeer->pSyncNode;
SSyncNode *pNode = pPeer->pSyncNode;
if ( nodeSStatus != TAOS_SYNC_STATUS_INIT) { if (nodeSStatus != TAOS_SYNC_STATUS_INIT) {
sDebug("%s, sync is already started, status:%d", pPeer->id, nodeSStatus); sDebug("%s, sync is already started, status:%d", pPeer->id, nodeSStatus);
return; return;
} }
taosTmrStopA(&pPeer->timer); taosTmrStopA(&pPeer->timer);
if (tsSyncNum >= tsMaxSyncNum) { if (tsSyncNum >= tsMaxSyncNum) {
...@@ -842,9 +824,8 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) ...@@ -842,9 +824,8 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer)
return; return;
} }
static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) {
{ SSyncNode * pNode = pPeer->pSyncNode;
SSyncNode *pNode = pPeer->pSyncNode;
SFwdRsp *pFwdRsp = (SFwdRsp *) cont; SFwdRsp *pFwdRsp = (SFwdRsp *) cont;
SSyncFwds *pSyncFwds = pNode->pSyncFwds; SSyncFwds *pSyncFwds = pNode->pSyncFwds;
SFwdInfo *pFwdInfo; SFwdInfo *pFwdInfo;
...@@ -864,10 +845,8 @@ static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) ...@@ -864,10 +845,8 @@ static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer)
} }
} }
static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) SSyncNode * pNode = pPeer->pSyncNode;
{
SSyncNode *pNode = pPeer->pSyncNode;
SWalHead *pHead = (SWalHead *)cont; SWalHead *pHead = (SWalHead *)cont;
sDebug("%s, forward is received, ver:%" PRIu64, pPeer->id, pHead->version); sDebug("%s, forward is received, ver:%" PRIu64, pPeer->id, pHead->version);
...@@ -886,9 +865,8 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) ...@@ -886,9 +865,8 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer)
return; return;
} }
static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) {
{ SSyncNode * pNode = pPeer->pSyncNode;
SSyncNode *pNode = pPeer->pSyncNode;
SPeersStatus *pPeersStatus = (SPeersStatus *)cont; SPeersStatus *pPeersStatus = (SPeersStatus *)cont;
sDebug("%s, status msg received, self:%s ver:%" PRIu64 " peer:%s ver:%" PRIu64 ", ack:%d", pPeer->id, sDebug("%s, status msg received, self:%s ver:%" PRIu64 " peer:%s ver:%" PRIu64 ", ack:%d", pPeer->id,
...@@ -911,10 +889,10 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) { ...@@ -911,10 +889,10 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
} }
// head.len = htonl(head.len); // head.len = htonl(head.len);
if (pHead->len <0) { if (pHead->len < 0) {
sError("%s, invalid pkt length, len:%d", pPeer->id, pHead->len); sError("%s, invalid pkt length, len:%d", pPeer->id, pHead->len);
return -1; return -1;
} }
int bytes = taosReadMsg(pPeer->peerFd, cont, pHead->len); int bytes = taosReadMsg(pPeer->peerFd, cont, pHead->len);
if (bytes != pHead->len) { if (bytes != pHead->len) {
...@@ -925,9 +903,8 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) { ...@@ -925,9 +903,8 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
return 0; return 0;
} }
static int syncProcessPeerMsg(void *param, void *buffer) static int syncProcessPeerMsg(void *param, void *buffer) {
{ SSyncPeer * pPeer = param;
SSyncPeer *pPeer = param;
SSyncHead head; SSyncHead head;
char *cont = (char *)buffer; char *cont = (char *)buffer;
...@@ -955,8 +932,7 @@ static int syncProcessPeerMsg(void *param, void *buffer) ...@@ -955,8 +932,7 @@ static int syncProcessPeerMsg(void *param, void *buffer)
#define statusMsgLen sizeof(SSyncHead)+sizeof(SPeersStatus)+sizeof(SPeerStatus)*TAOS_SYNC_MAX_REPLICA #define statusMsgLen sizeof(SSyncHead)+sizeof(SPeersStatus)+sizeof(SPeerStatus)*TAOS_SYNC_MAX_REPLICA
static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) {
{
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
char msg[statusMsgLen] = {0}; char msg[statusMsgLen] = {0};
...@@ -1013,7 +989,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { ...@@ -1013,7 +989,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
firstPkt.port = tsSyncPort; firstPkt.port = tsSyncPort;
firstPkt.sourceId = pNode->vgId; // tell arbitrator its vgId firstPkt.sourceId = pNode->vgId; // tell arbitrator its vgId
if ( write(connFd, &firstPkt, sizeof(firstPkt)) == sizeof(firstPkt)) { if (write(connFd, &firstPkt, sizeof(firstPkt)) == sizeof(firstPkt)) {
sDebug("%s, connection to peer server is setup", pPeer->id); sDebug("%s, connection to peer server is setup", pPeer->id);
pPeer->peerFd = connFd; pPeer->peerFd = connFd;
pPeer->role = TAOS_SYNC_ROLE_UNSYNCED; pPeer->role = TAOS_SYNC_ROLE_UNSYNCED;
...@@ -1026,8 +1002,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { ...@@ -1026,8 +1002,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
} }
} }
static void syncCheckPeerConnection(void *param, void *tmrId) static void syncCheckPeerConnection(void *param, void *tmrId) {
{
SSyncPeer *pPeer = param; SSyncPeer *pPeer = param;
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
...@@ -1039,8 +1014,7 @@ static void syncCheckPeerConnection(void *param, void *tmrId) ...@@ -1039,8 +1014,7 @@ static void syncCheckPeerConnection(void *param, void *tmrId)
pthread_mutex_unlock(&(pNode->mutex)); pthread_mutex_unlock(&(pNode->mutex));
} }
static void syncCreateRestoreDataThread(SSyncPeer *pPeer) static void syncCreateRestoreDataThread(SSyncPeer *pPeer) {
{
taosTmrStopA(&pPeer->timer); taosTmrStopA(&pPeer->timer);
pthread_attr_t thattr; pthread_attr_t thattr;
...@@ -1061,8 +1035,7 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer) ...@@ -1061,8 +1035,7 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer)
} }
} }
static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) {
{
char ipstr[24]; char ipstr[24];
int i; int i;
...@@ -1139,8 +1112,7 @@ static void syncProcessBrokenLink(void *param) { ...@@ -1139,8 +1112,7 @@ static void syncProcessBrokenLink(void *param) {
syncDecNodeRef(pNode); syncDecNodeRef(pNode);
} }
static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) {
{
SSyncFwds *pSyncFwds = pNode->pSyncFwds; SSyncFwds *pSyncFwds = pNode->pSyncFwds;
uint64_t time = taosGetTimestampMs(); uint64_t time = taosGetTimestampMs();
...@@ -1162,8 +1134,7 @@ static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) ...@@ -1162,8 +1134,7 @@ static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle)
sDebug("vgId:%d, fwd info is saved, ver:%" PRIu64 " fwds:%d ", pNode->vgId, version, pSyncFwds->fwds); sDebug("vgId:%d, fwd info is saved, ver:%" PRIu64 " fwds:%d ", pNode->vgId, version, pSyncFwds->fwds);
} }
static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) {
{
SSyncFwds *pSyncFwds = pNode->pSyncFwds; SSyncFwds *pSyncFwds = pNode->pSyncFwds;
int fwds = pSyncFwds->fwds; int fwds = pSyncFwds->fwds;
...@@ -1180,8 +1151,7 @@ static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) ...@@ -1180,8 +1151,7 @@ static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode)
} }
} }
static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code) static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code) {
{
int confirm = 0; int confirm = 0;
if (pFwdInfo->code == 0) pFwdInfo->code = code; if (pFwdInfo->code == 0) pFwdInfo->code = code;
...@@ -1202,8 +1172,7 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code ...@@ -1202,8 +1172,7 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code
} }
} }
static void syncMonitorFwdInfos(void *param, void *tmrId) static void syncMonitorFwdInfos(void *param, void *tmrId) {
{
SSyncNode *pNode = param; SSyncNode *pNode = param;
SSyncFwds *pSyncFwds = pNode->pSyncFwds; SSyncFwds *pSyncFwds = pNode->pSyncFwds;
uint64_t time = taosGetTimestampMs(); uint64_t time = taosGetTimestampMs();
...@@ -1222,6 +1191,3 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) ...@@ -1222,6 +1191,3 @@ static void syncMonitorFwdInfos(void *param, void *tmrId)
pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl); pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl);
} }
...@@ -28,7 +28,7 @@ static void syncRemoveExtraFile(SSyncPeer *pPeer, uint32_t sindex, uint32_t eind ...@@ -28,7 +28,7 @@ static void syncRemoveExtraFile(SSyncPeer *pPeer, uint32_t sindex, uint32_t eind
char fname[TSDB_FILENAME_LEN*3] = {0}; char fname[TSDB_FILENAME_LEN*3] = {0};
uint32_t magic; uint32_t magic;
uint64_t fversion; uint64_t fversion;
int32_t size; int64_t size;
uint32_t index = sindex; uint32_t index = sindex;
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
...@@ -48,8 +48,7 @@ static void syncRemoveExtraFile(SSyncPeer *pPeer, uint32_t sindex, uint32_t eind ...@@ -48,8 +48,7 @@ static void syncRemoveExtraFile(SSyncPeer *pPeer, uint32_t sindex, uint32_t eind
} }
} }
static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
{
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
SFileInfo minfo; memset(&minfo, 0, sizeof(minfo)); /* = {0}; */ // master file info SFileInfo minfo; memset(&minfo, 0, sizeof(minfo)); /* = {0}; */ // master file info
SFileInfo sinfo; memset(&sinfo, 0, sizeof(sinfo)); /* = {0}; */ // slave file info SFileInfo sinfo; memset(&sinfo, 0, sizeof(sinfo)); /* = {0}; */ // slave file info
...@@ -113,7 +112,7 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) ...@@ -113,7 +112,7 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion)
close(dfd); close(dfd);
if (ret<0) break; if (ret<0) break;
sDebug("%s, %s is received, size:%d", pPeer->id, minfo.name, minfo.size); sDebug("%s, %s is received, size:%" PRId64, pPeer->id, minfo.name, minfo.size);
} }
...@@ -130,8 +129,7 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) ...@@ -130,8 +129,7 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion)
return code; return code;
} }
static int syncRestoreWal(SSyncPeer *pPeer) static int syncRestoreWal(SSyncPeer *pPeer) {
{
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
int ret, code = -1; int ret, code = -1;
...@@ -172,8 +170,7 @@ static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset) ...@@ -172,8 +170,7 @@ static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset)
return offset; return offset;
} }
static int syncProcessBufferedFwd(SSyncPeer *pPeer) static int syncProcessBufferedFwd(SSyncPeer *pPeer) {
{
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
SRecvBuffer *pRecv = pNode->pRecv; SRecvBuffer *pRecv = pNode->pRecv;
int forwards = 0; int forwards = 0;
...@@ -201,8 +198,7 @@ static int syncProcessBufferedFwd(SSyncPeer *pPeer) ...@@ -201,8 +198,7 @@ static int syncProcessBufferedFwd(SSyncPeer *pPeer)
return pRecv->code; return pRecv->code;
} }
int syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) int syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) {
{
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
SRecvBuffer *pRecv = pNode->pRecv; SRecvBuffer *pRecv = pNode->pRecv;
...@@ -222,8 +218,7 @@ int syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) ...@@ -222,8 +218,7 @@ int syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead)
return pRecv->code; return pRecv->code;
} }
static void syncCloseRecvBuffer(SSyncNode *pNode) static void syncCloseRecvBuffer(SSyncNode *pNode) {
{
if (pNode->pRecv) { if (pNode->pRecv) {
taosTFree(pNode->pRecv->buffer); taosTFree(pNode->pRecv->buffer);
} }
...@@ -231,8 +226,7 @@ static void syncCloseRecvBuffer(SSyncNode *pNode) ...@@ -231,8 +226,7 @@ static void syncCloseRecvBuffer(SSyncNode *pNode)
taosTFree(pNode->pRecv); taosTFree(pNode->pRecv);
} }
static int syncOpenRecvBuffer(SSyncNode *pNode) static int syncOpenRecvBuffer(SSyncNode *pNode) {
{
syncCloseRecvBuffer(pNode); syncCloseRecvBuffer(pNode);
SRecvBuffer *pRecv = calloc(sizeof(SRecvBuffer), 1); SRecvBuffer *pRecv = calloc(sizeof(SRecvBuffer), 1);
...@@ -253,8 +247,7 @@ static int syncOpenRecvBuffer(SSyncNode *pNode) ...@@ -253,8 +247,7 @@ static int syncOpenRecvBuffer(SSyncNode *pNode)
return 0; return 0;
} }
static int syncRestoreDataStepByStep(SSyncPeer *pPeer) static int syncRestoreDataStepByStep(SSyncPeer *pPeer) {
{
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
nodeSStatus = TAOS_SYNC_STATUS_FILE; nodeSStatus = TAOS_SYNC_STATUS_FILE;
uint64_t fversion = 0; uint64_t fversion = 0;
...@@ -292,10 +285,9 @@ static int syncRestoreDataStepByStep(SSyncPeer *pPeer) ...@@ -292,10 +285,9 @@ static int syncRestoreDataStepByStep(SSyncPeer *pPeer)
return 0; return 0;
} }
void *syncRestoreData(void *param) void *syncRestoreData(void *param) {
{ SSyncPeer *pPeer = (SSyncPeer *)param;
SSyncPeer *pPeer = (SSyncPeer *)param; SSyncNode *pNode = pPeer->pSyncNode;
SSyncNode *pNode = pPeer->pSyncNode;
taosBlockSIGPIPE(); taosBlockSIGPIPE();
__sync_fetch_and_add(&tsSyncNum, 1); __sync_fetch_and_add(&tsSyncNum, 1);
...@@ -326,4 +318,3 @@ void *syncRestoreData(void *param) ...@@ -326,4 +318,3 @@ void *syncRestoreData(void *param)
return NULL; return NULL;
} }
...@@ -27,11 +27,10 @@ ...@@ -27,11 +27,10 @@
#include "tsync.h" #include "tsync.h"
#include "syncInt.h" #include "syncInt.h"
static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) {
{
sDebug("%s, start to monitor:%s", pPeer->id, name); sDebug("%s, start to monitor:%s", pPeer->id, name);
if (pPeer->notifyFd <=0) { if (pPeer->notifyFd <= 0) {
pPeer->watchNum = 0; pPeer->watchNum = 0;
pPeer->notifyFd = inotify_init1(IN_NONBLOCK); pPeer->notifyFd = inotify_init1(IN_NONBLOCK);
if (pPeer->notifyFd < 0) { if (pPeer->notifyFd < 0) {
...@@ -70,9 +69,8 @@ static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) ...@@ -70,9 +69,8 @@ static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name)
return 0; return 0;
} }
static int syncAreFilesModified(SSyncPeer *pPeer) static int syncAreFilesModified(SSyncPeer *pPeer) {
{ if (pPeer->notifyFd <= 0) return 0;
if (pPeer->notifyFd <=0) return 0;
char buf[2048]; char buf[2048];
int len = read(pPeer->notifyFd, buf, sizeof(buf)); int len = read(pPeer->notifyFd, buf, sizeof(buf));
...@@ -96,12 +94,11 @@ static int syncAreFilesModified(SSyncPeer *pPeer) ...@@ -96,12 +94,11 @@ static int syncAreFilesModified(SSyncPeer *pPeer)
} }
} }
return code; return code;
} }
static int syncRetrieveFile(SSyncPeer *pPeer) static int syncRetrieveFile(SSyncPeer *pPeer) {
{ SSyncNode * pNode = pPeer->pSyncNode;
SSyncNode *pNode = pPeer->pSyncNode;
SFileInfo fileInfo; SFileInfo fileInfo;
SFileAck fileAck; SFileAck fileAck;
int code = -1; int code = -1;
...@@ -128,7 +125,7 @@ static int syncRetrieveFile(SSyncPeer *pPeer) ...@@ -128,7 +125,7 @@ static int syncRetrieveFile(SSyncPeer *pPeer)
// wait for the ack from peer // wait for the ack from peer
ret = taosReadMsg(pPeer->syncFd, &(fileAck), sizeof(fileAck)); ret = taosReadMsg(pPeer->syncFd, &(fileAck), sizeof(fileAck));
if (ret <0) break; if (ret < 0) break;
// set the peer sync version // set the peer sync version
pPeer->sversion = fileInfo.fversion; pPeer->sversion = fileInfo.fversion;
...@@ -148,13 +145,13 @@ static int syncRetrieveFile(SSyncPeer *pPeer) ...@@ -148,13 +145,13 @@ static int syncRetrieveFile(SSyncPeer *pPeer)
// send the file to peer // send the file to peer
int sfd = open(name, O_RDONLY); int sfd = open(name, O_RDONLY);
if ( sfd < 0 ) break; if (sfd < 0) break;
ret = taosTSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size); ret = taosTSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size);
close(sfd); close(sfd);
if (ret <0) break; if (ret < 0) break;
sDebug("%s, %s is sent, size:%d", pPeer->id, name, fileInfo.size); sDebug("%s, %s is sent, size:%" PRId64, pPeer->id, name, fileInfo.size);
fileInfo.index++; fileInfo.index++;
// check if processed files are modified // check if processed files are modified
...@@ -170,8 +167,7 @@ static int syncRetrieveFile(SSyncPeer *pPeer) ...@@ -170,8 +167,7 @@ static int syncRetrieveFile(SSyncPeer *pPeer)
/* if only a partial record is read out, set the IN_MODIFY flag in event, /* if only a partial record is read out, set the IN_MODIFY flag in event,
so upper layer will reload the file to get a complete record */ so upper layer will reload the file to get a complete record */
static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent) static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent) {
{
int ret; int ret;
ret = read(sfd, pHead, sizeof(SWalHead)); ret = read(sfd, pHead, sizeof(SWalHead));
...@@ -185,7 +181,7 @@ static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent) ...@@ -185,7 +181,7 @@ static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent)
} }
ret = read(sfd, pHead->cont, pHead->len); ret = read(sfd, pHead->cont, pHead->len);
if (ret <0) return -1; if (ret < 0) return -1;
if (ret != pHead->len) { if (ret != pHead->len) {
// file is not at end yet, it shall be reloaded // file is not at end yet, it shall be reloaded
...@@ -194,10 +190,9 @@ static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent) ...@@ -194,10 +190,9 @@ static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent)
} }
return sizeof(SWalHead) + pHead->len; return sizeof(SWalHead) + pHead->len;
} }
static int syncMonitorLastWal(SSyncPeer *pPeer, char *name) static int syncMonitorLastWal(SSyncPeer *pPeer, char *name) {
{
pPeer->watchNum = 0; pPeer->watchNum = 0;
taosClose(pPeer->notifyFd); taosClose(pPeer->notifyFd);
pPeer->notifyFd = inotify_init1(IN_NONBLOCK); pPeer->notifyFd = inotify_init1(IN_NONBLOCK);
...@@ -221,18 +216,17 @@ static int syncMonitorLastWal(SSyncPeer *pPeer, char *name) ...@@ -221,18 +216,17 @@ static int syncMonitorLastWal(SSyncPeer *pPeer, char *name)
return -1; return -1;
} }
return 0; return 0;
} }
static uint32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) static uint32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) {
{
char buf[2048]; char buf[2048];
int len = read(pPeer->notifyFd, buf, sizeof(buf)); int len = read(pPeer->notifyFd, buf, sizeof(buf));
if (len <0 && errno != EAGAIN) { if (len < 0 && errno != EAGAIN) {
sError("%s, failed to read notify FD(%s)", pPeer->id, strerror(errno)); sError("%s, failed to read notify FD(%s)", pPeer->id, strerror(errno));
return -1; return -1;
} }
if (len == 0) return 0; if (len == 0) return 0;
struct inotify_event *event; struct inotify_event *event;
...@@ -248,8 +242,7 @@ static uint32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) ...@@ -248,8 +242,7 @@ static uint32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent)
return 0; return 0;
} }
static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset, uint32_t *pEvent) static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset, uint32_t *pEvent) {
{
SWalHead *pHead = (SWalHead *) malloc(640000); SWalHead *pHead = (SWalHead *) malloc(640000);
int code = -1; int code = -1;
int32_t bytes = 0; int32_t bytes = 0;
...@@ -261,9 +254,12 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, ...@@ -261,9 +254,12 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion,
sDebug("%s, retrieve last wal, offset:%" PRId64 " fversion:%" PRIu64, pPeer->id, offset, fversion); sDebug("%s, retrieve last wal, offset:%" PRId64 " fversion:%" PRIu64, pPeer->id, offset, fversion);
while (1) { while (1) {
int wsize = syncReadOneWalRecord(sfd, pHead, pEvent); int wsize = syncReadOneWalRecord(sfd, pHead, pEvent);
if (wsize <0) break; if (wsize < 0) break;
if (wsize == 0) { code = 0; break; } if (wsize == 0) {
code = 0;
break;
}
sDebug("%s, last wal is forwarded, ver:%" PRIu64, pPeer->id, pHead->version); sDebug("%s, last wal is forwarded, ver:%" PRIu64, pPeer->id, pHead->version);
int ret = taosWriteMsg(pPeer->syncFd, pHead, wsize); int ret = taosWriteMsg(pPeer->syncFd, pHead, wsize);
...@@ -286,8 +282,7 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, ...@@ -286,8 +282,7 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion,
return -1; return -1;
} }
static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) {
{
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
int code = -1; int code = -1;
char fname[TSDB_FILENAME_LEN * 2]; // full path to wal file char fname[TSDB_FILENAME_LEN * 2]; // full path to wal file
...@@ -350,12 +345,16 @@ static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) ...@@ -350,12 +345,16 @@ static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index)
} }
if (code < 0) break; if (code < 0) break;
if (pPeer->sversion >= fversion && fversion > 0) break; if (pPeer->sversion >= fversion && fversion > 0) break;
index++; wname[0] = 0; index++;
wname[0] = 0;
code = (*pNode->getWalInfo)(pNode->ahandle, wname, &index); code = (*pNode->getWalInfo)(pNode->ahandle, wname, &index);
if ( code < 0) break; if (code < 0) break;
if ( wname[0] == 0 ) {code = 0; break;} if (wname[0] == 0) {
code = 0;
break;
}
// current last wal is closed, there is a new one // current last wal is closed, there is a new one
sDebug("%s, last wal is closed, try new one", pPeer->id); sDebug("%s, last wal is closed, try new one", pPeer->id);
...@@ -366,9 +365,8 @@ static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) ...@@ -366,9 +365,8 @@ static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index)
return code; return code;
} }
static int syncRetrieveWal(SSyncPeer *pPeer) static int syncRetrieveWal(SSyncPeer *pPeer) {
{ SSyncNode * pNode = pPeer->pSyncNode;
SSyncNode *pNode = pPeer->pSyncNode;
char fname[TSDB_FILENAME_LEN * 3]; char fname[TSDB_FILENAME_LEN * 3];
char wname[TSDB_FILENAME_LEN * 2]; char wname[TSDB_FILENAME_LEN * 2];
int32_t size; int32_t size;
...@@ -396,7 +394,7 @@ static int syncRetrieveWal(SSyncPeer *pPeer) ...@@ -396,7 +394,7 @@ static int syncRetrieveWal(SSyncPeer *pPeer)
// send wal file, // send wal file,
// inotify is not required, old wal file won't be modified, even remove is ok // inotify is not required, old wal file won't be modified, even remove is ok
if ( stat(fname, &fstat) < 0 ) break; if (stat(fname, &fstat) < 0) break;
size = fstat.st_size; size = fstat.st_size;
sDebug("%s, retrieve wal:%s size:%d", pPeer->id, fname, size); sDebug("%s, retrieve wal:%s size:%d", pPeer->id, fname, size);
...@@ -425,9 +423,8 @@ static int syncRetrieveWal(SSyncPeer *pPeer) ...@@ -425,9 +423,8 @@ static int syncRetrieveWal(SSyncPeer *pPeer)
return code; return code;
} }
static int syncRetrieveDataStepByStep(SSyncPeer *pPeer) static int syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
{ SSyncNode *pNode = pPeer->pSyncNode;
SSyncNode *pNode = pPeer->pSyncNode;
SFirstPkt firstPkt; SFirstPkt firstPkt;
memset(&firstPkt, 0, sizeof(firstPkt)); memset(&firstPkt, 0, sizeof(firstPkt));
...@@ -462,9 +459,8 @@ static int syncRetrieveDataStepByStep(SSyncPeer *pPeer) ...@@ -462,9 +459,8 @@ static int syncRetrieveDataStepByStep(SSyncPeer *pPeer)
return 0; return 0;
} }
void *syncRetrieveData(void *param) void *syncRetrieveData(void *param) {
{ SSyncPeer * pPeer = (SSyncPeer *)param;
SSyncPeer *pPeer = (SSyncPeer *)param;
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
taosBlockSIGPIPE(); taosBlockSIGPIPE();
......
...@@ -48,8 +48,7 @@ static void *taosProcessTcpData(void *param); ...@@ -48,8 +48,7 @@ static void *taosProcessTcpData(void *param);
static SThreadObj *taosGetTcpThread(SPoolObj *pPool); static SThreadObj *taosGetTcpThread(SPoolObj *pPool);
static void taosStopPoolThread(SThreadObj* pThread); static void taosStopPoolThread(SThreadObj* pThread);
void *taosOpenTcpThreadPool(SPoolInfo *pInfo) void *taosOpenTcpThreadPool(SPoolInfo *pInfo) {
{
pthread_attr_t thattr; pthread_attr_t thattr;
SPoolObj *pPool = calloc(sizeof(SPoolObj), 1); SPoolObj *pPool = calloc(sizeof(SPoolObj), 1);
...@@ -89,8 +88,7 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo) ...@@ -89,8 +88,7 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo)
return pPool; return pPool;
} }
void taosCloseTcpThreadPool(void *param) void taosCloseTcpThreadPool(void *param) {
{
SPoolObj *pPool = (SPoolObj *)param; SPoolObj *pPool = (SPoolObj *)param;
SThreadObj *pThread; SThreadObj *pThread;
...@@ -107,8 +105,7 @@ void taosCloseTcpThreadPool(void *param) ...@@ -107,8 +105,7 @@ void taosCloseTcpThreadPool(void *param)
uDebug("%p TCP pool is closed", pPool); uDebug("%p TCP pool is closed", pPool);
} }
void *taosAllocateTcpConn(void *param, void *pPeer, int connFd) void *taosAllocateTcpConn(void *param, void *pPeer, int connFd) {
{
struct epoll_event event; struct epoll_event event;
SPoolObj *pPool = (SPoolObj *)param; SPoolObj *pPool = (SPoolObj *)param;
...@@ -145,9 +142,8 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int connFd) ...@@ -145,9 +142,8 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int connFd)
return pConn; return pConn;
} }
void taosFreeTcpConn(void *param) void taosFreeTcpConn(void *param) {
{ SConnObj * pConn = (SConnObj *)param;
SConnObj *pConn = (SConnObj *)param;
SThreadObj *pThread = pConn->pThread; SThreadObj *pThread = pConn->pThread;
uDebug("%p TCP connection will be closed, fd:%d", pThread, pConn->fd); uDebug("%p TCP connection will be closed, fd:%d", pThread, pConn->fd);
......
...@@ -150,9 +150,8 @@ static void arbProcessBrokenLink(void *param) { ...@@ -150,9 +150,8 @@ static void arbProcessBrokenLink(void *param) {
taosTFree(pNode); taosTFree(pNode);
} }
static int arbProcessPeerMsg(void *param, void *buffer) static int arbProcessPeerMsg(void *param, void *buffer) {
{ SNodeConn * pNode = param;
SNodeConn *pNode = param;
SSyncHead head; SSyncHead head;
int bytes = 0; int bytes = 0;
char *cont = (char *)buffer; char *cont = (char *)buffer;
...@@ -174,7 +173,6 @@ static int arbProcessPeerMsg(void *param, void *buffer) ...@@ -174,7 +173,6 @@ static int arbProcessPeerMsg(void *param, void *buffer)
} }
static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context) { static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context) {
struct sigaction act = {{0}}; struct sigaction act = {{0}};
act.sa_handler = SIG_IGN; act.sa_handler = SIG_IGN;
sigaction(SIGTERM, &act, NULL); sigaction(SIGTERM, &act, NULL);
...@@ -186,4 +184,3 @@ static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context) ...@@ -186,4 +184,3 @@ static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context)
// inform main thread to exit // inform main thread to exit
tsem_post(&tsArbSem); tsem_post(&tsArbSem);
} }
...@@ -234,7 +234,7 @@ void processRequestMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { ...@@ -234,7 +234,7 @@ void processRequestMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
taosWriteQitem(qhandle, TAOS_QTYPE_RPC, pTemp); taosWriteQitem(qhandle, TAOS_QTYPE_RPC, pTemp);
} }
uint32_t getFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion) uint32_t getFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion)
{ {
uint32_t magic; uint32_t magic;
struct stat fstat; struct stat fstat;
......
...@@ -475,7 +475,7 @@ int tsdbUpdateFileHeader(SFile* pFile); ...@@ -475,7 +475,7 @@ int tsdbUpdateFileHeader(SFile* pFile);
int tsdbEncodeSFileInfo(void** buf, const STsdbFileInfo* pInfo); int tsdbEncodeSFileInfo(void** buf, const STsdbFileInfo* pInfo);
void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo); void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo);
void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup); void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup);
void tsdbGetFileInfoImpl(char* fname, uint32_t* magic, int32_t* size); void tsdbGetFileInfoImpl(char* fname, uint32_t* magic, int64_t* size);
void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey); void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey);
// ------------------ tsdbRWHelper.c // ------------------ tsdbRWHelper.c
......
...@@ -424,7 +424,7 @@ void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) { ...@@ -424,7 +424,7 @@ void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) {
} }
} }
void tsdbGetFileInfoImpl(char *fname, uint32_t *magic, int32_t *size) { void tsdbGetFileInfoImpl(char *fname, uint32_t *magic, int64_t *size) {
char buf[TSDB_FILE_HEAD_SIZE] = "\0"; char buf[TSDB_FILE_HEAD_SIZE] = "\0";
uint32_t version = 0; uint32_t version = 0;
STsdbFileInfo info = {0}; STsdbFileInfo info = {0};
...@@ -445,7 +445,7 @@ void tsdbGetFileInfoImpl(char *fname, uint32_t *magic, int32_t *size) { ...@@ -445,7 +445,7 @@ void tsdbGetFileInfoImpl(char *fname, uint32_t *magic, int32_t *size) {
close(fd); close(fd);
*magic = info.magic; *magic = info.magic;
*size = (int32_t)offset; *size = offset;
return; return;
......
...@@ -212,7 +212,7 @@ int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg * ...@@ -212,7 +212,7 @@ int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *
return 0; return 0;
} }
uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int32_t *size) { uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size) {
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
// STsdbMeta *pMeta = pRepo->tsdbMeta; // STsdbMeta *pMeta = pRepo->tsdbMeta;
STsdbFileH *pFileH = pRepo->tsdbFileH; STsdbFileH *pFileH = pRepo->tsdbFileH;
......
...@@ -58,7 +58,7 @@ int tdKVStoreStartCommit(SKVStore *pStore); ...@@ -58,7 +58,7 @@ int tdKVStoreStartCommit(SKVStore *pStore);
int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLen); int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLen);
int tdDropKVStoreRecord(SKVStore *pStore, uint64_t uid); int tdDropKVStoreRecord(SKVStore *pStore, uint64_t uid);
int tdKVStoreEndCommit(SKVStore *pStore); int tdKVStoreEndCommit(SKVStore *pStore);
void tsdbGetStoreInfo(char *fname, uint32_t *magic, int32_t *size); void tsdbGetStoreInfo(char *fname, uint32_t *magic, int64_t *size);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -332,7 +332,7 @@ int tdKVStoreEndCommit(SKVStore *pStore) { ...@@ -332,7 +332,7 @@ int tdKVStoreEndCommit(SKVStore *pStore) {
return 0; return 0;
} }
void tsdbGetStoreInfo(char *fname, uint32_t *magic, int32_t *size) { void tsdbGetStoreInfo(char *fname, uint32_t *magic, int64_t *size) {
char buf[TD_KVSTORE_HEADER_SIZE] = "\0"; char buf[TD_KVSTORE_HEADER_SIZE] = "\0";
SStoreInfo info = {0}; SStoreInfo info = {0};
...@@ -349,7 +349,7 @@ void tsdbGetStoreInfo(char *fname, uint32_t *magic, int32_t *size) { ...@@ -349,7 +349,7 @@ void tsdbGetStoreInfo(char *fname, uint32_t *magic, int32_t *size) {
close(fd); close(fd);
*magic = info.magic; *magic = info.magic;
*size = (int32_t)offset; *size = offset;
return; return;
......
...@@ -41,7 +41,7 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode); ...@@ -41,7 +41,7 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode);
static int32_t vnodeSaveVersion(SVnodeObj *pVnode); static int32_t vnodeSaveVersion(SVnodeObj *pVnode);
static int32_t vnodeReadVersion(SVnodeObj *pVnode); static int32_t vnodeReadVersion(SVnodeObj *pVnode);
static int vnodeProcessTsdbStatus(void *arg, int status); static int vnodeProcessTsdbStatus(void *arg, int status);
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion); static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion);
static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index); static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index);
static void vnodeNotifyRole(void *ahandle, int8_t role); static void vnodeNotifyRole(void *ahandle, int8_t role);
static void vnodeCtrlFlow(void *handle, int32_t mseconds); static void vnodeCtrlFlow(void *handle, int32_t mseconds);
...@@ -290,6 +290,8 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -290,6 +290,8 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
pVnode->sync = syncStart(&syncInfo); pVnode->sync = syncStart(&syncInfo);
if (pVnode->sync == NULL) { if (pVnode->sync == NULL) {
vError("vgId:%d, failed to open sync module, replica:%d reason:%s", pVnode->vgId, pVnode->syncCfg.replica,
tstrerror(terrno));
vnodeCleanUp(pVnode); vnodeCleanUp(pVnode);
return terrno; return terrno;
} }
...@@ -536,7 +538,7 @@ static int vnodeProcessTsdbStatus(void *arg, int status) { ...@@ -536,7 +538,7 @@ static int vnodeProcessTsdbStatus(void *arg, int status) {
return 0; return 0;
} }
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion) { static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) {
SVnodeObj *pVnode = ahandle; SVnodeObj *pVnode = ahandle;
*fversion = pVnode->fversion; *fversion = pVnode->fversion;
return tsdbGetFileInfo(pVnode->tsdb, name, index, eindex, size); return tsdbGetFileInfo(pVnode->tsdb, name, index, eindex, size);
......
...@@ -32,10 +32,16 @@ system sh/cfg.sh -n dnode2 -c http -v 1 ...@@ -32,10 +32,16 @@ system sh/cfg.sh -n dnode2 -c http -v 1
system sh/cfg.sh -n dnode3 -c http -v 1 system sh/cfg.sh -n dnode3 -c http -v 1
system sh/cfg.sh -n dnode4 -c http -v 1 system sh/cfg.sh -n dnode4 -c http -v 1
# for crash_gen
system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 2
system sh/cfg.sh -n dnode1 -c rpcMaxTime -v 101
system sh/cfg.sh -n dnode1 -c cache -v 2
system sh/cfg.sh -n dnode1 -c keep -v 36500
system sh/cfg.sh -n dnode1 -c walLevel -v 2 system sh/cfg.sh -n dnode1 -c walLevel -v 2
# for windows
system sh/cfg.sh -n dnode1 -c firstEp -v 152.136.17.116:6030 system sh/cfg.sh -n dnode1 -c firstEp -v 152.136.17.116:6030
system sh/cfg.sh -n dnode1 -c secondEp -v 152.136.17.116:6030 system sh/cfg.sh -n dnode1 -c secondEp -v 152.136.17.116:6030
system sh/cfg.sh -n dnode1 -c serverPort -v 6030 system sh/cfg.sh -n dnode1 -c serverPort -v 6030
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册