提交 2caeccf6 编写于 作者: S Shengliang Guan

TD-1617

上级 6d040625
...@@ -171,7 +171,6 @@ void syncBroadcastStatus(SSyncNode *pNode); ...@@ -171,7 +171,6 @@ void syncBroadcastStatus(SSyncNode *pNode);
void syncAddPeerRef(SSyncPeer *pPeer); void syncAddPeerRef(SSyncPeer *pPeer);
int syncDecPeerRef(SSyncPeer *pPeer); int syncDecPeerRef(SSyncPeer *pPeer);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -38,7 +38,6 @@ void taosCloseTcpThreadPool(ttpool_h); ...@@ -38,7 +38,6 @@ void taosCloseTcpThreadPool(ttpool_h);
void *taosAllocateTcpConn(void *, void *ahandle, int connFd); void *taosAllocateTcpConn(void *, void *ahandle, int connFd);
void taosFreeTcpConn(void *); void taosFreeTcpConn(void *);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -134,7 +134,7 @@ void syncCleanUp() { ...@@ -134,7 +134,7 @@ void syncCleanUp() {
void *syncStart(const SSyncInfo *pInfo) { void *syncStart(const SSyncInfo *pInfo) {
const SSyncCfg *pCfg = &pInfo->syncCfg; const SSyncCfg *pCfg = &pInfo->syncCfg;
SSyncNode *pNode = (SSyncNode *) calloc(sizeof(SSyncNode), 1); SSyncNode *pNode = (SSyncNode *)calloc(sizeof(SSyncNode), 1);
if (pNode == NULL) { if (pNode == NULL) {
sError("no memory to allocate syncNode"); sError("no memory to allocate syncNode");
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
...@@ -168,7 +168,7 @@ void *syncStart(const SSyncInfo *pInfo) { ...@@ -168,7 +168,7 @@ void *syncStart(const SSyncInfo *pInfo) {
} }
syncAddNodeRef(pNode); syncAddNodeRef(pNode);
if (pNode->selfIndex < 0) { if (pNode->selfIndex < 0) {
sInfo("vgId:%d, this node is not configured", pNode->vgId); sInfo("vgId:%d, this node is not configured", pNode->vgId);
terrno = TSDB_CODE_SYN_INVALID_CONFIG; terrno = TSDB_CODE_SYN_INVALID_CONFIG;
...@@ -176,11 +176,12 @@ void *syncStart(const SSyncInfo *pInfo) { ...@@ -176,11 +176,12 @@ void *syncStart(const SSyncInfo *pInfo) {
return NULL; return NULL;
} }
nodeVersion = pInfo->version; // set the initial version nodeVersion = pInfo->version; // set the initial version
nodeRole = (pNode->replica > 1) ? TAOS_SYNC_ROLE_UNSYNCED : TAOS_SYNC_ROLE_MASTER; nodeRole = (pNode->replica > 1) ? TAOS_SYNC_ROLE_UNSYNCED : TAOS_SYNC_ROLE_MASTER;
sInfo("vgId:%d, %d replicas are configured, quorum:%d role:%s", pNode->vgId, pNode->replica, pNode->quorum, syncRole[nodeRole]); sInfo("vgId:%d, %d replicas are configured, quorum:%d role:%s", pNode->vgId, pNode->replica, pNode->quorum,
syncRole[nodeRole]);
pNode->pSyncFwds = calloc(sizeof(SSyncFwds) + tsMaxFwdInfo*sizeof(SFwdInfo), 1); pNode->pSyncFwds = calloc(sizeof(SSyncFwds) + tsMaxFwdInfo * sizeof(SFwdInfo), 1);
if (pNode->pSyncFwds == NULL) { if (pNode->pSyncFwds == NULL) {
sError("vgId:%d, no memory to allocate syncFwds", pNode->vgId); sError("vgId:%d, no memory to allocate syncFwds", pNode->vgId);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
...@@ -443,9 +444,7 @@ static void syncAddArbitrator(SSyncNode *pNode) { ...@@ -443,9 +444,7 @@ static void syncAddArbitrator(SSyncNode *pNode) {
pNode->peerInfo[TAOS_SYNC_MAX_REPLICA] = syncAddPeer(pNode, &nodeInfo); pNode->peerInfo[TAOS_SYNC_MAX_REPLICA] = syncAddPeer(pNode, &nodeInfo);
} }
static void syncAddNodeRef(SSyncNode *pNode) { static void syncAddNodeRef(SSyncNode *pNode) { atomic_add_fetch_8(&pNode->refCount, 1); }
atomic_add_fetch_8(&pNode->refCount, 1);
}
static void syncDecNodeRef(SSyncNode *pNode) { static void syncDecNodeRef(SSyncNode *pNode) {
if (atomic_sub_fetch_8(&pNode->refCount, 1) == 0) { if (atomic_sub_fetch_8(&pNode->refCount, 1) == 0) {
...@@ -456,9 +455,7 @@ static void syncDecNodeRef(SSyncNode *pNode) { ...@@ -456,9 +455,7 @@ static void syncDecNodeRef(SSyncNode *pNode) {
} }
} }
void syncAddPeerRef(SSyncPeer *pPeer) { void syncAddPeerRef(SSyncPeer *pPeer) { atomic_add_fetch_8(&pPeer->refCount, 1); }
atomic_add_fetch_8(&pPeer->refCount, 1);
}
int syncDecPeerRef(SSyncPeer *pPeer) { int syncDecPeerRef(SSyncPeer *pPeer) {
if (atomic_sub_fetch_8(&pPeer->refCount, 1) == 0) { if (atomic_sub_fetch_8(&pPeer->refCount, 1) == 0) {
...@@ -501,6 +498,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { ...@@ -501,6 +498,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
tstrncpy(pPeer->fqdn, pInfo->nodeFqdn, sizeof(pPeer->fqdn)); tstrncpy(pPeer->fqdn, pInfo->nodeFqdn, sizeof(pPeer->fqdn));
pPeer->ip = ip; pPeer->ip = ip;
pPeer->port = pInfo->nodePort; pPeer->port = pInfo->nodePort;
pPeer->fqdn[sizeof(pPeer->fqdn) - 1] = 0;
snprintf(pPeer->id, sizeof(pPeer->id), "vgId:%d peer:%s:%d", pNode->vgId, pPeer->fqdn, pPeer->port); snprintf(pPeer->id, sizeof(pPeer->id), "vgId:%d peer:%s:%d", pNode->vgId, pPeer->fqdn, pPeer->port);
pPeer->peerFd = -1; pPeer->peerFd = -1;
...@@ -573,10 +571,10 @@ static void syncChooseMaster(SSyncNode *pNode) { ...@@ -573,10 +571,10 @@ static void syncChooseMaster(SSyncNode *pNode) {
replica = pNode->replica + 1; replica = pNode->replica + 1;
} }
if (index < 0 && onlineNum > replica/2.0) { if (index < 0 && onlineNum > replica / 2.0) {
// over half of nodes are online // over half of nodes are online
for (int i = 0; i < pNode->replica; ++i) { for (int i = 0; i < pNode->replica; ++i) {
//slave with highest version shall be master // slave with highest version shall be master
pPeer = pNode->peerInfo[i]; pPeer = pNode->peerInfo[i];
if (pPeer->role == TAOS_SYNC_ROLE_SLAVE || pPeer->role == TAOS_SYNC_ROLE_MASTER) { if (pPeer->role == TAOS_SYNC_ROLE_SLAVE || pPeer->role == TAOS_SYNC_ROLE_MASTER) {
if (index < 0 || pPeer->version > pNode->peerInfo[index]->version) { if (index < 0 || pPeer->version > pNode->peerInfo[index]->version) {
...@@ -622,7 +620,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) { ...@@ -622,7 +620,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) {
if (onlineNum <= replica * 0.5) { if (onlineNum <= replica * 0.5) {
if (nodeRole != TAOS_SYNC_ROLE_UNSYNCED) { if (nodeRole != TAOS_SYNC_ROLE_UNSYNCED) {
nodeRole = TAOS_SYNC_ROLE_UNSYNCED; nodeRole = TAOS_SYNC_ROLE_UNSYNCED;
//pNode->peerInfo[pNode->selfIndex]->role = nodeRole; // pNode->peerInfo[pNode->selfIndex]->role = nodeRole;
(*pNode->notifyRole)(pNode->ahandle, nodeRole); (*pNode->notifyRole)(pNode->ahandle, nodeRole);
sInfo("vgId:%d, change to unsynced state, online:%d replica:%d", pNode->vgId, onlineNum, replica); sInfo("vgId:%d, change to unsynced state, online:%d replica:%d", pNode->vgId, onlineNum, replica);
} }
...@@ -648,7 +646,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) { ...@@ -648,7 +646,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) {
static int syncValidateMaster(SSyncPeer *pPeer) { static int syncValidateMaster(SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
int code = 0; int code = 0;
if (nodeRole == TAOS_SYNC_ROLE_MASTER && nodeVersion < pPeer->version) { if (nodeRole == TAOS_SYNC_ROLE_MASTER && nodeVersion < pPeer->version) {
sDebug("%s, slave has higher version, restart all connections!!!", pPeer->id); sDebug("%s, slave has higher version, restart all connections!!!", pPeer->id);
...@@ -671,7 +669,7 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne ...@@ -671,7 +669,7 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne
int8_t selfOldRole = nodeRole; int8_t selfOldRole = nodeRole;
int8_t i, syncRequired = 0; int8_t i, syncRequired = 0;
//pNode->peerInfo[pNode->selfIndex]->version = nodeVersion; // pNode->peerInfo[pNode->selfIndex]->version = nodeVersion;
pPeer->role = newRole; pPeer->role = newRole;
sDebug("%s, own role:%s, new peer role:%s", pPeer->id, syncRole[nodeRole], syncRole[pPeer->role]); sDebug("%s, own role:%s, new peer role:%s", pPeer->id, syncRole[nodeRole], syncRole[pPeer->role]);
...@@ -877,8 +875,6 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) { ...@@ -877,8 +875,6 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
sError("%s, forward discarded, ver:%" PRIu64, pPeer->id, pHead->version); sError("%s, forward discarded, ver:%" PRIu64, pPeer->id, pHead->version);
} }
} }
return;
} }
static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) { static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) {
...@@ -1066,7 +1062,7 @@ static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) { ...@@ -1066,7 +1062,7 @@ static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) {
return; return;
} }
int32_t vgId = firstPkt.syncHead.vgId; int32_t vgId = firstPkt.syncHead.vgId;
SSyncNode **ppNode = (SSyncNode **)taosHashGet(vgIdHash, (const char *)&vgId, sizeof(int32_t)); SSyncNode **ppNode = (SSyncNode **)taosHashGet(vgIdHash, (const char *)&vgId, sizeof(int32_t));
if (ppNode == NULL || *ppNode == NULL) { if (ppNode == NULL || *ppNode == NULL) {
sError("vgId:%d, vgId could not be found", vgId); sError("vgId:%d, vgId could not be found", vgId);
......
...@@ -23,10 +23,10 @@ ...@@ -23,10 +23,10 @@
#include "tsync.h" #include "tsync.h"
#include "syncInt.h" #include "syncInt.h"
static void syncRemoveExtraFile(SSyncPeer *pPeer, uint32_t sindex, uint32_t eindex) { static void syncRemoveExtraFile(SSyncPeer *pPeer, int32_t sindex, int32_t eindex) {
char name[TSDB_FILENAME_LEN*2] = {0}; char name[TSDB_FILENAME_LEN * 2] = {0};
char fname[TSDB_FILENAME_LEN*3] = {0}; char fname[TSDB_FILENAME_LEN * 3] = {0};
uint32_t magic; uint32_t magic;
uint64_t fversion; uint64_t fversion;
int64_t size; int64_t size;
uint32_t index = sindex; uint32_t index = sindex;
...@@ -40,12 +40,12 @@ static void syncRemoveExtraFile(SSyncPeer *pPeer, uint32_t sindex, uint32_t eind ...@@ -40,12 +40,12 @@ static void syncRemoveExtraFile(SSyncPeer *pPeer, uint32_t sindex, uint32_t eind
if (magic == 0) break; if (magic == 0) break;
snprintf(fname, sizeof(fname), "%s/%s", pNode->path, name); snprintf(fname, sizeof(fname), "%s/%s", pNode->path, name);
remove(fname); (void)remove(fname);
sDebug("%s, %s is removed", pPeer->id, fname); sDebug("%s, %s is removed", pPeer->id, fname);
index++; index++;
if (index > eindex) break; if (index > eindex) break;
} }
} }
static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
...@@ -62,35 +62,36 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { ...@@ -62,35 +62,36 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
while (1) { while (1) {
// read file info // read file info
int ret = taosReadMsg(pPeer->syncFd, &(minfo), sizeof(minfo)); int ret = taosReadMsg(pPeer->syncFd, &(minfo), sizeof(minfo));
if (ret < 0 ) break; if (ret < 0) break;
// if no more file from master, break; // if no more file from master, break;
if (minfo.name[0] == 0 || minfo.magic == 0) { if (minfo.name[0] == 0 || minfo.magic == 0) {
sDebug("%s, no more files to restore", pPeer->id); sDebug("%s, no more files to restore", pPeer->id);
// remove extra files after the current index // remove extra files after the current index
syncRemoveExtraFile(pPeer, sinfo.index+1, TAOS_SYNC_MAX_INDEX); syncRemoveExtraFile(pPeer, sinfo.index + 1, TAOS_SYNC_MAX_INDEX);
code = 0; code = 0;
break; break;
} }
// remove extra files on slave between the current and last index // remove extra files on slave between the current and last index
syncRemoveExtraFile(pPeer, pindex+1, minfo.index-1); syncRemoveExtraFile(pPeer, pindex + 1, minfo.index - 1);
pindex = minfo.index; pindex = minfo.index;
// check the file info // check the file info
sinfo = minfo; sinfo = minfo;
sDebug("%s, get file info:%s", pPeer->id, minfo.name); sDebug("%s, get file info:%s", pPeer->id, minfo.name);
sinfo.magic = (*pNode->getFileInfo)(pNode->ahandle, sinfo.name, &sinfo.index, TAOS_SYNC_MAX_INDEX, &sinfo.size, &sinfo.fversion); sinfo.magic = (*pNode->getFileInfo)(pNode->ahandle, sinfo.name, &sinfo.index, TAOS_SYNC_MAX_INDEX, &sinfo.size,
&sinfo.fversion);
// if file not there or magic is not the same, file shall be synced // if file not there or magic is not the same, file shall be synced
memset(&fileAck, 0, sizeof(fileAck)); memset(&fileAck, 0, sizeof(fileAck));
fileAck.sync = (sinfo.magic != minfo.magic || sinfo.name[0] == 0) ? 1:0; fileAck.sync = (sinfo.magic != minfo.magic || sinfo.name[0] == 0) ? 1 : 0;
// send file ack // send file ack
ret = taosWriteMsg(pPeer->syncFd, &(fileAck), sizeof(fileAck)); ret = taosWriteMsg(pPeer->syncFd, &(fileAck), sizeof(fileAck));
if (ret <0) break; if (ret < 0) break;
// if sync is not required, continue // if sync is not required, continue
if (fileAck.sync == 0) { if (fileAck.sync == 0) {
sDebug("%s, %s is the same", pPeer->id, minfo.name); sDebug("%s, %s is the same", pPeer->id, minfo.name);
...@@ -99,10 +100,11 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { ...@@ -99,10 +100,11 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
// if sync is required, open file, receive from master, and write to file // if sync is required, open file, receive from master, and write to file
// get the full path to file // get the full path to file
minfo.name[sizeof(minfo.name) - 1] = 0;
snprintf(name, sizeof(name), "%s/%s", pNode->path, minfo.name); snprintf(name, sizeof(name), "%s/%s", pNode->path, minfo.name);
int dfd = open(name, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO); int dfd = open(name, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO);
if ( dfd < 0 ) { if (dfd < 0) {
sError("%s, failed to open file:%s", pPeer->id, name); sError("%s, failed to open file:%s", pPeer->id, name);
break; break;
} }
...@@ -110,16 +112,15 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { ...@@ -110,16 +112,15 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
ret = taosCopyFds(pPeer->syncFd, dfd, minfo.size); ret = taosCopyFds(pPeer->syncFd, dfd, minfo.size);
fsync(dfd); fsync(dfd);
close(dfd); close(dfd);
if (ret<0) break; if (ret < 0) break;
sDebug("%s, %s is received, size:%" PRId64, pPeer->id, minfo.name, minfo.size); sDebug("%s, %s is received, size:%" PRId64, pPeer->id, minfo.name, minfo.size);
} }
if (code == 0 && (minfo.fversion != sinfo.fversion)) { if (code == 0 && (minfo.fversion != sinfo.fversion)) {
// data file is changed, code shall be set to 1 // data file is changed, code shall be set to 1
*fversion = minfo.fversion; *fversion = minfo.fversion;
code = 1; code = 1;
} }
if (code < 0) { if (code < 0) {
...@@ -130,8 +131,8 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { ...@@ -130,8 +131,8 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
} }
static int syncRestoreWal(SSyncPeer *pPeer) { static int syncRestoreWal(SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
int ret, code = -1; int ret, code = -1;
void *buffer = calloc(1024000, 1); // size for one record void *buffer = calloc(1024000, 1); // size for one record
if (buffer == NULL) return -1; if (buffer == NULL) return -1;
...@@ -140,18 +141,21 @@ static int syncRestoreWal(SSyncPeer *pPeer) { ...@@ -140,18 +141,21 @@ static int syncRestoreWal(SSyncPeer *pPeer) {
while (1) { while (1) {
ret = taosReadMsg(pPeer->syncFd, pHead, sizeof(SWalHead)); ret = taosReadMsg(pPeer->syncFd, pHead, sizeof(SWalHead));
if (ret <0) break; if (ret < 0) break;
if (pHead->len == 0) {
code = 0;
break;
} // wal sync over
if (pHead->len == 0) {code = 0; break;} // wal sync over
ret = taosReadMsg(pPeer->syncFd, pHead->cont, pHead->len); ret = taosReadMsg(pPeer->syncFd, pHead->cont, pHead->len);
if (ret <0) break; if (ret < 0) break;
sDebug("%s, restore a record, ver:%" PRIu64, pPeer->id, pHead->version); sDebug("%s, restore a record, ver:%" PRIu64, pPeer->id, pHead->version);
(*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_WAL); (*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_WAL);
} }
if (code<0) { if (code < 0) {
sError("%s, failed to restore wal(%s)", pPeer->id, strerror(errno)); sError("%s, failed to restore wal(%s)", pPeer->id, strerror(errno));
} }
...@@ -159,10 +163,9 @@ static int syncRestoreWal(SSyncPeer *pPeer) { ...@@ -159,10 +163,9 @@ static int syncRestoreWal(SSyncPeer *pPeer) {
return code; return code;
} }
static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset) static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset) {
{
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
SWalHead *pHead = (SWalHead *) offset; SWalHead * pHead = (SWalHead *)offset;
(*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD); (*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD);
offset += pHead->len + sizeof(SWalHead); offset += pHead->len + sizeof(SWalHead);
...@@ -171,7 +174,7 @@ static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset) ...@@ -171,7 +174,7 @@ static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset)
} }
static int syncProcessBufferedFwd(SSyncPeer *pPeer) { static int syncProcessBufferedFwd(SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode * pNode = pPeer->pSyncNode;
SRecvBuffer *pRecv = pNode->pRecv; SRecvBuffer *pRecv = pNode->pRecv;
int forwards = 0; int forwards = 0;
...@@ -182,7 +185,7 @@ static int syncProcessBufferedFwd(SSyncPeer *pPeer) { ...@@ -182,7 +185,7 @@ static int syncProcessBufferedFwd(SSyncPeer *pPeer) {
offset = syncProcessOneBufferedFwd(pPeer, offset); offset = syncProcessOneBufferedFwd(pPeer, offset);
forwards++; forwards++;
} }
pthread_mutex_lock(&pNode->mutex); pthread_mutex_lock(&pNode->mutex);
while (forwards < pRecv->forwards && pRecv->code == 0) { while (forwards < pRecv->forwards && pRecv->code == 0) {
...@@ -199,7 +202,7 @@ static int syncProcessBufferedFwd(SSyncPeer *pPeer) { ...@@ -199,7 +202,7 @@ static int syncProcessBufferedFwd(SSyncPeer *pPeer) {
} }
int syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) { int syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode * pNode = pPeer->pSyncNode;
SRecvBuffer *pRecv = pNode->pRecv; SRecvBuffer *pRecv = pNode->pRecv;
if (pRecv == NULL) return -1; if (pRecv == NULL) return -1;
...@@ -259,9 +262,9 @@ static int syncRestoreDataStepByStep(SSyncPeer *pPeer) { ...@@ -259,9 +262,9 @@ static int syncRestoreDataStepByStep(SSyncPeer *pPeer) {
return -1; return -1;
} }
// if code > 0, data file is changed, notify app, and pass the version // if code > 0, data file is changed, notify app, and pass the version
if (code > 0 && pNode->notifyFileSynced) { if (code > 0 && pNode->notifyFileSynced) {
if ( (*pNode->notifyFileSynced)(pNode->ahandle, fversion) < 0 ) { if ((*pNode->notifyFileSynced)(pNode->ahandle, fversion) < 0) {
sError("%s, app not in ready state", pPeer->id); sError("%s, app not in ready state", pPeer->id);
return -1; return -1;
} }
...@@ -296,8 +299,8 @@ void *syncRestoreData(void *param) { ...@@ -296,8 +299,8 @@ void *syncRestoreData(void *param) {
if (syncOpenRecvBuffer(pNode) < 0) { if (syncOpenRecvBuffer(pNode) < 0) {
sError("%s, failed to allocate recv buffer", pPeer->id); sError("%s, failed to allocate recv buffer", pPeer->id);
} else { } else {
if ( syncRestoreDataStepByStep(pPeer) == 0) { if (syncRestoreDataStepByStep(pPeer) == 0) {
sInfo("%s, it is synced successfully", pPeer->id); sInfo("%s, it is synced successfully", pPeer->id);
nodeRole = TAOS_SYNC_ROLE_SLAVE; nodeRole = TAOS_SYNC_ROLE_SLAVE;
syncBroadcastStatus(pNode); syncBroadcastStatus(pNode);
...@@ -311,7 +314,7 @@ void *syncRestoreData(void *param) { ...@@ -311,7 +314,7 @@ void *syncRestoreData(void *param) {
(*pNode->notifyRole)(pNode->ahandle, nodeRole); (*pNode->notifyRole)(pNode->ahandle, nodeRole);
nodeSStatus = TAOS_SYNC_STATUS_INIT; nodeSStatus = TAOS_SYNC_STATUS_INIT;
taosClose(pPeer->syncFd) taosClose(pPeer->syncFd);
syncCloseRecvBuffer(pNode); syncCloseRecvBuffer(pNode);
__sync_fetch_and_sub(&tsSyncNum, 1); __sync_fetch_and_sub(&tsSyncNum, 1);
syncDecPeerRef(pPeer); syncDecPeerRef(pPeer);
......
...@@ -38,13 +38,13 @@ static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) { ...@@ -38,13 +38,13 @@ static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) {
return -1; return -1;
} }
if (pPeer->watchFd == NULL) pPeer->watchFd = malloc(sizeof(int)*tsMaxWatchFiles); if (pPeer->watchFd == NULL) pPeer->watchFd = malloc(sizeof(int) * tsMaxWatchFiles);
if (pPeer->watchFd == NULL) { if (pPeer->watchFd == NULL) {
sError("%s, failed to allocate watchFd", pPeer->id); sError("%s, failed to allocate watchFd", pPeer->id);
return -1; return -1;
} }
memset(pPeer->watchFd, -1, sizeof(int)*tsMaxWatchFiles); memset(pPeer->watchFd, -1, sizeof(int) * tsMaxWatchFiles);
} }
int *wd = pPeer->watchFd + pPeer->watchNum; int *wd = pPeer->watchFd + pPeer->watchNum;
...@@ -64,7 +64,7 @@ static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) { ...@@ -64,7 +64,7 @@ static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) {
sDebug("%s, monitor %s, wd:%d watchNum:%d", pPeer->id, name, *wd, pPeer->watchNum); sDebug("%s, monitor %s, wd:%d watchNum:%d", pPeer->id, name, *wd, pPeer->watchNum);
} }
pPeer->watchNum = (pPeer->watchNum +1) % tsMaxWatchFiles; pPeer->watchNum = (pPeer->watchNum + 1) % tsMaxWatchFiles;
return 0; return 0;
} }
...@@ -72,20 +72,20 @@ static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) { ...@@ -72,20 +72,20 @@ static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) {
static int syncAreFilesModified(SSyncPeer *pPeer) { static int syncAreFilesModified(SSyncPeer *pPeer) {
if (pPeer->notifyFd <= 0) return 0; if (pPeer->notifyFd <= 0) return 0;
char buf[2048]; char buf[2048];
int len = read(pPeer->notifyFd, buf, sizeof(buf)); int len = read(pPeer->notifyFd, buf, sizeof(buf));
if (len < 0 && errno != EAGAIN) { if (len < 0 && errno != EAGAIN) {
sError("%s, failed to read notify FD(%s)", pPeer->id, strerror(errno)); sError("%s, failed to read notify FD(%s)", pPeer->id, strerror(errno));
return -1; return -1;
} }
int code = 0; int code = 0;
if (len > 0) { if (len > 0) {
const struct inotify_event *event; const struct inotify_event *event;
char *ptr; char *ptr;
for (ptr = buf; ptr < buf + len; ptr += sizeof(struct inotify_event) + event->len) { for (ptr = buf; ptr < buf + len; ptr += sizeof(struct inotify_event) + event->len) {
event = (const struct inotify_event *) ptr; event = (const struct inotify_event *)ptr;
if ((event->mask & IN_MODIFY) || (event->mask & IN_DELETE)) { if ((event->mask & IN_MODIFY) || (event->mask & IN_DELETE)) {
sDebug("%s, processed file is changed", pPeer->id); sDebug("%s, processed file is changed", pPeer->id);
pPeer->fileChanged = 1; pPeer->fileChanged = 1;
code = 1; code = 1;
...@@ -98,11 +98,11 @@ static int syncAreFilesModified(SSyncPeer *pPeer) { ...@@ -98,11 +98,11 @@ static int syncAreFilesModified(SSyncPeer *pPeer) {
} }
static int syncRetrieveFile(SSyncPeer *pPeer) { static int syncRetrieveFile(SSyncPeer *pPeer) {
SSyncNode * pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
SFileInfo fileInfo; SFileInfo fileInfo;
SFileAck fileAck; SFileAck fileAck;
int code = -1; int code = -1;
char name[TSDB_FILENAME_LEN * 2] = {0}; char name[TSDB_FILENAME_LEN * 2] = {0};
memset(&fileInfo, 0, sizeof(fileInfo)); memset(&fileInfo, 0, sizeof(fileInfo));
memset(&fileAck, 0, sizeof(fileAck)); memset(&fileAck, 0, sizeof(fileAck));
...@@ -110,17 +110,19 @@ static int syncRetrieveFile(SSyncPeer *pPeer) { ...@@ -110,17 +110,19 @@ static int syncRetrieveFile(SSyncPeer *pPeer) {
while (1) { while (1) {
// retrieve file info // retrieve file info
fileInfo.name[0] = 0; fileInfo.name[0] = 0;
fileInfo.magic = (*pNode->getFileInfo)(pNode->ahandle, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX, &fileInfo.size, &fileInfo.fversion); fileInfo.magic = (*pNode->getFileInfo)(pNode->ahandle, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX,
//fileInfo.size = htonl(size); &fileInfo.size, &fileInfo.fversion);
// fileInfo.size = htonl(size);
// send the file info // send the file info
int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(fileInfo)); int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(fileInfo));
if (ret < 0 ) break; if (ret < 0) break;
// if no file anymore, break // if no file anymore, break
if (fileInfo.magic == 0 || fileInfo.name[0] == 0) { if (fileInfo.magic == 0 || fileInfo.name[0] == 0) {
sDebug("%s, no more files to sync", pPeer->id); sDebug("%s, no more files to sync", pPeer->id);
code = 0; break; code = 0;
break;
} }
// wait for the ack from peer // wait for the ack from peer
...@@ -132,29 +134,29 @@ static int syncRetrieveFile(SSyncPeer *pPeer) { ...@@ -132,29 +134,29 @@ static int syncRetrieveFile(SSyncPeer *pPeer) {
// get the full path to file // get the full path to file
snprintf(name, sizeof(name), "%s/%s", pNode->path, fileInfo.name); snprintf(name, sizeof(name), "%s/%s", pNode->path, fileInfo.name);
// add the file into watch list // add the file into watch list
if ( syncAddIntoWatchList(pPeer, name) <0) break; if (syncAddIntoWatchList(pPeer, name) < 0) break;
// if sync is not required, continue // if sync is not required, continue
if (fileAck.sync == 0) { if (fileAck.sync == 0) {
fileInfo.index++; fileInfo.index++;
sDebug("%s, %s is the same", pPeer->id, fileInfo.name); sDebug("%s, %s is the same", pPeer->id, fileInfo.name);
continue; continue;
} }
// send the file to peer // send the file to peer
int sfd = open(name, O_RDONLY); int sfd = open(name, O_RDONLY);
if (sfd < 0) break; if (sfd < 0) break;
ret = taosTSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size); ret = taosTSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size);
close(sfd); close(sfd);
if (ret < 0) break; if (ret < 0) break;
sDebug("%s, %s is sent, size:%" PRId64, pPeer->id, name, fileInfo.size); sDebug("%s, %s is sent, size:%" PRId64, pPeer->id, name, fileInfo.size);
fileInfo.index++; fileInfo.index++;
// check if processed files are modified // check if processed files are modified
if (syncAreFilesModified(pPeer) != 0) break; if (syncAreFilesModified(pPeer) != 0) break;
} }
...@@ -201,15 +203,15 @@ static int syncMonitorLastWal(SSyncPeer *pPeer, char *name) { ...@@ -201,15 +203,15 @@ static int syncMonitorLastWal(SSyncPeer *pPeer, char *name) {
return -1; return -1;
} }
if (pPeer->watchFd == NULL) pPeer->watchFd = malloc(sizeof(int)*tsMaxWatchFiles); if (pPeer->watchFd == NULL) pPeer->watchFd = malloc(sizeof(int) * tsMaxWatchFiles);
if (pPeer->watchFd == NULL) { if (pPeer->watchFd == NULL) {
sError("%s, failed to allocate watchFd", pPeer->id); sError("%s, failed to allocate watchFd", pPeer->id);
return -1; return -1;
} }
memset(pPeer->watchFd, -1, sizeof(int)*tsMaxWatchFiles); memset(pPeer->watchFd, -1, sizeof(int) * tsMaxWatchFiles);
int *wd = pPeer->watchFd; int *wd = pPeer->watchFd;
*wd = inotify_add_watch(pPeer->notifyFd, name, IN_MODIFY | IN_CLOSE_WRITE); *wd = inotify_add_watch(pPeer->notifyFd, name, IN_MODIFY | IN_CLOSE_WRITE);
if (*wd == -1) { if (*wd == -1) {
sError("%s, failed to watch last wal(%s)", pPeer->id, strerror(errno)); sError("%s, failed to watch last wal(%s)", pPeer->id, strerror(errno));
...@@ -219,8 +221,8 @@ static int syncMonitorLastWal(SSyncPeer *pPeer, char *name) { ...@@ -219,8 +221,8 @@ static int syncMonitorLastWal(SSyncPeer *pPeer, char *name) {
return 0; return 0;
} }
static uint32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) { static int32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) {
char buf[2048]; char buf[2048];
int len = read(pPeer->notifyFd, buf, sizeof(buf)); int len = read(pPeer->notifyFd, buf, sizeof(buf));
if (len < 0 && errno != EAGAIN) { if (len < 0 && errno != EAGAIN) {
sError("%s, failed to read notify FD(%s)", pPeer->id, strerror(errno)); sError("%s, failed to read notify FD(%s)", pPeer->id, strerror(errno));
...@@ -231,26 +233,29 @@ static uint32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) { ...@@ -231,26 +233,29 @@ static uint32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) {
struct inotify_event *event; struct inotify_event *event;
for (char *ptr = buf; ptr < buf + len; ptr += sizeof(struct inotify_event) + event->len) { for (char *ptr = buf; ptr < buf + len; ptr += sizeof(struct inotify_event) + event->len) {
event = (struct inotify_event *) ptr; event = (struct inotify_event *)ptr;
if (event->mask & IN_MODIFY) *pEvent = *pEvent | IN_MODIFY; if (event->mask & IN_MODIFY) *pEvent = *pEvent | IN_MODIFY;
if (event->mask & IN_CLOSE_WRITE) *pEvent = *pEvent | IN_CLOSE_WRITE; if (event->mask & IN_CLOSE_WRITE) *pEvent = *pEvent | IN_CLOSE_WRITE;
} }
if (pEvent != 0) if (pEvent != 0) sDebug("%s, last wal event:0x%x", pPeer->id, *pEvent);
sDebug("%s, last wal event:0x%x", pPeer->id, *pEvent);
return 0; return 0;
} }
static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset, uint32_t *pEvent) { static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset, uint32_t *pEvent) {
SWalHead *pHead = (SWalHead *) malloc(640000); SWalHead *pHead = malloc(640000);
int code = -1; int code = -1;
int32_t bytes = 0; int32_t bytes = 0;
int sfd; int sfd;
sfd = open(name, O_RDONLY); sfd = open(name, O_RDONLY);
if (sfd < 0) return -1; if (sfd < 0) {
lseek(sfd, offset, SEEK_SET); free(pHead);
return -1;
}
(void)lseek(sfd, offset, SEEK_SET);
sDebug("%s, retrieve last wal, offset:%" PRId64 " fversion:%" PRIu64, pPeer->id, offset, fversion); sDebug("%s, retrieve last wal, offset:%" PRId64 " fversion:%" PRIu64, pPeer->id, offset, fversion);
while (1) { while (1) {
...@@ -263,34 +268,34 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, ...@@ -263,34 +268,34 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion,
sDebug("%s, last wal is forwarded, ver:%" PRIu64, pPeer->id, pHead->version); sDebug("%s, last wal is forwarded, ver:%" PRIu64, pPeer->id, pHead->version);
int ret = taosWriteMsg(pPeer->syncFd, pHead, wsize); int ret = taosWriteMsg(pPeer->syncFd, pHead, wsize);
if ( ret != wsize ) break; if (ret != wsize) break;
pPeer->sversion = pHead->version; pPeer->sversion = pHead->version;
bytes += wsize; bytes += wsize;
if (pHead->version >= fversion && fversion > 0) { if (pHead->version >= fversion && fversion > 0) {
code = 0; code = 0;
bytes = 0; bytes = 0;
break; break;
} }
} }
free(pHead); free(pHead);
taosClose(sfd); close(sfd);
if (code == 0) return bytes; if (code == 0) return bytes;
return -1; return -1;
} }
static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) { static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
int code = -1; int code = -1;
char fname[TSDB_FILENAME_LEN * 2]; // full path to wal file char fname[TSDB_FILENAME_LEN * 2]; // full path to wal file
if (syncAreFilesModified(pPeer) != 0) return -1; if (syncAreFilesModified(pPeer) != 0) return -1;
while (1) { while (1) {
int32_t once = 0; // last WAL has once ever been processed int32_t once = 0; // last WAL has once ever been processed
int64_t offset = 0; int64_t offset = 0;
uint64_t fversion = 0; uint64_t fversion = 0;
uint32_t event = 0; uint32_t event = 0;
...@@ -300,48 +305,48 @@ static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) { ...@@ -300,48 +305,48 @@ static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) {
sDebug("%s, start to retrieve last wal:%s", pPeer->id, fname); sDebug("%s, start to retrieve last wal:%s", pPeer->id, fname);
// monitor last wal // monitor last wal
if (syncMonitorLastWal(pPeer, fname) <0) break; if (syncMonitorLastWal(pPeer, fname) < 0) break;
while (1) { while (1) {
int32_t bytes = syncRetrieveLastWal(pPeer, fname, fversion, offset, &event); int32_t bytes = syncRetrieveLastWal(pPeer, fname, fversion, offset, &event);
if (bytes < 0) break; if (bytes < 0) break;
// check file changes // check file changes
if (syncCheckLastWalChanges(pPeer, &event) <0) break; if (syncCheckLastWalChanges(pPeer, &event) < 0) break;
// if file is not updated or updated once, set the fversion and sstatus // if file is not updated or updated once, set the fversion and sstatus
if (((event & IN_MODIFY) == 0) || once) { if (((event & IN_MODIFY) == 0) || once) {
if (fversion == 0) { if (fversion == 0) {
pPeer->sstatus = TAOS_SYNC_STATUS_CACHE; // start to forward pkt pPeer->sstatus = TAOS_SYNC_STATUS_CACHE; // start to forward pkt
fversion = nodeVersion; // must read data to fversion fversion = nodeVersion; // must read data to fversion
} }
} }
// if all data up to fversion is read out, it is over // if all data up to fversion is read out, it is over
if (pPeer->sversion >= fversion && fversion > 0) { if (pPeer->sversion >= fversion && fversion > 0) {
code = 0; code = 0;
sDebug("%s, data up to fversion:%ld has been read out, bytes:%d", pPeer->id, fversion, bytes); sDebug("%s, data up to fversion:%ld has been read out, bytes:%d", pPeer->id, fversion, bytes);
break; break;
} }
// if all data are read out, and no update // if all data are read out, and no update
if ((bytes == 0) && ((event & IN_MODIFY) == 0)) { if ((bytes == 0) && ((event & IN_MODIFY) == 0)) {
// wal file is closed, break // wal file is closed, break
if (event & IN_CLOSE_WRITE) { if (event & IN_CLOSE_WRITE) {
code = 0; code = 0;
sDebug("%s, current wal is closed", pPeer->id); sDebug("%s, current wal is closed", pPeer->id);
break; break;
} }
// wal not closed, it means some data not flushed to disk, wait for a while // wal not closed, it means some data not flushed to disk, wait for a while
usleep(10000); usleep(10000);
} }
// if bytes>0, file is updated, or fversion is not reached but file still open, read again // if bytes>0, file is updated, or fversion is not reached but file still open, read again
once = 1; once = 1;
offset += bytes; offset += bytes;
sDebug("%s, retrieve last wal, bytes:%d", pPeer->id, bytes); sDebug("%s, retrieve last wal, bytes:%d", pPeer->id, bytes);
event = event & (~IN_MODIFY); // clear IN_MODIFY flag event = event & (~IN_MODIFY); // clear IN_MODIFY flag
} }
if (code < 0) break; if (code < 0) break;
...@@ -356,7 +361,7 @@ static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) { ...@@ -356,7 +361,7 @@ static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) {
break; break;
} }
// current last wal is closed, there is a new one // current last wal is closed, there is a new one
sDebug("%s, last wal is closed, try new one", pPeer->id); sDebug("%s, last wal is closed, try new one", pPeer->id);
} }
...@@ -377,14 +382,14 @@ static int syncRetrieveWal(SSyncPeer *pPeer) { ...@@ -377,14 +382,14 @@ static int syncRetrieveWal(SSyncPeer *pPeer) {
while (1) { while (1) {
// retrieve wal info // retrieve wal info
wname[0] = 0; wname[0] = 0;
code = (*pNode->getWalInfo)(pNode->ahandle, wname, &index); code = (*pNode->getWalInfo)(pNode->ahandle, wname, &index);
if (code < 0) break; // error if (code < 0) break; // error
if (wname[0] == 0) { // no wal file if (wname[0] == 0) { // no wal file
sDebug("%s, no wal file", pPeer->id); sDebug("%s, no wal file", pPeer->id);
break; break;
} }
if (code == 0) { // last wal if (code == 0) { // last wal
code = syncProcessLastWal(pPeer, wname, index); code = syncProcessLastWal(pPeer, wname, index);
break; break;
} }
...@@ -392,26 +397,26 @@ static int syncRetrieveWal(SSyncPeer *pPeer) { ...@@ -392,26 +397,26 @@ static int syncRetrieveWal(SSyncPeer *pPeer) {
// get the full path to wal file // get the full path to wal file
snprintf(fname, sizeof(fname), "%s/%s", pNode->path, wname); snprintf(fname, sizeof(fname), "%s/%s", pNode->path, wname);
// send wal file, // send wal file,
// inotify is not required, old wal file won't be modified, even remove is ok // inotify is not required, old wal file won't be modified, even remove is ok
if (stat(fname, &fstat) < 0) break; if (stat(fname, &fstat) < 0) break;
size = fstat.st_size; size = fstat.st_size;
sDebug("%s, retrieve wal:%s size:%d", pPeer->id, fname, size); sDebug("%s, retrieve wal:%s size:%d", pPeer->id, fname, size);
int sfd = open(fname, O_RDONLY); int sfd = open(fname, O_RDONLY);
if (sfd < 0) break; if (sfd < 0) break;
code = taosTSendFile(pPeer->syncFd, sfd, NULL, size); code = taosTSendFile(pPeer->syncFd, sfd, NULL, size);
close(sfd); close(sfd);
if (code <0) break; if (code < 0) break;
index++; index++;
if (syncAreFilesModified(pPeer) != 0) break; if (syncAreFilesModified(pPeer) != 0) break;
} }
if (code == 0) { if (code == 0) {
sDebug("%s, wal retrieve is finished", pPeer->id); sDebug("%s, wal retrieve is finished", pPeer->id);
pPeer->sstatus = TAOS_SYNC_STATUS_CACHE; pPeer->sstatus = TAOS_SYNC_STATUS_CACHE;
SWalHead walHead; SWalHead walHead;
memset(&walHead, 0, sizeof(walHead)); memset(&walHead, 0, sizeof(walHead));
...@@ -433,12 +438,12 @@ static int syncRetrieveDataStepByStep(SSyncPeer *pPeer) { ...@@ -433,12 +438,12 @@ static int syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn)); tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
firstPkt.port = tsSyncPort; firstPkt.port = tsSyncPort;
if (write(pPeer->syncFd, (char *) &firstPkt, sizeof(firstPkt)) < 0) { if (write(pPeer->syncFd, (char *)&firstPkt, sizeof(firstPkt)) < 0) {
sError("%s, failed to send syncCmd", pPeer->id); sError("%s, failed to send syncCmd", pPeer->id);
return -1; return -1;
} }
pPeer->sversion = 0; pPeer->sversion = 0;
pPeer->sstatus = TAOS_SYNC_STATUS_FILE; pPeer->sstatus = TAOS_SYNC_STATUS_FILE;
sDebug("%s, start to retrieve file", pPeer->id); sDebug("%s, start to retrieve file", pPeer->id);
if (syncRetrieveFile(pPeer) < 0) { if (syncRetrieveFile(pPeer) < 0) {
...@@ -447,8 +452,7 @@ static int syncRetrieveDataStepByStep(SSyncPeer *pPeer) { ...@@ -447,8 +452,7 @@ static int syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
} }
// if no files are synced, there must be wal to sync, sversion must be larger than one // if no files are synced, there must be wal to sync, sversion must be larger than one
if (pPeer->sversion == 0) if (pPeer->sversion == 0) pPeer->sversion = 1;
pPeer->sversion = 1;
sDebug("%s, start to retrieve wal", pPeer->id); sDebug("%s, start to retrieve wal", pPeer->id);
if (syncRetrieveWal(pPeer) < 0) { if (syncRetrieveWal(pPeer) < 0) {
...@@ -460,8 +464,8 @@ static int syncRetrieveDataStepByStep(SSyncPeer *pPeer) { ...@@ -460,8 +464,8 @@ static int syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
} }
void *syncRetrieveData(void *param) { void *syncRetrieveData(void *param) {
SSyncPeer * pPeer = (SSyncPeer *)param; SSyncPeer *pPeer = (SSyncPeer *)param;
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
taosBlockSIGPIPE(); taosBlockSIGPIPE();
pPeer->fileChanged = 0; pPeer->fileChanged = 0;
...@@ -470,7 +474,7 @@ void *syncRetrieveData(void *param) { ...@@ -470,7 +474,7 @@ void *syncRetrieveData(void *param) {
sError("%s, failed to open socket to sync", pPeer->id); sError("%s, failed to open socket to sync", pPeer->id);
} else { } else {
sInfo("%s, sync tcp is setup", pPeer->id); sInfo("%s, sync tcp is setup", pPeer->id);
if (syncRetrieveDataStepByStep(pPeer) == 0) { if (syncRetrieveDataStepByStep(pPeer) == 0) {
sDebug("%s, sync retrieve process is successful", pPeer->id); sDebug("%s, sync retrieve process is successful", pPeer->id);
} else { } else {
...@@ -482,12 +486,11 @@ void *syncRetrieveData(void *param) { ...@@ -482,12 +486,11 @@ void *syncRetrieveData(void *param) {
if (pPeer->fileChanged) { if (pPeer->fileChanged) {
// if file is changed 3 times continuously, start flow control // if file is changed 3 times continuously, start flow control
pPeer->numOfRetrieves++; pPeer->numOfRetrieves++;
if (pPeer->numOfRetrieves >= 2 && pNode->notifyFlowCtrl) if (pPeer->numOfRetrieves >= 2 && pNode->notifyFlowCtrl)
(*pNode->notifyFlowCtrl)(pNode->ahandle, 4 << (pPeer->numOfRetrieves - 2)); (*pNode->notifyFlowCtrl)(pNode->ahandle, 4 << (pPeer->numOfRetrieves - 2));
} else { } else {
pPeer->numOfRetrieves = 0; pPeer->numOfRetrieves = 0;
if (pNode->notifyFlowCtrl) if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->ahandle, 0);
(*pNode->notifyFlowCtrl)(pNode->ahandle, 0);
} }
pPeer->fileChanged = 0; pPeer->fileChanged = 0;
......
...@@ -45,8 +45,8 @@ typedef struct { ...@@ -45,8 +45,8 @@ typedef struct {
static void *taosAcceptPeerTcpConnection(void *argv); static void *taosAcceptPeerTcpConnection(void *argv);
static void *taosProcessTcpData(void *param); static void *taosProcessTcpData(void *param);
static void taosStopPoolThread(SThreadObj *pThread);
static SThreadObj *taosGetTcpThread(SPoolObj *pPool); static SThreadObj *taosGetTcpThread(SPoolObj *pPool);
static void taosStopPoolThread(SThreadObj* pThread);
void *taosOpenTcpThreadPool(SPoolInfo *pInfo) { void *taosOpenTcpThreadPool(SPoolInfo *pInfo) {
pthread_attr_t thattr; pthread_attr_t thattr;
...@@ -58,8 +58,8 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo) { ...@@ -58,8 +58,8 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo) {
} }
pPool->info = *pInfo; pPool->info = *pInfo;
pPool->pThread = (SThreadObj **) calloc(sizeof(SThreadObj *), pInfo->numOfThreads); pPool->pThread = (SThreadObj **)calloc(sizeof(SThreadObj *), pInfo->numOfThreads);
if (pPool->pThread == NULL) { if (pPool->pThread == NULL) {
uError("TCP server, no enough memory"); uError("TCP server, no enough memory");
free(pPool); free(pPool);
...@@ -68,17 +68,19 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo) { ...@@ -68,17 +68,19 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo) {
pPool->acceptFd = taosOpenTcpServerSocket(pInfo->serverIp, pInfo->port); pPool->acceptFd = taosOpenTcpServerSocket(pInfo->serverIp, pInfo->port);
if (pPool->acceptFd < 0) { if (pPool->acceptFd < 0) {
free(pPool->pThread); free(pPool); free(pPool->pThread);
free(pPool);
uError("failed to create TCP server socket, port:%d (%s)", pInfo->port, strerror(errno)); uError("failed to create TCP server socket, port:%d (%s)", pInfo->port, strerror(errno));
return NULL; return NULL;
} }
pthread_attr_init(&thattr); pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&(pPool->thread), &thattr, (void *) taosAcceptPeerTcpConnection, pPool) != 0) { if (pthread_create(&(pPool->thread), &thattr, (void *)taosAcceptPeerTcpConnection, pPool) != 0) {
uError("TCP server, failed to create accept thread, reason:%s", strerror(errno)); uError("TCP server, failed to create accept thread, reason:%s", strerror(errno));
close(pPool->acceptFd); close(pPool->acceptFd);
free(pPool->pThread); free(pPool); free(pPool->pThread);
free(pPool);
return NULL; return NULL;
} }
...@@ -89,29 +91,30 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo) { ...@@ -89,29 +91,30 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo) {
} }
void taosCloseTcpThreadPool(void *param) { void taosCloseTcpThreadPool(void *param) {
SPoolObj *pPool = (SPoolObj *)param; SPoolObj * pPool = (SPoolObj *)param;
SThreadObj *pThread; SThreadObj *pThread;
shutdown(pPool->acceptFd, SHUT_RD); shutdown(pPool->acceptFd, SHUT_RD);
pthread_join(pPool->thread, NULL); pthread_join(pPool->thread, NULL);
for (int i = 0; i < pPool->info.numOfThreads; ++i) { for (int i = 0; i < pPool->info.numOfThreads; ++i) {
pThread = pPool->pThread[i]; pThread = pPool->pThread[i];
if (pThread) taosStopPoolThread(pThread); if (pThread) taosStopPoolThread(pThread);
} }
uDebug("%p TCP pool is closed", pPool);
taosTFree(pPool->pThread); taosTFree(pPool->pThread);
free(pPool); free(pPool);
uDebug("%p TCP pool is closed", pPool);
} }
void *taosAllocateTcpConn(void *param, void *pPeer, int connFd) { void *taosAllocateTcpConn(void *param, void *pPeer, int connFd) {
struct epoll_event event; struct epoll_event event;
SPoolObj *pPool = (SPoolObj *)param; SPoolObj *pPool = (SPoolObj *)param;
SConnObj *pConn = (SConnObj *) calloc(sizeof(SConnObj), 1); SConnObj *pConn = (SConnObj *)calloc(sizeof(SConnObj), 1);
if (pConn == NULL) { if (pConn == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return NULL; return NULL;
} }
...@@ -131,7 +134,7 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int connFd) { ...@@ -131,7 +134,7 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int connFd) {
if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) { if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) {
uError("failed to add fd:%d(%s)", connFd, strerror(errno)); uError("failed to add fd:%d(%s)", connFd, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
free(pConn); free(pConn);
pConn = NULL; pConn = NULL;
} else { } else {
...@@ -143,8 +146,8 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int connFd) { ...@@ -143,8 +146,8 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int connFd) {
} }
void taosFreeTcpConn(void *param) { void taosFreeTcpConn(void *param) {
SConnObj * pConn = (SConnObj *)param; SConnObj * pConn = (SConnObj *)param;
SThreadObj *pThread = pConn->pThread; SThreadObj *pThread = pConn->pThread;
uDebug("%p TCP connection will be closed, fd:%d", pThread, pConn->fd); uDebug("%p TCP connection will be closed, fd:%d", pThread, pConn->fd);
pConn->closedByApp = 1; pConn->closedByApp = 1;
...@@ -153,9 +156,9 @@ void taosFreeTcpConn(void *param) { ...@@ -153,9 +156,9 @@ void taosFreeTcpConn(void *param) {
static void taosProcessBrokenLink(SConnObj *pConn) { static void taosProcessBrokenLink(SConnObj *pConn) {
SThreadObj *pThread = pConn->pThread; SThreadObj *pThread = pConn->pThread;
SPoolObj *pPool = pThread->pPool; SPoolObj * pPool = pThread->pPool;
SPoolInfo *pInfo = &pPool->info; SPoolInfo * pInfo = &pPool->info;
if (pConn->closedByApp == 0) shutdown(pConn->fd, SHUT_WR); if (pConn->closedByApp == 0) shutdown(pConn->fd, SHUT_WR);
(*pInfo->processBrokenLink)(pConn->ahandle); (*pInfo->processBrokenLink)(pConn->ahandle);
...@@ -169,24 +172,24 @@ static void taosProcessBrokenLink(SConnObj *pConn) { ...@@ -169,24 +172,24 @@ static void taosProcessBrokenLink(SConnObj *pConn) {
#define maxEvents 10 #define maxEvents 10
static void *taosProcessTcpData(void *param) { static void *taosProcessTcpData(void *param) {
SThreadObj *pThread = (SThreadObj *) param; SThreadObj *pThread = (SThreadObj *)param;
SPoolObj *pPool = pThread->pPool; SPoolObj * pPool = pThread->pPool;
SPoolInfo *pInfo = &pPool->info; SPoolInfo * pInfo = &pPool->info;
SConnObj *pConn = NULL; SConnObj * pConn = NULL;
struct epoll_event events[maxEvents]; struct epoll_event events[maxEvents];
void *buffer = malloc(pInfo->bufferSize); void *buffer = malloc(pInfo->bufferSize);
taosBlockSIGPIPE(); taosBlockSIGPIPE();
while (1) { while (1) {
if (pThread->stop) break; if (pThread->stop) break;
int fdNum = epoll_wait(pThread->pollFd, events, maxEvents, TAOS_EPOLL_WAIT_TIME); int fdNum = epoll_wait(pThread->pollFd, events, maxEvents, TAOS_EPOLL_WAIT_TIME);
if (pThread->stop) { if (pThread->stop) {
uDebug("%p TCP epoll thread is exiting...", pThread); uDebug("%p TCP epoll thread is exiting...", pThread);
break; break;
} }
if (fdNum < 0) { if (fdNum < 0) {
uError("epoll_wait failed (%s)", strerror(errno)); uError("epoll_wait failed (%s)", strerror(errno));
continue; continue;
} }
...@@ -215,27 +218,28 @@ static void *taosProcessTcpData(void *param) { ...@@ -215,27 +218,28 @@ static void *taosProcessTcpData(void *param) {
taosFreeTcpConn(pConn); taosFreeTcpConn(pConn);
continue; continue;
} }
} }
} }
} }
uDebug("%p TCP epoll thread exits", pThread);
close(pThread->pollFd); close(pThread->pollFd);
free(pThread); free(pThread);
free(buffer); free(buffer);
uDebug("%p TCP epoll thread exits", pThread); return NULL;
return NULL;
} }
static void *taosAcceptPeerTcpConnection(void *argv) { static void *taosAcceptPeerTcpConnection(void *argv) {
SPoolObj *pPool = (SPoolObj *)argv; SPoolObj * pPool = (SPoolObj *)argv;
SPoolInfo *pInfo = &pPool->info; SPoolInfo *pInfo = &pPool->info;
taosBlockSIGPIPE(); taosBlockSIGPIPE();
while (1) { while (1) {
struct sockaddr_in clientAddr; struct sockaddr_in clientAddr;
socklen_t addrlen = sizeof(clientAddr); socklen_t addrlen = sizeof(clientAddr);
int connFd = accept(pPool->acceptFd, (struct sockaddr *) &clientAddr, &addrlen); int connFd = accept(pPool->acceptFd, (struct sockaddr *)&clientAddr, &addrlen);
if (connFd < 0) { if (connFd < 0) {
if (errno == EINVAL) { if (errno == EINVAL) {
uDebug("%p TCP server accept is exiting...", pPool); uDebug("%p TCP server accept is exiting...", pPool);
...@@ -246,7 +250,7 @@ static void *taosAcceptPeerTcpConnection(void *argv) { ...@@ -246,7 +250,7 @@ static void *taosAcceptPeerTcpConnection(void *argv) {
} }
} }
//uDebug("TCP connection from: 0x%x:%d", clientAddr.sin_addr.s_addr, clientAddr.sin_port); // uDebug("TCP connection from: 0x%x:%d", clientAddr.sin_addr.s_addr, clientAddr.sin_port);
taosKeepTcpAlive(connFd); taosKeepTcpAlive(connFd);
(*pInfo->processIncomingConn)(connFd, clientAddr.sin_addr.s_addr); (*pInfo->processIncomingConn)(connFd, clientAddr.sin_addr.s_addr);
} }
...@@ -260,7 +264,7 @@ static SThreadObj *taosGetTcpThread(SPoolObj *pPool) { ...@@ -260,7 +264,7 @@ static SThreadObj *taosGetTcpThread(SPoolObj *pPool) {
if (pThread) return pThread; if (pThread) return pThread;
pThread = (SThreadObj *) calloc(1, sizeof(SThreadObj)); pThread = (SThreadObj *)calloc(1, sizeof(SThreadObj));
if (pThread == NULL) return NULL; if (pThread == NULL) return NULL;
pThread->pPool = pPool; pThread->pPool = pPool;
...@@ -273,7 +277,7 @@ static SThreadObj *taosGetTcpThread(SPoolObj *pPool) { ...@@ -273,7 +277,7 @@ static SThreadObj *taosGetTcpThread(SPoolObj *pPool) {
pthread_attr_t thattr; pthread_attr_t thattr;
pthread_attr_init(&thattr); pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
int ret = pthread_create(&(pThread->thread), &thattr, (void *) taosProcessTcpData, pThread); int ret = pthread_create(&(pThread->thread), &thattr, (void *)taosProcessTcpData, pThread);
pthread_attr_destroy(&thattr); pthread_attr_destroy(&thattr);
if (ret != 0) { if (ret != 0) {
...@@ -290,20 +294,20 @@ static SThreadObj *taosGetTcpThread(SPoolObj *pPool) { ...@@ -290,20 +294,20 @@ static SThreadObj *taosGetTcpThread(SPoolObj *pPool) {
return pThread; return pThread;
} }
static void taosStopPoolThread(SThreadObj* pThread) { static void taosStopPoolThread(SThreadObj *pThread) {
pThread->stop = true; pThread->stop = true;
if (pThread->thread == pthread_self()) { if (pThread->thread == pthread_self()) {
pthread_detach(pthread_self()); pthread_detach(pthread_self());
return; return;
} }
// save thread ID into a local variable, since pThread is freed when the thread exits // save thread ID into a local variable, since pThread is freed when the thread exits
pthread_t thread = pThread->thread; pthread_t thread = pThread->thread;
// signal the thread to stop, try graceful method first, // signal the thread to stop, try graceful method first,
// and use pthread_cancel when failed // and use pthread_cancel when failed
struct epoll_event event = { .events = EPOLLIN }; struct epoll_event event = {.events = EPOLLIN};
eventfd_t fd = eventfd(1, 0); eventfd_t fd = eventfd(1, 0);
if (fd == -1) { if (fd == -1) {
// failed to create eventfd, call pthread_cancel instead, which may result in data corruption // failed to create eventfd, call pthread_cancel instead, which may result in data corruption
...@@ -319,4 +323,3 @@ static void taosStopPoolThread(SThreadObj* pThread) { ...@@ -319,4 +323,3 @@ static void taosStopPoolThread(SThreadObj* pThread) {
pthread_join(thread, NULL); pthread_join(thread, NULL);
taosClose(fd); taosClose(fd);
} }
...@@ -25,31 +25,32 @@ typedef struct { ...@@ -25,31 +25,32 @@ typedef struct {
int num; int num;
int numOfReqs; int numOfReqs;
int msgSize; int msgSize;
tsem_t rspSem; tsem_t rspSem;
tsem_t *pOverSem; tsem_t * pOverSem;
pthread_t thread; pthread_t thread;
void *pRpc; void * pRpc;
} SInfo; } SInfo;
void processResponse(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { void processResponse(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
SInfo *pInfo = (SInfo *)pMsg->ahandle; SInfo *pInfo = (SInfo *)pMsg->ahandle;
uDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, pMsg->code); uDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen,
pMsg->code);
if (pEpSet) pInfo->epSet = *pEpSet; if (pEpSet) pInfo->epSet = *pEpSet;
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
tsem_post(&pInfo->rspSem); tsem_post(&pInfo->rspSem);
} }
int tcount = 0; int tcount = 0;
void *sendRequest(void *param) { void *sendRequest(void *param) {
SInfo *pInfo = (SInfo *)param; SInfo * pInfo = (SInfo *)param;
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
uDebug("thread:%d, start to send request", pInfo->index); uDebug("thread:%d, start to send request", pInfo->index);
while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) { while (pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
pInfo->num++; pInfo->num++;
rpcMsg.pCont = rpcMallocCont(pInfo->msgSize); rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
rpcMsg.contLen = pInfo->msgSize; rpcMsg.contLen = pInfo->msgSize;
...@@ -57,8 +58,9 @@ void *sendRequest(void *param) { ...@@ -57,8 +58,9 @@ void *sendRequest(void *param) {
rpcMsg.msgType = 1; rpcMsg.msgType = 1;
uDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); uDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg); rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg);
if ( pInfo->num % 20000 == 0 ) if (pInfo->num % 20000 == 0) {
uInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); uInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
}
tsem_wait(&pInfo->rspSem); tsem_wait(&pInfo->rspSem);
} }
...@@ -72,12 +74,12 @@ int main(int argc, char *argv[]) { ...@@ -72,12 +74,12 @@ int main(int argc, char *argv[]) {
SRpcInit rpcInit; SRpcInit rpcInit;
SRpcEpSet epSet; SRpcEpSet epSet;
char secret[TSDB_KEY_LEN] = "mypassword"; char secret[TSDB_KEY_LEN] = "mypassword";
int msgSize = 128; int msgSize = 128;
int numOfReqs = 0; int numOfReqs = 0;
int appThreads = 1; int appThreads = 1;
char serverIp[40] = "127.0.0.1"; char serverIp[40] = "127.0.0.1";
struct timeval systemTime; struct timeval systemTime;
int64_t startTime, endTime; int64_t startTime, endTime;
pthread_attr_t thattr; pthread_attr_t thattr;
// server info // server info
...@@ -102,30 +104,30 @@ int main(int argc, char *argv[]) { ...@@ -102,30 +104,30 @@ int main(int argc, char *argv[]) {
rpcInit.spi = 1; rpcInit.spi = 1;
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;
for (int i=1; i<argc; ++i) { for (int i = 1; i < argc; ++i) {
if (strcmp(argv[i], "-p")==0 && i < argc-1) { if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
epSet.port[0] = atoi(argv[++i]); epSet.port[0] = atoi(argv[++i]);
} else if (strcmp(argv[i], "-i") ==0 && i < argc-1) { } else if (strcmp(argv[i], "-i") == 0 && i < argc - 1) {
strcpy(epSet.fqdn[0], argv[++i]); tstrncpy(epSet.fqdn[0], argv[++i], TSDB_FQDN_LEN);
} else if (strcmp(argv[i], "-t")==0 && i < argc-1) { } else if (strcmp(argv[i], "-t") == 0 && i < argc - 1) {
rpcInit.numOfThreads = atoi(argv[++i]); rpcInit.numOfThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-m")==0 && i < argc-1) { } else if (strcmp(argv[i], "-m") == 0 && i < argc - 1) {
msgSize = atoi(argv[++i]); msgSize = atoi(argv[++i]);
} else if (strcmp(argv[i], "-s")==0 && i < argc-1) { } else if (strcmp(argv[i], "-s") == 0 && i < argc - 1) {
rpcInit.sessions = atoi(argv[++i]); rpcInit.sessions = atoi(argv[++i]);
} else if (strcmp(argv[i], "-n")==0 && i < argc-1) { } else if (strcmp(argv[i], "-n") == 0 && i < argc - 1) {
numOfReqs = atoi(argv[++i]); numOfReqs = atoi(argv[++i]);
} else if (strcmp(argv[i], "-a")==0 && i < argc-1) { } else if (strcmp(argv[i], "-a") == 0 && i < argc - 1) {
appThreads = atoi(argv[++i]); appThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-o")==0 && i < argc-1) { } else if (strcmp(argv[i], "-o") == 0 && i < argc - 1) {
tsCompressMsgSize = atoi(argv[++i]); tsCompressMsgSize = atoi(argv[++i]);
} else if (strcmp(argv[i], "-u")==0 && i < argc-1) { } else if (strcmp(argv[i], "-u") == 0 && i < argc - 1) {
rpcInit.user = argv[++i]; rpcInit.user = argv[++i];
} else if (strcmp(argv[i], "-k")==0 && i < argc-1) { } else if (strcmp(argv[i], "-k") == 0 && i < argc - 1) {
rpcInit.secret = argv[++i]; rpcInit.secret = argv[++i];
} else if (strcmp(argv[i], "-spi")==0 && i < argc-1) { } else if (strcmp(argv[i], "-spi") == 0 && i < argc - 1) {
rpcInit.spi = atoi(argv[++i]); rpcInit.spi = atoi(argv[++i]);
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) { } else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) {
rpcDebugFlag = atoi(argv[++i]); rpcDebugFlag = atoi(argv[++i]);
} else { } else {
printf("\nusage: %s [options] \n", argv[0]); printf("\nusage: %s [options] \n", argv[0]);
...@@ -157,14 +159,14 @@ int main(int argc, char *argv[]) { ...@@ -157,14 +159,14 @@ int main(int argc, char *argv[]) {
uInfo("client is initialized"); uInfo("client is initialized");
gettimeofday(&systemTime, NULL); gettimeofday(&systemTime, NULL);
startTime = systemTime.tv_sec*1000000 + systemTime.tv_usec; startTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
SInfo *pInfo = (SInfo *)calloc(1, sizeof(SInfo) * appThreads);
SInfo *pInfo = (SInfo *)calloc(1, sizeof(SInfo)*appThreads);
pthread_attr_init(&thattr); pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
for (int i=0; i<appThreads; ++i) { for (int i = 0; i < appThreads; ++i) {
pInfo->index = i; pInfo->index = i;
pInfo->epSet = epSet; pInfo->epSet = epSet;
pInfo->numOfReqs = numOfReqs; pInfo->numOfReqs = numOfReqs;
...@@ -177,18 +179,16 @@ int main(int argc, char *argv[]) { ...@@ -177,18 +179,16 @@ int main(int argc, char *argv[]) {
do { do {
usleep(1); usleep(1);
} while ( tcount < appThreads); } while (tcount < appThreads);
gettimeofday(&systemTime, NULL); gettimeofday(&systemTime, NULL);
endTime = systemTime.tv_sec*1000000 + systemTime.tv_usec; endTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
float usedTime = (endTime - startTime)/1000.0; // mseconds float usedTime = (endTime - startTime) / 1000.0; // mseconds
uInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs*appThreads); uInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs * appThreads);
uInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0*numOfReqs*appThreads/usedTime, msgSize); uInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime, msgSize);
taosCloseLog(); taosCloseLog();
return 0; return 0;
} }
...@@ -24,28 +24,27 @@ ...@@ -24,28 +24,27 @@
#include "twal.h" #include "twal.h"
#include "tsync.h" #include "tsync.h"
int msgSize = 128; int msgSize = 128;
int commit = 0; int commit = 0;
int dataFd = -1; int dataFd = -1;
void *qhandle = NULL; void * qhandle = NULL;
int walNum = 0; int walNum = 0;
uint64_t tversion = 0; uint64_t tversion = 0;
void *syncHandle; void * syncHandle;
int role; int role;
int nodeId; int nodeId;
char path[256]; char path[256];
int numOfWrites ; int numOfWrites;
SSyncInfo syncInfo; SSyncInfo syncInfo;
SSyncCfg *pCfg; SSyncCfg *pCfg;
int writeIntoWal(SWalHead *pHead) int writeIntoWal(SWalHead *pHead) {
{
if (dataFd < 0) { if (dataFd < 0) {
char walName[280]; char walName[280];
snprintf(walName, sizeof(walName), "%s/wal/wal.%d", path, walNum); snprintf(walName, sizeof(walName), "%s/wal/wal.%d", path, walNum);
remove(walName); (void)remove(walName);
dataFd = open(walName, O_CREAT | O_WRONLY, S_IRWXU | S_IRWXG | S_IRWXO); dataFd = open(walName, O_CREAT | O_WRONLY, S_IRWXU | S_IRWXG | S_IRWXO);
if (dataFd < 0) { if (dataFd < 0) {
uInfo("failed to open wal file:%s(%s)", walName, strerror(errno)); uInfo("failed to open wal file:%s(%s)", walName, strerror(errno));
return -1; return -1;
} else { } else {
...@@ -67,54 +66,52 @@ int writeIntoWal(SWalHead *pHead) ...@@ -67,54 +66,52 @@ int writeIntoWal(SWalHead *pHead)
dataFd = -1; dataFd = -1;
numOfWrites = 0; numOfWrites = 0;
} }
return 0; return 0;
} }
void confirmForward(void *ahandle, void *mhandle, int32_t code) void confirmForward(void *ahandle, void *mhandle, int32_t code) {
{ SRpcMsg * pMsg = (SRpcMsg *)mhandle;
SRpcMsg *pMsg = (SRpcMsg *)mhandle;
SWalHead *pHead = (SWalHead *)(((char *)pMsg->pCont) - sizeof(SWalHead)); SWalHead *pHead = (SWalHead *)(((char *)pMsg->pCont) - sizeof(SWalHead));
uDebug("ver:%" PRIu64 ", confirm is received", pHead->version); uDebug("ver:%" PRIu64 ", confirm is received", pHead->version);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
SRpcMsg rpcMsg; SRpcMsg rpcMsg = {0};
rpcMsg.pCont = rpcMallocCont(msgSize); rpcMsg.pCont = rpcMallocCont(msgSize);
rpcMsg.contLen = msgSize; rpcMsg.contLen = msgSize;
rpcMsg.handle = pMsg->handle; rpcMsg.handle = pMsg->handle;
rpcMsg.code = code; rpcMsg.code = code;
rpcSendResponse(&rpcMsg); rpcSendResponse(&rpcMsg);
taosFreeQitem(mhandle); taosFreeQitem(mhandle);
} }
int processRpcMsg(void *item) { int processRpcMsg(void *item) {
SRpcMsg *pMsg = (SRpcMsg *)item; SRpcMsg * pMsg = (SRpcMsg *)item;
SWalHead *pHead = (SWalHead *)(((char *)pMsg->pCont) - sizeof(SWalHead)); SWalHead *pHead = (SWalHead *)(((char *)pMsg->pCont) - sizeof(SWalHead));
int code = -1; int code = -1;
if (role != TAOS_SYNC_ROLE_MASTER) { if (role != TAOS_SYNC_ROLE_MASTER) {
uError("not master, write failed, role:%s", syncRole[role]); uError("not master, write failed, role:%s", syncRole[role]);
} else { } else {
pHead->version = ++tversion; pHead->version = ++tversion;
pHead->msgType = pMsg->msgType; pHead->msgType = pMsg->msgType;
pHead->len = pMsg->contLen; pHead->len = pMsg->contLen;
uDebug("ver:%" PRIu64 ", pkt from client processed", pHead->version); uDebug("ver:%" PRIu64 ", pkt from client processed", pHead->version);
writeIntoWal(pHead); writeIntoWal(pHead);
syncForwardToPeer(syncHandle, pHead, item, TAOS_QTYPE_RPC); syncForwardToPeer(syncHandle, pHead, item, TAOS_QTYPE_RPC);
code = 0; code = 0;
} }
if (pCfg->quorum <= 1) { if (pCfg->quorum <= 1) {
taosFreeQitem(item);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
taosFreeQitem(item);
SRpcMsg rpcMsg; SRpcMsg rpcMsg = {0};
rpcMsg.pCont = rpcMallocCont(msgSize); rpcMsg.pCont = rpcMallocCont(msgSize);
rpcMsg.contLen = msgSize; rpcMsg.contLen = msgSize;
rpcMsg.handle = pMsg->handle; rpcMsg.handle = pMsg->handle;
...@@ -126,7 +123,6 @@ int processRpcMsg(void *item) { ...@@ -126,7 +123,6 @@ int processRpcMsg(void *item) {
} }
int processFwdMsg(void *item) { int processFwdMsg(void *item) {
SWalHead *pHead = (SWalHead *)item; SWalHead *pHead = (SWalHead *)item;
if (pHead->version <= tversion) { if (pHead->version <= tversion) {
...@@ -142,11 +138,11 @@ int processFwdMsg(void *item) { ...@@ -142,11 +138,11 @@ int processFwdMsg(void *item) {
// write into cache // write into cache
/* /*
if (pHead->handle) { if (pHead->handle) {
syncSendFwdAck(syncHandle, pHead->handle, 0); syncSendFwdAck(syncHandle, pHead->handle, 0);
} }
*/ */
taosFreeQitem(item); taosFreeQitem(item);
...@@ -154,7 +150,6 @@ int processFwdMsg(void *item) { ...@@ -154,7 +150,6 @@ int processFwdMsg(void *item) {
} }
int processWalMsg(void *item) { int processWalMsg(void *item) {
SWalHead *pHead = (SWalHead *)item; SWalHead *pHead = (SWalHead *)item;
if (pHead->version <= tversion) { if (pHead->version <= tversion) {
...@@ -168,11 +163,11 @@ int processWalMsg(void *item) { ...@@ -168,11 +163,11 @@ int processWalMsg(void *item) {
// write into cache // write into cache
/* /*
if (pHead->handle) { if (pHead->handle) {
syncSendFwdAck(syncHandle, pHead->handle, 0); syncSendFwdAck(syncHandle, pHead->handle, 0);
} }
*/ */
taosFreeQitem(item); taosFreeQitem(item);
...@@ -180,15 +175,15 @@ int processWalMsg(void *item) { ...@@ -180,15 +175,15 @@ int processWalMsg(void *item) {
} }
void *processWriteQueue(void *param) { void *processWriteQueue(void *param) {
int type; int type;
void *item; void *item;
while (1) { while (1) {
int ret = taosReadQitem(qhandle, &type, &item); int ret = taosReadQitem(qhandle, &type, &item);
if (ret <= 0) { if (ret <= 0) {
usleep(1000); usleep(1000);
continue; continue;
} }
if (type == TAOS_QTYPE_RPC) { if (type == TAOS_QTYPE_RPC) {
processRpcMsg(item); processRpcMsg(item);
...@@ -196,8 +191,7 @@ void *processWriteQueue(void *param) { ...@@ -196,8 +191,7 @@ void *processWriteQueue(void *param) {
processWalMsg(item); processWalMsg(item);
} else if (type == TAOS_QTYPE_FWD) { } else if (type == TAOS_QTYPE_FWD) {
processFwdMsg(item); processFwdMsg(item);
} }
} }
return NULL; return NULL;
...@@ -224,21 +218,19 @@ int retrieveAuthInfo(char *meterId, char *spi, char *encrypt, char *secret, char ...@@ -224,21 +218,19 @@ int retrieveAuthInfo(char *meterId, char *spi, char *encrypt, char *secret, char
} }
void processRequestMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { void processRequestMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
SRpcMsg *pTemp; SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg)); pTemp = taosAllocateQitem(sizeof(SRpcMsg));
memcpy(pTemp, pMsg, sizeof(SRpcMsg)); memcpy(pTemp, pMsg, sizeof(SRpcMsg));
uDebug("request is received, type:%d, len:%d", pMsg->msgType, pMsg->contLen); uDebug("request is received, type:%d, len:%d", pMsg->msgType, pMsg->contLen);
taosWriteQitem(qhandle, TAOS_QTYPE_RPC, pTemp); taosWriteQitem(qhandle, TAOS_QTYPE_RPC, pTemp);
} }
uint32_t getFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) uint32_t getFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) {
{ uint32_t magic;
uint32_t magic; struct stat fstat;
struct stat fstat; char aname[280];
char aname[280];
if (*index == 2) { if (*index == 2) {
uInfo("wait for a while ....."); uInfo("wait for a while .....");
...@@ -246,15 +238,15 @@ uint32_t getFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex ...@@ -246,15 +238,15 @@ uint32_t getFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex
} }
if (name[0] == 0) { if (name[0] == 0) {
// find the file // find the file
snprintf(aname, sizeof(aname), "%s/data/data.%d", path, *index); snprintf(aname, sizeof(aname), "%s/data/data.%d", path, *index);
sprintf(name, "data/data.%d", *index); sprintf(name, "data/data.%d", *index);
} else { } else {
snprintf(aname, sizeof(aname), "%s/%s", path, name); snprintf(aname, sizeof(aname), "%s/%s", path, name);
} }
uInfo("get file info:%s", aname); uInfo("get file info:%s", aname);
if ( stat(aname, &fstat) < 0 ) return 0; if (stat(aname, &fstat) < 0) return 0;
*size = fstat.st_size; *size = fstat.st_size;
magic = fstat.st_size; magic = fstat.st_size;
...@@ -262,24 +254,22 @@ uint32_t getFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex ...@@ -262,24 +254,22 @@ uint32_t getFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex
return magic; return magic;
} }
int getWalInfo(void *ahandle, char *name, uint32_t *index) { int getWalInfo(void *ahandle, char *name, uint32_t *index) {
struct stat fstat;
struct stat fstat; char aname[280];
char aname[280];
name[0] = 0; name[0] = 0;
if (*index + 1> walNum) return 0; if (*index + 1 > walNum) return 0;
snprintf(aname, sizeof(aname), "%s/wal/wal.%d", path, *index); snprintf(aname, sizeof(aname), "%s/wal/wal.%d", path, *index);
sprintf(name, "wal/wal.%d", *index); sprintf(name, "wal/wal.%d", *index);
uInfo("get wal info:%s", aname); uInfo("get wal info:%s", aname);
if ( stat(aname, &fstat) < 0 ) return -1; if (stat(aname, &fstat) < 0) return -1;
if (*index >= walNum-1) return 0; // no more if (*index >= walNum - 1) return 0; // no more
return 1; return 1;
} }
int writeToCache(void *ahandle, void *data, int type) { int writeToCache(void *ahandle, void *data, int type) {
...@@ -290,24 +280,19 @@ int writeToCache(void *ahandle, void *data, int type) { ...@@ -290,24 +280,19 @@ int writeToCache(void *ahandle, void *data, int type) {
int msgSize = pHead->len + sizeof(SWalHead); int msgSize = pHead->len + sizeof(SWalHead);
void *pMsg = taosAllocateQitem(msgSize); void *pMsg = taosAllocateQitem(msgSize);
memcpy(pMsg, pHead, msgSize); memcpy(pMsg, pHead, msgSize);
taosWriteQitem(qhandle, type, pMsg); taosWriteQitem(qhandle, type, pMsg);
return 0; return 0;
} }
void confirmFwd(void *ahandle, int64_t version) { void confirmFwd(void *ahandle, int64_t version) { return; }
return;
}
void notifyRole(void *ahandle, int8_t r) { void notifyRole(void *ahandle, int8_t r) {
role = r; role = r;
printf("current role:%s\n", syncRole[role]); printf("current role:%s\n", syncRole[role]);
} }
void initSync() { void initSync() {
pCfg->replica = 1; pCfg->replica = 1;
pCfg->quorum = 1; pCfg->quorum = 1;
syncInfo.vgId = 1; syncInfo.vgId = 1;
...@@ -339,20 +324,18 @@ void initSync() { ...@@ -339,20 +324,18 @@ void initSync() {
taosGetFqdn(pCfg->nodeInfo[4].nodeFqdn); taosGetFqdn(pCfg->nodeInfo[4].nodeFqdn);
} }
void doSync() void doSync() {
{ for (int i = 0; i < 5; ++i) {
for (int i=0; i<5; ++i) { if (tsSyncPort == pCfg->nodeInfo[i].nodePort) nodeId = pCfg->nodeInfo[i].nodeId;
if (tsSyncPort == pCfg->nodeInfo[i].nodePort)
nodeId = pCfg->nodeInfo[i].nodeId;
} }
snprintf(path, sizeof(path), "/root/test/d%d", nodeId); snprintf(path, sizeof(path), "/root/test/d%d", nodeId);
strcpy(syncInfo.path, path); tstrncpy(syncInfo.path, path, sizeof(syncInfo.path));
if ( syncHandle == NULL) { if (syncHandle == NULL) {
syncHandle = syncStart(&syncInfo); syncHandle = syncStart(&syncInfo);
} else { } else {
if (syncReconfig(syncHandle, pCfg) < 0) syncHandle = NULL; if (syncReconfig(syncHandle, pCfg) < 0) syncHandle = NULL;
} }
uInfo("nodeId:%d path:%s syncPort:%d", nodeId, path, tsSyncPort); uInfo("nodeId:%d path:%s syncPort:%d", nodeId, path, tsSyncPort);
...@@ -361,39 +344,39 @@ void doSync() ...@@ -361,39 +344,39 @@ void doSync()
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
SRpcInit rpcInit; SRpcInit rpcInit;
char dataName[20] = "server.data"; char dataName[20] = "server.data";
pCfg = &syncInfo.syncCfg; pCfg = &syncInfo.syncCfg;
initSync(); initSync();
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = 7000; rpcInit.localPort = 7000;
rpcInit.label = "SER"; rpcInit.label = "SER";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
rpcInit.cfp = processRequestMsg; rpcInit.cfp = processRequestMsg;
rpcInit.sessions = 1000; rpcInit.sessions = 1000;
rpcInit.idleTime = tsShellActivityTimer*1500; rpcInit.idleTime = tsShellActivityTimer * 1500;
rpcInit.afp = retrieveAuthInfo; rpcInit.afp = retrieveAuthInfo;
for (int i=1; i<argc; ++i) { for (int i = 1; i < argc; ++i) {
if (strcmp(argv[i], "-p")==0 && i < argc-1) { if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
rpcInit.localPort = atoi(argv[++i]); rpcInit.localPort = atoi(argv[++i]);
} else if (strcmp(argv[i], "-t")==0 && i < argc-1) { } else if (strcmp(argv[i], "-t") == 0 && i < argc - 1) {
rpcInit.numOfThreads = atoi(argv[++i]); rpcInit.numOfThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-m")==0 && i < argc-1) { } else if (strcmp(argv[i], "-m") == 0 && i < argc - 1) {
msgSize = atoi(argv[++i]); msgSize = atoi(argv[++i]);
} else if (strcmp(argv[i], "-s")==0 && i < argc-1) { } else if (strcmp(argv[i], "-s") == 0 && i < argc - 1) {
rpcInit.sessions = atoi(argv[++i]); rpcInit.sessions = atoi(argv[++i]);
} else if (strcmp(argv[i], "-o")==0 && i < argc-1) { } else if (strcmp(argv[i], "-o") == 0 && i < argc - 1) {
tsCompressMsgSize = atoi(argv[++i]); tsCompressMsgSize = atoi(argv[++i]);
} else if (strcmp(argv[i], "-w")==0 && i < argc-1) { } else if (strcmp(argv[i], "-w") == 0 && i < argc - 1) {
commit = atoi(argv[++i]); commit = atoi(argv[++i]);
} else if (strcmp(argv[i], "-v")==0 && i < argc-1) { } else if (strcmp(argv[i], "-v") == 0 && i < argc - 1) {
syncInfo.version = atoi(argv[++i]); syncInfo.version = atoi(argv[++i]);
} else if (strcmp(argv[i], "-r")==0 && i < argc-1) { } else if (strcmp(argv[i], "-r") == 0 && i < argc - 1) {
pCfg->replica = atoi(argv[++i]); pCfg->replica = atoi(argv[++i]);
} else if (strcmp(argv[i], "-q")==0 && i < argc-1) { } else if (strcmp(argv[i], "-q") == 0 && i < argc - 1) {
pCfg->quorum = atoi(argv[++i]); pCfg->quorum = atoi(argv[++i]);
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) { } else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) {
rpcDebugFlag = atoi(argv[++i]); rpcDebugFlag = atoi(argv[++i]);
} else { } else {
printf("\nusage: %s [options] \n", argv[0]); printf("\nusage: %s [options] \n", argv[0]);
...@@ -411,10 +394,10 @@ int main(int argc, char *argv[]) { ...@@ -411,10 +394,10 @@ int main(int argc, char *argv[]) {
exit(0); exit(0);
} }
} }
uDebugFlag = rpcDebugFlag; uDebugFlag = rpcDebugFlag;
dDebugFlag = rpcDebugFlag; dDebugFlag = rpcDebugFlag;
//tmrDebugFlag = rpcDebugFlag; // tmrDebugFlag = rpcDebugFlag;
tsAsyncLog = 0; tsAsyncLog = 0;
taosInitLog("server.log", 1000000, 10); taosInitLog("server.log", 1000000, 10);
...@@ -443,35 +426,39 @@ int main(int argc, char *argv[]) { ...@@ -443,35 +426,39 @@ int main(int argc, char *argv[]) {
SNodesRole nroles; SNodesRole nroles;
while (1) { while (1) {
char c = getchar(); int c = getchar();
switch(c) { switch (c) {
case '1': case '1':
pCfg->replica = 1; doSync(); pCfg->replica = 1;
break; doSync();
break;
case '2': case '2':
pCfg->replica = 2; doSync(); pCfg->replica = 2;
doSync();
break; break;
case '3': case '3':
pCfg->replica = 3; doSync(); pCfg->replica = 3;
doSync();
break; break;
case '4': case '4':
pCfg->replica = 4; doSync(); pCfg->replica = 4;
doSync();
break; break;
case '5': case '5':
pCfg->replica = 5; doSync(); pCfg->replica = 5;
doSync();
break; break;
case 's': case 's':
syncGetNodesRole(syncHandle, &nroles); syncGetNodesRole(syncHandle, &nroles);
for (int i=0; i<pCfg->replica; ++i) for (int i = 0; i < pCfg->replica; ++i)
printf("=== nodeId:%d role:%s\n", nroles.nodeId[i], syncRole[nroles.role[i]]); printf("=== nodeId:%d role:%s\n", nroles.nodeId[i], syncRole[nroles.role[i]]);
break; break;
default: default:
break; break;
} }
if (c=='q') break; if (c == 'q') break;
} }
syncStop(syncHandle); syncStop(syncHandle);
...@@ -483,5 +470,3 @@ int main(int argc, char *argv[]) { ...@@ -483,5 +470,3 @@ int main(int argc, char *argv[]) {
return 0; return 0;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册