提交 eb8c0bd7 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

publish sync module

上级 4ba313ce
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc)
INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc)
INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(src SRC)
LIST(REMOVE_ITEM SRC ./src/tarbitrator.c)
ADD_LIBRARY(sync ${SRC})
TARGET_LINK_LIBRARIES(sync tutil pthread common)
LIST(APPEND BIN_SRC ./src/tarbitrator.c)
LIST(APPEND BIN_SRC ./src/taosTcpPool.c)
ADD_EXECUTABLE(tarbitrator ${BIN_SRC})
TARGET_LINK_LIBRARIES(tarbitrator tutil sync common)
ADD_SUBDIRECTORY(test)
ENDIF ()
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_SYNCINT_H
#define TDENGINE_SYNCINT_H
#ifdef __cplusplus
extern "C" {
#endif
#define sFatal(...) { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("SYN FATAL ", sDebugFlag, __VA_ARGS__); }}
#define sError(...) { if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("SYN ERROR ", sDebugFlag, __VA_ARGS__); }}
#define sWarn(...) { if (sDebugFlag & DEBUG_WARN) { taosPrintLog("SYN WARN ", sDebugFlag, __VA_ARGS__); }}
#define sInfo(...) { if (sDebugFlag & DEBUG_INFO) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }}
#define sDebug(...) { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }}
#define sTrace(...) { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }}
#define TAOS_SMSG_SYNC_DATA 1
#define TAOS_SMSG_FORWARD 2
#define TAOS_SMSG_FORWARD_RSP 3
#define TAOS_SMSG_SYNC_REQ 4
#define TAOS_SMSG_SYNC_RSP 5
#define TAOS_SMSG_SYNC_MUST 6
#define TAOS_SMSG_STATUS 7
#define nodeRole pNode->peerInfo[pNode->selfIndex]->role
#define nodeVersion pNode->peerInfo[pNode->selfIndex]->version
#define nodeSStatus pNode->peerInfo[pNode->selfIndex]->sstatus
#pragma pack(push, 1)
typedef struct {
char type; // msg type
char pversion; // protocol version
char reserved[6]; // not used
int32_t vgId; // vg ID
int32_t len; // content length, does not include head
// char cont[]; // message content starts from here
} SSyncHead;
typedef struct {
SSyncHead syncHead;
uint16_t port;
char fqdn[TSDB_FQDN_LEN];
int32_t sourceId; // only for arbitrator
} SFirstPkt;
typedef struct {
int8_t role;
uint64_t version;
} SPeerStatus;
typedef struct {
int8_t role;
int8_t ack;
uint64_t version;
SPeerStatus peersStatus[];
} SPeersStatus;
typedef struct {
char name[TSDB_FILENAME_LEN];
uint32_t magic;
uint32_t index;
uint64_t fversion;
int32_t size;
} SFileInfo;
typedef struct {
int8_t sync;
} SFileAck;
typedef struct {
uint64_t version;
int32_t code;
} SFwdRsp;
#pragma pack(pop)
typedef struct {
char *buffer;
int bufferSize;
char *offset;
int forwards;
int code;
} SRecvBuffer;
typedef struct {
uint64_t version;
void *mhandle;
int8_t acks;
int8_t nacks;
int8_t confirmed;
int32_t code;
uint64_t time;
} SFwdInfo;
typedef struct {
int first;
int last;
int fwds; // number of forwards
SFwdInfo fwdInfo[];
} SSyncFwds;
typedef struct SsyncPeer {
int32_t nodeId;
uint32_t ip;
uint16_t port;
char fqdn[TSDB_FQDN_LEN]; // peer ip string
char id[TSDB_EP_LEN+16]; // peer vgId + end point
int8_t role;
int8_t sstatus; // sync status
uint64_t version;
uint64_t sversion; // track the peer version in retrieve process
int syncFd;
int peerFd; // forward FD
void *timer;
void *pConn;
int notifyFd;
int watchNum;
int *watchFd;
int8_t refCount; // reference count
struct SSyncNode *pSyncNode;
} SSyncPeer;
typedef struct SSyncNode {
char path[TSDB_FILENAME_LEN];
int8_t replica;
int8_t quorum;
uint32_t vgId;
void *ahandle;
int8_t selfIndex;
SSyncPeer *peerInfo[TAOS_SYNC_MAX_REPLICA+1]; // extra one for arbitrator
SSyncPeer *pMaster;
int8_t refCount;
SRecvBuffer *pRecv;
SSyncFwds *pSyncFwds; // saved forward info if quorum >1
void *pFwdTimer;
FGetFileInfo getFileInfo;
FGetWalInfo getWalInfo;
FWriteToCache writeToCache;
FConfirmForward confirmForward;
FNotifyRole notifyRole;
FNotifyFileSynced notifyFileSynced;
pthread_mutex_t mutex;
} SSyncNode;
// sync module global
extern int tsSyncNum;
extern char tsNodeFqdn[TSDB_FQDN_LEN];
void *syncRetrieveData(void *param);
void *syncRestoreData(void *param);
int syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead);
void syncRestartConnection(SSyncPeer *pPeer);
void syncBroadcastStatus(SSyncNode *pNode);
void syncAddPeerRef(SSyncPeer *pPeer);
int syncDecPeerRef(SSyncPeer *pPeer);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_VNODEPEER_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_PLUGINS_SYNC_H
#define TDENGINE_PLUGINS_SYNC_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdbool.h>
#include <stdint.h>
int32_t syncTest1();
int32_t syncTest2();
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TCP_POOL_H
#define TDENGINE_TCP_POOL_H
#ifdef __cplusplus
extern "C" {
#endif
typedef void* ttpool_h;
typedef void* tthread_h;
typedef struct {
int numOfThreads;
uint32_t serverIp;
short port;
int bufferSize;
void (*processBrokenLink)(void *ahandle);
int (*processIncomingMsg)(void *ahandle, void *buffer);
void (*processIncomingConn)(int fd, uint32_t ip);
} SPoolInfo;
ttpool_h taosOpenTcpThreadPool(SPoolInfo *pInfo);
void taosCloseTcpThreadPool(ttpool_h);
void *taosAllocateTcpConn(void *, void *ahandle, int connFd);
void taosFreeTcpConn(void *);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TCP_POOL_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
//#include <stdint.h>
//#include <stdbool.h>
#include "os.h"
#include "hash.h"
#include "tlog.h"
#include "tutil.h"
#include "ttimer.h"
#include "ttime.h"
#include "tsocket.h"
#include "tglobal.h"
#include "taoserror.h"
#include "taosTcpPool.h"
#include "tqueue.h"
#include "twal.h"
#include "tsync.h"
#include "syncInt.h"
// global configurable
int tsMaxSyncNum = 4;
int tsSyncTcpThreads = 2;
int tsMaxWatchFiles = 100;
int tsMaxFwdInfo = 200;
int tsSyncTimer = 1;
//int sDebugFlag = 135;
//char tsArbitrator[TSDB_FQDN_LEN] = {0};
// module global, not configurable
int tsSyncNum; // number of sync in process in whole system
char tsNodeFqdn[TSDB_FQDN_LEN];
static int tsNodeNum; // number of nodes in system
static ttpool_h tsTcpPool;
static void *syncTmrCtrl = NULL;
static void *vgIdHash;
static pthread_once_t syncModuleInit = PTHREAD_ONCE_INIT;
// local functions
static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer);
static void syncRecoverFromMaster(SSyncPeer *pPeer);
static void syncCheckPeerConnection(void *param, void *tmrId);
static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack);
static void syncProcessBrokenLink(void *param);
static int syncProcessPeerMsg(void *param, void *buffer);
static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp);
static void syncRemovePeer(SSyncPeer *pPeer);
static void syncAddArbitrator(SSyncNode *pNode);
static void syncAddNodeRef(SSyncNode *pNode);
static void syncDecNodeRef(SSyncNode *pNode);
static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode);
static void syncMonitorFwdInfos(void *param, void *tmrId);
static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code);
static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle);
static void syncRestartPeer(SSyncPeer *pPeer);
static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo);
char* syncRole[] = {
"offline",
"unsynced",
"slave",
"master"
};
static void syncModuleInitFunc() {
SPoolInfo info;
info.numOfThreads = tsSyncTcpThreads;
info.serverIp = 0;
info.port = tsSyncPort;
info.bufferSize = 640000;
info.processBrokenLink = syncProcessBrokenLink;
info.processIncomingMsg = syncProcessPeerMsg;
info.processIncomingConn = syncProcessIncommingConnection;
tsTcpPool = taosOpenTcpThreadPool(&info);
if (tsTcpPool == NULL) return;
syncTmrCtrl = taosTmrInit(1000, 50, 10000, "SYNC");
if (syncTmrCtrl == NULL) {
taosCloseTcpThreadPool(tsTcpPool);
tsTcpPool = NULL;
return;
}
vgIdHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true);
if (vgIdHash == NULL) {
taosTmrCleanUp(syncTmrCtrl);
taosCloseTcpThreadPool(tsTcpPool);
tsTcpPool = NULL;
syncTmrCtrl = NULL;
return;
}
tstrncpy(tsNodeFqdn, tsLocalFqdn, sizeof(tsNodeFqdn));
}
void *syncStart(const SSyncInfo *pInfo)
{
const SSyncCfg *pCfg = &pInfo->syncCfg;
SSyncNode *pNode = (SSyncNode *) calloc(sizeof(SSyncNode), 1);
if (pNode == NULL) {
sError("no memory to allocate syncNode");
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
}
pthread_once(&syncModuleInit, syncModuleInitFunc);
if (tsTcpPool == NULL) {
free(pNode);
syncModuleInit = PTHREAD_ONCE_INIT;
sError("failed to init sync module(%s)", tstrerror(errno));
return NULL;
}
atomic_add_fetch_32(&tsNodeNum, 1);
tstrncpy(pNode->path, pInfo->path, sizeof(pNode->path));
pthread_mutex_init(&pNode->mutex, NULL);
pNode->ahandle = pInfo->ahandle;
pNode->getFileInfo = pInfo->getFileInfo;
pNode->getWalInfo = pInfo->getWalInfo;
pNode->writeToCache = pInfo->writeToCache;
pNode->notifyRole = pInfo->notifyRole;
pNode->confirmForward = pInfo->confirmForward;
pNode->notifyFileSynced = pInfo->notifyFileSynced;
pNode->selfIndex = -1;
pNode->vgId = pInfo->vgId;
pNode->replica = pCfg->replica;
pNode->quorum = pCfg->quorum;
for (int i = 0; i < pCfg->replica; ++i) {
const SNodeInfo *pNodeInfo = pCfg->nodeInfo + i;
pNode->peerInfo[i] = syncAddPeer(pNode, pNodeInfo);
if ((strcmp(pNodeInfo->nodeFqdn, tsNodeFqdn) == 0) && (pNodeInfo->nodePort == tsSyncPort))
pNode->selfIndex = i;
}
if (pNode->selfIndex < 0) {
sInfo("vgId:%d, this node is not configured", pNode->vgId);
terrno = TSDB_CODE_SYN_INVALID_CONFIG;
syncStop(pNode);
return NULL;
}
nodeVersion = pInfo->version; // set the initial version
nodeRole = (pNode->replica > 1) ? TAOS_SYNC_ROLE_UNSYNCED : TAOS_SYNC_ROLE_MASTER;
sInfo("vgId:%d, %d replicas are configured, quorum:%d role:%s", pNode->vgId, pNode->replica, pNode->quorum, syncRole[nodeRole]);
pNode->pSyncFwds = calloc(sizeof(SSyncFwds) + tsMaxFwdInfo*sizeof(SFwdInfo), 1);
if (pNode->pSyncFwds == NULL) {
sError("vgId:%d, no memory to allocate syncFwds", pNode->vgId);
terrno = TAOS_SYSTEM_ERROR(errno);
syncStop(pNode);
return NULL;
}
pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl);
if (pNode->pFwdTimer == NULL) {
sError("vgId:%d, failed to allocate timer", pNode->vgId);
syncStop(pNode);
return NULL;
}
syncAddArbitrator(pNode);
syncAddNodeRef(pNode);
taosHashPut(vgIdHash, (const char *)&pNode->vgId, sizeof(int32_t), (char *)(&pNode), sizeof(SSyncNode *));
if (pNode->notifyRole)
(*pNode->notifyRole)(pNode->ahandle, nodeRole);
return pNode;
}
void syncStop(void *param)
{
SSyncNode *pNode = param;
SSyncPeer *pPeer;
if (pNode == NULL) return;
sInfo("vgId:%d, cleanup sync", pNode->vgId);
pthread_mutex_lock(&(pNode->mutex));
for (int i = 0; i < pNode->replica; ++i) {
pPeer = pNode->peerInfo[i];
if (pPeer) syncRemovePeer(pPeer);
}
pPeer = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA];
if (pPeer) syncRemovePeer(pPeer);
if (vgIdHash) taosHashRemove(vgIdHash, (const char *)&pNode->vgId, sizeof(int32_t));
if (pNode->pFwdTimer) taosTmrStop(pNode->pFwdTimer);
pthread_mutex_unlock(&(pNode->mutex));
syncDecNodeRef(pNode);
}
int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg)
{
SSyncNode *pNode = param;
int i, j;
if (pNode == NULL) return TSDB_CODE_SYN_INVALID_CONFIG;
sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole],
pNewCfg->replica, pNode->replica);
pthread_mutex_lock(&(pNode->mutex));
for (i = 0; i < pNode->replica; ++i) {
for (j = 0; j < pNewCfg->replica; ++j) {
if ((strcmp(pNode->peerInfo[i]->fqdn, pNewCfg->nodeInfo[j].nodeFqdn) == 0) &&
(pNode->peerInfo[i]->port == pNewCfg->nodeInfo[j].nodePort))
break;
}
if (j >= pNewCfg->replica) {
syncRemovePeer(pNode->peerInfo[i]);
pNode->peerInfo[i] = NULL;
}
}
SSyncPeer *newPeers[TAOS_SYNC_MAX_REPLICA];
for (i = 0; i < pNewCfg->replica; ++i) {
const SNodeInfo *pNewNode = &pNewCfg->nodeInfo[i];
for (j = 0; j < pNode->replica; ++j) {
if (pNode->peerInfo[j] && (strcmp(pNode->peerInfo[j]->fqdn, pNewNode->nodeFqdn) == 0) &&
(pNode->peerInfo[j]->port == pNewNode->nodePort))
break;
}
if (j >= pNode->replica) {
newPeers[i] = syncAddPeer(pNode, pNewNode);
} else {
newPeers[i] = pNode->peerInfo[j];
}
if ((strcmp(pNewNode->nodeFqdn, tsNodeFqdn) == 0) && (pNewNode->nodePort == tsSyncPort))
pNode->selfIndex = i;
}
pNode->replica = pNewCfg->replica;
pNode->quorum = pNewCfg->quorum;
memcpy(pNode->peerInfo, newPeers, sizeof(SSyncPeer *) * pNewCfg->replica);
for (i = pNewCfg->replica; i < TAOS_SYNC_MAX_REPLICA; ++i)
pNode->peerInfo[i] = NULL;
syncAddArbitrator(pNode);
if (pNewCfg->replica <= 1) {
sInfo("vgId:%d, no peers are configured, work as master!", pNode->vgId);
nodeRole = TAOS_SYNC_ROLE_MASTER;
(*pNode->notifyRole)(pNode->ahandle, nodeRole);
}
pthread_mutex_unlock(&(pNode->mutex));
sInfo("vgId:%d, %d replicas are configured, quorum:%d role:%s", pNode->vgId, pNode->replica, pNode->quorum, syncRole[nodeRole]);
syncBroadcastStatus(pNode);
return 0;
}
int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype)
{
SSyncNode *pNode = param;
SSyncPeer *pPeer;
SSyncHead *pSyncHead;
SWalHead *pWalHead = data;
int fwdLen;
int code = 0;
if (pNode == NULL) return 0;
// always update version
nodeVersion = pWalHead->version;
if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER ) return 0;
// only pkt from RPC or CQ can be forwarded
if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0;
// a hacker way to improve the performance
pSyncHead = (SSyncHead *) ( ((char *)pWalHead) - sizeof(SSyncHead));
pSyncHead->type = TAOS_SMSG_FORWARD;
pSyncHead->pversion = 0;
pSyncHead->len = sizeof(SWalHead) + pWalHead->len;
fwdLen = pSyncHead->len + sizeof(SSyncHead); //include the WAL and SYNC head
pthread_mutex_lock(&(pNode->mutex));
if (pNode->quorum > 1) {
syncSaveFwdInfo(pNode, pWalHead->version, mhandle);
code = 1;
}
for (int i = 0; i < pNode->replica; ++i) {
pPeer = pNode->peerInfo[i];
if (pPeer == NULL || pPeer->peerFd <0) continue;
if (pPeer->role != TAOS_SYNC_ROLE_SLAVE && pPeer->sstatus != TAOS_SYNC_STATUS_CACHE) continue;
int retLen = write(pPeer->peerFd, pSyncHead, fwdLen);
if (retLen == fwdLen) {
sDebug("%s, forward is sent, ver:%" PRIu64 " contLen:%d", pPeer->id, pWalHead->version, pWalHead->len);
} else {
sError("%s, failed to forward, ver:%" PRIu64 " retLen:%d", pPeer->id, pWalHead->version, retLen);
syncRestartConnection(pPeer);
}
}
pthread_mutex_unlock(&(pNode->mutex));
return code;
}
void syncConfirmForward(void *param, uint64_t version, int32_t code)
{
SSyncNode *pNode = param;
if (pNode == NULL) return;
if (pNode->quorum <= 1) return;
SSyncPeer *pPeer = pNode->pMaster;
if (pPeer == NULL) return;
char msg[sizeof(SSyncHead) + sizeof(SFwdRsp)] = {0};
SSyncHead *pHead = (SSyncHead *) msg;
pHead->type = TAOS_SMSG_FORWARD_RSP;
pHead->len = sizeof(SFwdRsp);
SFwdRsp *pFwdRsp = (SFwdRsp *)(msg + sizeof(SSyncHead));
pFwdRsp->version = version;
pFwdRsp->code = code;
int msgLen = sizeof(SSyncHead) + sizeof(SFwdRsp);
int retLen = write(pPeer->peerFd, msg, msgLen);
if (retLen == msgLen) {
sDebug("%s, forward-rsp is sent, ver:%" PRIu64, pPeer->id, version);
} else {
sDebug("%s, failed to send forward ack, restart", pPeer->id);
syncRestartConnection(pPeer);
}
}
void syncRecover(void *param) {
SSyncNode *pNode = param;
SSyncPeer *pPeer;
// to do: add a few lines to check if recover is OK
// if take this node to unsync state, the whole system may not work
nodeRole = TAOS_SYNC_ROLE_UNSYNCED;
(*pNode->notifyRole)(pNode->ahandle, nodeRole);
nodeVersion = 0;
pthread_mutex_lock(&(pNode->mutex));
for (int i = 0; i < pNode->replica; ++i) {
pPeer = (SSyncPeer *) pNode->peerInfo[i];
if (pPeer->peerFd >= 0) {
syncRestartConnection(pPeer);
}
}
pthread_mutex_unlock(&(pNode->mutex));
}
int syncGetNodesRole(void *param, SNodesRole *pNodesRole)
{
SSyncNode *pNode = param;
pNodesRole->selfIndex = pNode->selfIndex;
for (int i=0; i<pNode->replica; ++i) {
pNodesRole->nodeId[i] = pNode->peerInfo[i]->nodeId;
pNodesRole->role[i] = pNode->peerInfo[i]->role;
}
return 0;
}
static void syncAddArbitrator(SSyncNode *pNode)
{
SSyncPeer *pPeer = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA];
// if not configured, return right away
if (tsArbitrator[0] == 0) {
if (pPeer) syncRemovePeer(pPeer);
pNode->peerInfo[TAOS_SYNC_MAX_REPLICA] = NULL;
return;
}
SNodeInfo nodeInfo;
nodeInfo.nodeId = 0;
taosGetFqdnPortFromEp(tsArbitrator, nodeInfo.nodeFqdn, &nodeInfo.nodePort);
nodeInfo.nodePort += TSDB_PORT_SYNC;
if (pPeer) {
if ((strcmp(nodeInfo.nodeFqdn, pPeer->fqdn) == 0) && (nodeInfo.nodePort == pPeer->port)) {
return;
} else {
syncRemovePeer(pPeer);
pNode->peerInfo[TAOS_SYNC_MAX_REPLICA] = NULL;
}
}
pNode->peerInfo[TAOS_SYNC_MAX_REPLICA] = syncAddPeer(pNode, &nodeInfo);
}
static void syncAddNodeRef(SSyncNode *pNode)
{
atomic_add_fetch_8(&pNode->refCount, 1);
}
static void syncDecNodeRef(SSyncNode *pNode)
{
if (atomic_sub_fetch_8(&pNode->refCount, 1) == 0) {
pthread_mutex_destroy(&pNode->mutex);
tfree(pNode->pRecv);
tfree(pNode->pSyncFwds);
tfree(pNode);
if (atomic_sub_fetch_32(&tsNodeNum, 1) == 0) {
if (tsTcpPool) taosCloseTcpThreadPool(tsTcpPool);
if (syncTmrCtrl) taosTmrCleanUp(syncTmrCtrl);
if (vgIdHash) taosHashCleanup(vgIdHash);
syncTmrCtrl = NULL;
tsTcpPool = NULL;
vgIdHash = NULL;
syncModuleInit = PTHREAD_ONCE_INIT;
sDebug("sync module is cleaned up");
}
}
}
void syncAddPeerRef(SSyncPeer *pPeer)
{
atomic_add_fetch_8(&pPeer->refCount, 1);
}
int syncDecPeerRef(SSyncPeer *pPeer)
{
if (atomic_sub_fetch_8(&pPeer->refCount, 1) == 0) {
syncDecNodeRef(pPeer->pSyncNode);
sDebug("%s, resource is freed", pPeer->id);
tfree(pPeer->watchFd);
tfree(pPeer);
return 0;
}
return 1;
}
static void syncClosePeerConn(SSyncPeer *pPeer)
{
taosTmrStopA(&pPeer->timer);
tclose(pPeer->syncFd);
if (pPeer->peerFd >=0) {
pPeer->peerFd = -1;
taosFreeTcpConn(pPeer->pConn);
}
}
static void syncRemovePeer(SSyncPeer *pPeer)
{
sInfo("%s, it is removed", pPeer->id);
pPeer->ip = 0;
syncClosePeerConn(pPeer);
syncDecPeerRef(pPeer);
}
static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo)
{
uint32_t ip = taosGetIpFromFqdn(pInfo->nodeFqdn);
if (ip == -1) return NULL;
SSyncPeer *pPeer = (SSyncPeer *) calloc(1, sizeof(SSyncPeer));
if (pPeer == NULL) return NULL;
pPeer->nodeId = pInfo->nodeId;
tstrncpy(pPeer->fqdn, pInfo->nodeFqdn, sizeof(pPeer->fqdn));
pPeer->ip = ip;
pPeer->port = pInfo->nodePort;
snprintf(pPeer->id, sizeof(pPeer->id), "vgId:%d peer:%s:%d", pNode->vgId, pPeer->fqdn, pPeer->port);
pPeer->peerFd = -1;
pPeer->syncFd = -1;
pPeer->role = TAOS_SYNC_ROLE_OFFLINE;
pPeer->pSyncNode = pNode;
pPeer->refCount = 1;
sInfo("%s, it is configured", pPeer->id);
int ret = strcmp(pPeer->fqdn, tsNodeFqdn);
if (pPeer->nodeId == 0 || (ret > 0) || (ret == 0 && pPeer->port > tsSyncPort)) {
sDebug("%s, start to check peer connection", pPeer->id);
taosTmrReset(syncCheckPeerConnection, 100, pPeer, syncTmrCtrl, &pPeer->timer);
}
syncAddNodeRef(pNode);
return pPeer;
}
void syncBroadcastStatus(SSyncNode *pNode)
{
SSyncPeer *pPeer;
for (int i = 0; i < pNode->replica; ++i) {
if ( i == pNode->selfIndex ) continue;
pPeer = pNode->peerInfo[i];
syncSendPeersStatusMsgToPeer(pPeer, 1);
}
}
static void syncChooseMaster(SSyncNode *pNode) {
SSyncPeer *pPeer;
int onlineNum = 0;
int index = -1;
int replica = pNode->replica;
sDebug("vgId:%d, choose master", pNode->vgId);
for (int i = 0; i < pNode->replica; ++i) {
if (pNode->peerInfo[i]->role != TAOS_SYNC_ROLE_OFFLINE)
onlineNum++;
}
if (onlineNum == pNode->replica) {
// if all peers are online, peer with highest version shall be master
index = 0;
for (int i = 1; i < pNode->replica; ++i) {
if (pNode->peerInfo[i]->version > pNode->peerInfo[index]->version)
index = i;
}
}
// add arbitrator connection
SSyncPeer *pArb = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA];
if (pArb && pArb->role != TAOS_SYNC_ROLE_OFFLINE) {
onlineNum++;
replica = pNode->replica + 1;
}
if (index < 0 && onlineNum > replica/2.0) {
// over half of nodes are online
for (int i = 0; i < pNode->replica; ++i) {
//slave with highest version shall be master
pPeer = pNode->peerInfo[i];
if (pPeer->role == TAOS_SYNC_ROLE_SLAVE || pPeer->role == TAOS_SYNC_ROLE_MASTER) {
if (index < 0 || pPeer->version > pNode->peerInfo[index]->version)
index = i;
}
}
}
if (index >= 0) {
if (index == pNode->selfIndex) {
sInfo("vgId:%d, start to work as master", pNode->vgId);
nodeRole = TAOS_SYNC_ROLE_MASTER;
(*pNode->notifyRole)(pNode->ahandle, nodeRole);
} else {
pPeer = pNode->peerInfo[index];
sInfo("%s, it shall work as master", pPeer->id);
}
} else {
sDebug("vgId:%d, failed to choose master", pNode->vgId);
}
}
static SSyncPeer *syncCheckMaster(SSyncNode *pNode ) {
int onlineNum = 0;
int index = -1;
int replica = pNode->replica;
for (int i = 0; i < pNode->replica; ++i) {
if (pNode->peerInfo[i]->role != TAOS_SYNC_ROLE_OFFLINE)
onlineNum++;
}
// add arbitrator connection
SSyncPeer *pArb = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA];
if (pArb && pArb->role != TAOS_SYNC_ROLE_OFFLINE) {
onlineNum++;
replica = pNode->replica + 1;
}
if (onlineNum <= replica*0.5) {
if (nodeRole != TAOS_SYNC_ROLE_UNSYNCED) {
nodeRole = TAOS_SYNC_ROLE_UNSYNCED;
pNode->peerInfo[pNode->selfIndex]->role = nodeRole;
(*pNode->notifyRole)(pNode->ahandle, nodeRole);
sInfo("vgId:%d, change to unsynced state, online:%d replica:%d", pNode->vgId, onlineNum, replica);
}
} else {
for (int i=0; i<pNode->replica; ++i) {
SSyncPeer *pTemp = pNode->peerInfo[i];
if ( pTemp->role != TAOS_SYNC_ROLE_MASTER ) continue;
if ( index < 0 ) {
index = i;
} else { // multiple masters, it shall not happen
if ( i == pNode->selfIndex ) {
sError("%s, peer is master, work as slave instead", pTemp->id);
nodeRole = TAOS_SYNC_ROLE_SLAVE;
(*pNode->notifyRole)(pNode->ahandle, nodeRole);
}
}
}
}
SSyncPeer *pMaster = (index>=0) ? pNode->peerInfo[index]:NULL;
return pMaster;
}
static int syncValidateMaster(SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode;
int code = 0;
if (nodeRole == TAOS_SYNC_ROLE_MASTER && nodeVersion < pPeer->version) {
sDebug("%s, slave has higher version, restart all connections!!!", pPeer->id);
nodeRole = TAOS_SYNC_ROLE_UNSYNCED;
(*pNode->notifyRole)(pNode->ahandle, nodeRole);
code = -1;
for (int i = 0; i < pNode->replica; ++i) {
if ( i == pNode->selfIndex ) continue;
syncRestartPeer(pNode->peerInfo[i]);
}
}
return code;
}
static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t newRole)
{
SSyncNode *pNode = pPeer->pSyncNode;
int8_t peerOldRole = pPeer->role;
int8_t selfOldRole = nodeRole;
int8_t i, syncRequired = 0;
pNode->peerInfo[pNode->selfIndex]->version = nodeVersion;
pPeer->role = newRole;
sDebug("%s, own role:%s, new peer role:%s", pPeer->id,
syncRole[nodeRole], syncRole[pPeer->role]);
SSyncPeer *pMaster = syncCheckMaster(pNode);
if ( pMaster ) {
// master is there
pNode->pMaster = pMaster;
sDebug("%s, it is the master, ver:%" PRIu64, pMaster->id, pMaster->version);
if (syncValidateMaster(pPeer) < 0) return;
if (nodeRole == TAOS_SYNC_ROLE_UNSYNCED) {
if ( nodeVersion < pMaster->version) {
syncRequired = 1;
} else {
sInfo("%s is master, work as slave, ver:%" PRIu64, pMaster->id, pMaster->version);
nodeRole = TAOS_SYNC_ROLE_SLAVE;
(*pNode->notifyRole)(pNode->ahandle, nodeRole);
}
} else if ( nodeRole == TAOS_SYNC_ROLE_SLAVE && pMaster == pPeer) {
// nodeVersion = pMaster->version;
}
} else {
// master not there, if all peer's state and version are consistent, choose the master
int consistent = 0;
if (peersStatus) {
for (i = 0; i < pNode->replica; ++i) {
SSyncPeer *pTemp = pNode->peerInfo[i];
if (pTemp->role != peersStatus[i].role) break;
if ((pTemp->role != TAOS_SYNC_ROLE_OFFLINE) && (pTemp->version != peersStatus[i].version)) break;
}
if (i >= pNode->replica) consistent = 1;
} else {
if (pNode->replica == 2) consistent = 1;
}
if (consistent)
syncChooseMaster(pNode);
}
if (syncRequired) {
syncRecoverFromMaster(pMaster);
}
if (peerOldRole != newRole || nodeRole != selfOldRole)
syncBroadcastStatus(pNode);
}
static void syncRestartPeer(SSyncPeer *pPeer) {
sDebug("%s, restart connection", pPeer->id);
syncClosePeerConn(pPeer);
pPeer->sstatus = TAOS_SYNC_STATUS_INIT;
int ret = strcmp(pPeer->fqdn, tsNodeFqdn);
if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort) )
taosTmrReset(syncCheckPeerConnection, tsSyncTimer*1000, pPeer, syncTmrCtrl, &pPeer->timer);
}
void syncRestartConnection(SSyncPeer *pPeer)
{
if (pPeer->ip == 0) return;
syncRestartPeer(pPeer);
syncCheckRole(pPeer, NULL, TAOS_SYNC_ROLE_OFFLINE);
}
static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer)
{
SSyncNode *pNode = pPeer->pSyncNode;
sDebug("%s, sync-req is received", pPeer->id);
if (pPeer->ip == 0) return;
if (nodeRole != TAOS_SYNC_ROLE_MASTER) {
sError("%s, I am not master anymore", pPeer->id);
tclose(pPeer->syncFd);
return;
}
if (pPeer->sstatus != TAOS_SYNC_STATUS_INIT) {
sDebug("%s, sync is already started", pPeer->id);
return; // already started
}
// start a new thread to retrieve the data
syncAddPeerRef(pPeer);
pthread_attr_t thattr;
pthread_t thread;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED);
int ret = pthread_create(&thread, &thattr, syncRetrieveData, pPeer);
pthread_attr_destroy(&thattr);
if (ret != 0) {
sError("%s, failed to create sync thread(%s)", pPeer->id, strerror(errno));
syncDecPeerRef(pPeer);
} else {
pPeer->sstatus = TAOS_SYNC_STATUS_START;
sDebug("%s, thread is created to retrieve data", pPeer->id);
}
}
static void syncNotStarted(void *param, void *tmrId)
{
SSyncPeer *pPeer = param;
SSyncNode *pNode = pPeer->pSyncNode;
pthread_mutex_lock(&(pNode->mutex));
pPeer->timer = NULL;
sInfo("%s, sync connection is still not up, restart", pPeer->id);
syncRestartConnection(pPeer);
pthread_mutex_unlock(&(pNode->mutex));
}
static void syncTryRecoverFromMaster(void *param, void *tmrId) {
SSyncPeer *pPeer = param;
SSyncNode *pNode = pPeer->pSyncNode;
pthread_mutex_lock(&(pNode->mutex));
syncRecoverFromMaster(pPeer);
pthread_mutex_unlock(&(pNode->mutex));
}
static void syncRecoverFromMaster(SSyncPeer *pPeer)
{
SSyncNode *pNode = pPeer->pSyncNode;
if ( nodeSStatus != TAOS_SYNC_STATUS_INIT) {
sDebug("%s, sync is already started, status:%d", pPeer->id, nodeSStatus);
return;
}
taosTmrStopA(&pPeer->timer);
if (tsSyncNum >= tsMaxSyncNum) {
sInfo("%s, %d syncs are in process, try later", pPeer->id, tsSyncNum);
taosTmrReset(syncTryRecoverFromMaster, 500, pPeer, syncTmrCtrl, &pPeer->timer);
return;
}
sDebug("%s, try to sync", pPeer->id)
SFirstPkt firstPkt;
memset(&firstPkt, 0, sizeof(firstPkt));
firstPkt.syncHead.type = TAOS_SMSG_SYNC_REQ;
firstPkt.syncHead.vgId = pNode->vgId;
firstPkt.syncHead.len = sizeof(firstPkt) - sizeof(SSyncHead);
tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
firstPkt.port = tsSyncPort;
taosTmrReset(syncNotStarted, tsSyncTimer*1000, pPeer, syncTmrCtrl, &pPeer->timer);
if (write(pPeer->peerFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt) ) {
sError("%s, failed to send sync-req to peer", pPeer->id);
} else {
nodeSStatus = TAOS_SYNC_STATUS_START;
sInfo("%s, sync-req is sent", pPeer->id);
}
return;
}
static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer)
{
SSyncNode *pNode = pPeer->pSyncNode;
SFwdRsp *pFwdRsp = (SFwdRsp *) cont;
SSyncFwds *pSyncFwds = pNode->pSyncFwds;
SFwdInfo *pFwdInfo;
sDebug("%s, forward-rsp is received, ver:%" PRIu64, pPeer->id, pFwdRsp->version);
SFwdInfo *pFirst = pSyncFwds->fwdInfo + pSyncFwds->first;
if (pFirst->version <= pFwdRsp->version && pSyncFwds->fwds > 0) {
// find the forwardInfo from first
for (int i=0; i<pSyncFwds->fwds; ++i) {
pFwdInfo = pSyncFwds->fwdInfo + (i+pSyncFwds->first)%tsMaxFwdInfo;
if (pFwdRsp->version == pFwdInfo->version) break;
}
syncProcessFwdAck(pNode, pFwdInfo, pFwdRsp->code);
syncRemoveConfirmedFwdInfo(pNode);
}
}
static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer)
{
SSyncNode *pNode = pPeer->pSyncNode;
SWalHead *pHead = (SWalHead *)cont;
sDebug("%s, forward is received, ver:%" PRIu64, pPeer->id, pHead->version);
if (nodeRole == TAOS_SYNC_ROLE_SLAVE) {
//nodeVersion = pHead->version;
(*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD);
} else {
if (nodeSStatus != TAOS_SYNC_STATUS_INIT) {
syncSaveIntoBuffer(pPeer, pHead);
} else {
sError("%s, forward discarded, ver:%" PRIu64, pPeer->id, pHead->version);
}
}
return;
}
static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer)
{
SSyncNode *pNode = pPeer->pSyncNode;
SPeersStatus *pPeersStatus = (SPeersStatus *)cont;
sDebug("%s, status msg received, self:%s ver:%" PRIu64 " peer:%s ver:%" PRIu64 ", ack:%d", pPeer->id,
syncRole[nodeRole], nodeVersion, syncRole[pPeersStatus->role], pPeersStatus->version, pPeersStatus->ack);
pPeer->version = pPeersStatus->version;
syncCheckRole(pPeer, pPeersStatus->peersStatus, pPeersStatus->role);
if (pPeersStatus->ack)
syncSendPeersStatusMsgToPeer(pPeer, 0);
}
static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
if (pPeer->peerFd <0) return -1;
int hlen = taosReadMsg(pPeer->peerFd, pHead, sizeof(SSyncHead));
if (hlen != sizeof(SSyncHead)) {
sDebug("%s, failed to read msg, hlen:%d", pPeer->id, hlen);
return -1;
}
// head.len = htonl(head.len);
if (pHead->len <0) {
sError("%s, invalid pkt length, len:%d", pPeer->id, pHead->len);
return -1;
}
int bytes = taosReadMsg(pPeer->peerFd, cont, pHead->len);
if (bytes != pHead->len) {
sError("%s, failed to read, bytes:%d len:%d", pPeer->id, bytes, pHead->len);
return -1;
}
return 0;
}
static int syncProcessPeerMsg(void *param, void *buffer)
{
SSyncPeer *pPeer = param;
SSyncHead head;
char *cont = (char *)buffer;
SSyncNode *pNode = pPeer->pSyncNode;
pthread_mutex_lock(&(pNode->mutex));
int code = syncReadPeerMsg(pPeer, &head, cont);
if (code == 0) {
if (head.type == TAOS_SMSG_FORWARD) {
syncProcessForwardFromPeer(cont, pPeer);
} else if (head.type == TAOS_SMSG_FORWARD_RSP) {
syncProcessFwdResponse(cont, pPeer);
} else if (head.type == TAOS_SMSG_SYNC_REQ) {
syncProcessSyncRequest(cont, pPeer);
} else if (head.type == TAOS_SMSG_STATUS) {
syncProcessPeersStatusMsg(cont, pPeer);
}
}
pthread_mutex_unlock(&(pNode->mutex));
return code;
}
#define statusMsgLen sizeof(SSyncHead)+sizeof(SPeersStatus)+sizeof(SPeerStatus)*TAOS_SYNC_MAX_REPLICA
static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack)
{
SSyncNode *pNode = pPeer->pSyncNode;
char msg[statusMsgLen] = {0};
if (pPeer->peerFd <0 || pPeer->ip ==0) return;
SSyncHead *pHead = (SSyncHead *) msg;
SPeersStatus *pPeersStatus = (SPeersStatus *) (msg + sizeof(SSyncHead));
pHead->type = TAOS_SMSG_STATUS;
pHead->len = statusMsgLen - sizeof(SSyncHead);
pPeersStatus->version = nodeVersion;
pPeersStatus->role = nodeRole;
pPeersStatus->ack = ack;
for (int i = 0; i < pNode->replica; ++i) {
pPeersStatus->peersStatus[i].role = pNode->peerInfo[i]->role;
pPeersStatus->peersStatus[i].version = pNode->peerInfo[i]->version;
}
int retLen = write(pPeer->peerFd, msg, statusMsgLen);
if (retLen == statusMsgLen) {
sDebug("%s, status msg is sent", pPeer->id);
} else {
sDebug("%s, failed to send status msg, restart", pPeer->id);
syncRestartConnection(pPeer);
}
return;
}
static void syncSetupPeerConnection(SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode;
taosTmrStopA(&pPeer->timer);
if (pPeer->peerFd >= 0) {
sDebug("%s, send role version to peer", pPeer->id);
syncSendPeersStatusMsgToPeer(pPeer, 1);
return;
}
int connFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0);
if (connFd < 0) {
sDebug("%s, failed to open tcp socket(%s)", pPeer->id, strerror(errno));
taosTmrReset(syncCheckPeerConnection, tsSyncTimer *1000, pPeer, syncTmrCtrl, &pPeer->timer);
return;
}
SFirstPkt firstPkt;
memset(&firstPkt, 0, sizeof(firstPkt));
firstPkt.syncHead.vgId = pPeer->nodeId ? pNode->vgId:0;
firstPkt.syncHead.type = TAOS_SMSG_STATUS;
tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
firstPkt.port = tsSyncPort;
firstPkt.sourceId = pNode->vgId; // tell arbitrator its vgId
if ( write(connFd, &firstPkt, sizeof(firstPkt)) == sizeof(firstPkt)) {
sDebug("%s, connection to peer server is setup", pPeer->id);
pPeer->peerFd = connFd;
pPeer->role = TAOS_SYNC_ROLE_UNSYNCED;
pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd);
syncAddPeerRef(pPeer);
} else {
sDebug("try later");
close(connFd);
taosTmrReset(syncCheckPeerConnection, tsSyncTimer *1000, pPeer, syncTmrCtrl, &pPeer->timer);
}
}
static void syncCheckPeerConnection(void *param, void *tmrId)
{
SSyncPeer *pPeer = param;
SSyncNode *pNode = pPeer->pSyncNode;
pthread_mutex_lock(&(pNode->mutex));
sDebug("%s, check peer connection", pPeer->id);
syncSetupPeerConnection(pPeer);
pthread_mutex_unlock(&(pNode->mutex));
}
static void syncCreateRestoreDataThread(SSyncPeer *pPeer)
{
taosTmrStopA(&pPeer->timer);
pthread_attr_t thattr;
pthread_t thread;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED);
syncAddPeerRef(pPeer);
int ret = pthread_create(&(thread), &thattr, (void *)syncRestoreData, pPeer);
pthread_attr_destroy(&thattr);
if (ret < 0) {
sError("%s, failed to create sync thread", pPeer->id);
tclose(pPeer->syncFd);
syncDecPeerRef(pPeer);
} else {
sInfo("%s, sync connection is up", pPeer->id);
}
}
static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp)
{
char ipstr[24];
int i;
tinet_ntoa(ipstr, sourceIp);
sDebug("peer TCP connection from ip:%s", ipstr);
SFirstPkt firstPkt;
if (taosReadMsg(connFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) {
sError("failed to read peer first pkt from ip:%s(%s)", ipstr, strerror(errno));
taosCloseSocket(connFd);
return;
}
int32_t vgId = firstPkt.syncHead.vgId;
SSyncNode **ppNode = (SSyncNode **)taosHashGet(vgIdHash, (const char *)&vgId, sizeof(int32_t));
if (ppNode == NULL || *ppNode == NULL) {
sError("vgId:%d, vgId could not be found", vgId);
taosCloseSocket(connFd);
return;
}
SSyncNode *pNode = *ppNode;
pthread_mutex_lock(&(pNode->mutex));
SSyncPeer *pPeer;
for (i = 0; i < pNode->replica; ++i) {
pPeer = pNode->peerInfo[i];
if (pPeer && (strcmp(pPeer->fqdn, firstPkt.fqdn) == 0) && (pPeer->port == firstPkt.port))
break;
}
pPeer = (i < pNode->replica) ? pNode->peerInfo[i] : NULL;
if (pPeer == NULL) {
sError("vgId:%d, peer:%s not configured", pNode->vgId, firstPkt.fqdn);
taosCloseSocket(connFd);
// syncSendVpeerCfgMsg(sync);
} else {
// first packet tells what kind of link
if (firstPkt.syncHead.type == TAOS_SMSG_SYNC_DATA) {
pPeer->syncFd = connFd;
syncCreateRestoreDataThread(pPeer);
} else {
sDebug("%s, TCP connection is already up, close one", pPeer->id);
syncClosePeerConn(pPeer);
pPeer->peerFd = connFd;
pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd);
syncAddPeerRef(pPeer);
sDebug("%s, ready to exchange data", pPeer->id);
syncSendPeersStatusMsgToPeer(pPeer, 1);
}
}
pthread_mutex_unlock(&(pNode->mutex));
return;
}
static void syncProcessBrokenLink(void *param) {
if (param == NULL) return; // the connection for arbitrator
SSyncPeer *pPeer = param;
SSyncNode *pNode = pPeer->pSyncNode;
syncAddNodeRef(pNode);
pthread_mutex_lock(&(pNode->mutex));
sDebug("%s, TCP link is broken(%s)", pPeer->id, strerror(errno));
pPeer->peerFd = -1;
if (syncDecPeerRef(pPeer) != 0) {
syncRestartConnection(pPeer);
}
pthread_mutex_unlock(&(pNode->mutex));
syncDecNodeRef(pNode);
}
static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle)
{
SSyncFwds *pSyncFwds = pNode->pSyncFwds;
uint64_t time = taosGetTimestampMs();
if (pSyncFwds->fwds >= tsMaxFwdInfo) {
pSyncFwds->first = (pSyncFwds->first + 1) % tsMaxFwdInfo;
pSyncFwds->fwds--;
}
if (pSyncFwds->fwds > 0)
pSyncFwds->last = (pSyncFwds->last+1) % tsMaxFwdInfo;
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->last;
pFwdInfo->version = version;
pFwdInfo->mhandle = mhandle;
pFwdInfo->acks = 0;
pFwdInfo->confirmed = 0;
pFwdInfo->time = time;
pSyncFwds->fwds++;
sDebug("vgId:%d, fwd info is saved, ver:%" PRIu64 " fwds:%d ", pNode->vgId, version, pSyncFwds->fwds);
}
static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode)
{
SSyncFwds *pSyncFwds = pNode->pSyncFwds;
int fwds = pSyncFwds->fwds;
for (int i=0; i<fwds; ++i) {
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->first;
if (pFwdInfo->confirmed == 0) break;
pSyncFwds->first = (pSyncFwds->first+1) % tsMaxFwdInfo;
pSyncFwds->fwds--;
if (pSyncFwds->fwds == 0) pSyncFwds->first = pSyncFwds->last;
//sDebug("vgId:%d, fwd info is removed, ver:%d, fwds:%d",
// pNode->vgId, pFwdInfo->version, pSyncFwds->fwds);
memset(pFwdInfo, 0, sizeof(SFwdInfo));
}
}
static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code)
{
int confirm = 0;
if (pFwdInfo->code == 0) pFwdInfo->code = code;
if (code == 0) {
pFwdInfo->acks++;
if (pFwdInfo->acks >= pNode->quorum-1)
confirm = 1;
} else {
pFwdInfo->nacks++;
if (pFwdInfo->nacks > pNode->replica-pNode->quorum)
confirm = 1;
}
if (confirm && pFwdInfo->confirmed == 0) {
sDebug("vgId:%d, forward is confirmed, ver:%" PRIu64 " code:%x", pNode->vgId, pFwdInfo->version, pFwdInfo->code);
(*pNode->confirmForward)(pNode->ahandle, pFwdInfo->mhandle, pFwdInfo->code);
pFwdInfo->confirmed = 1;
}
}
static void syncMonitorFwdInfos(void *param, void *tmrId)
{
SSyncNode *pNode = param;
SSyncFwds *pSyncFwds = pNode->pSyncFwds;
uint64_t time = taosGetTimestampMs();
if (pSyncFwds->fwds > 0) {
pthread_mutex_lock(&(pNode->mutex));
for (int i=0; i<pSyncFwds->fwds; ++i) {
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first+i) % tsMaxFwdInfo;
if (time - pFwdInfo->time < 2000) break;
syncProcessFwdAck(pNode, pFwdInfo, TSDB_CODE_RPC_NETWORK_UNAVAIL);
}
syncRemoveConfirmedFwdInfo(pNode);
pthread_mutex_unlock(&(pNode->mutex));
}
pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl);
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tlog.h"
#include "tutil.h"
#include "ttimer.h"
#include "tsocket.h"
#include "tqueue.h"
#include "twal.h"
#include "tsync.h"
#include "syncInt.h"
static void syncRemoveExtraFile(SSyncPeer *pPeer, uint32_t sindex, uint32_t eindex) {
char name[TSDB_FILENAME_LEN*2] = {0};
char fname[TSDB_FILENAME_LEN*3] = {0};
uint32_t magic;
uint64_t fversion;
int32_t size;
uint32_t index = sindex;
SSyncNode *pNode = pPeer->pSyncNode;
if (sindex < 0 || eindex < sindex) return;
while (1) {
name[0] = 0;
magic = (*pNode->getFileInfo)(pNode->ahandle, name, &index, eindex, &size, &fversion);
if (magic == 0) break;
snprintf(fname, sizeof(fname), "%s/%s", pNode->path, name);
remove(fname);
sDebug("%s, %s is removed", pPeer->id, fname);
index++;
if (index > eindex) break;
}
}
static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion)
{
SSyncNode *pNode = pPeer->pSyncNode;
SFileInfo minfo; memset(&minfo, 0, sizeof(minfo)); /* = {0}; */ // master file info
SFileInfo sinfo; memset(&sinfo, 0, sizeof(sinfo)); /* = {0}; */ // slave file info
SFileAck fileAck;
int code = -1;
char name[TSDB_FILENAME_LEN * 2] = {0};
uint32_t pindex = 0; // index in last restore
*fversion = 0;
sinfo.index = 0;
while (1) {
// read file info
int ret = taosReadMsg(pPeer->syncFd, &(minfo), sizeof(minfo));
if (ret < 0 ) break;
// if no more file from master, break;
if (minfo.name[0] == 0 || minfo.magic == 0) {
sDebug("%s, no more files to restore", pPeer->id);
// remove extra files after the current index
syncRemoveExtraFile(pPeer, sinfo.index+1, TAOS_SYNC_MAX_INDEX);
code = 0;
break;
}
// remove extra files on slave between the current and last index
syncRemoveExtraFile(pPeer, pindex+1, minfo.index-1);
pindex = minfo.index;
// check the file info
sinfo = minfo;
sDebug("%s, get file info:%s", pPeer->id, minfo.name);
sinfo.magic = (*pNode->getFileInfo)(pNode->ahandle, sinfo.name, &sinfo.index, TAOS_SYNC_MAX_INDEX, &sinfo.size, &sinfo.fversion);
// if file not there or magic is not the same, file shall be synced
memset(&fileAck, 0, sizeof(fileAck));
fileAck.sync = (sinfo.magic != minfo.magic || sinfo.name[0] == 0) ? 1:0;
// send file ack
ret = taosWriteMsg(pPeer->syncFd, &(fileAck), sizeof(fileAck));
if (ret <0) break;
// if sync is not required, continue
if (fileAck.sync == 0) {
sDebug("%s, %s is the same", pPeer->id, minfo.name);
continue;
}
// if sync is required, open file, receive from master, and write to file
// get the full path to file
snprintf(name, sizeof(name), "%s/%s", pNode->path, minfo.name);
int dfd = open(name, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO);
if ( dfd < 0 ) {
sError("%s, failed to open file:%s", pPeer->id, name);
break;
}
ret = taosCopyFds(pPeer->syncFd, dfd, minfo.size);
fsync(dfd);
close(dfd);
if (ret<0) break;
sDebug("%s, %s is received, size:%d", pPeer->id, minfo.name, minfo.size);
}
if (code == 0 && (minfo.fversion != sinfo.fversion)) {
// data file is changed, code shall be set to 1
*fversion = minfo.fversion;
code = 1;
}
if (code < 0) {
sError("%s, failed to restore %s(%s)", pPeer->id, name, strerror(errno));
}
return code;
}
static int syncRestoreWal(SSyncPeer *pPeer)
{
SSyncNode *pNode = pPeer->pSyncNode;
int ret, code = -1;
void *buffer = calloc(1024000, 1); // size for one record
if (buffer == NULL) return -1;
SWalHead *pHead = (SWalHead *)buffer;
while (1) {
ret = taosReadMsg(pPeer->syncFd, pHead, sizeof(SWalHead));
if (ret <0) break;
if (pHead->len == 0) {code = 0; break;} // wal sync over
ret = taosReadMsg(pPeer->syncFd, pHead->cont, pHead->len);
if (ret <0) break;
sDebug("%s, restore a record, ver:%" PRIu64, pPeer->id, pHead->version);
(*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_WAL);
}
if (code<0) {
sError("%s, failed to restore wal(%s)", pPeer->id, strerror(errno));
}
free(buffer);
return code;
}
static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset)
{
SSyncNode *pNode = pPeer->pSyncNode;
SWalHead *pHead = (SWalHead *) offset;
(*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD);
offset += pHead->len + sizeof(SWalHead);
return offset;
}
static int syncProcessBufferedFwd(SSyncPeer *pPeer)
{
SSyncNode *pNode = pPeer->pSyncNode;
SRecvBuffer *pRecv = pNode->pRecv;
int forwards = 0;
sDebug("%s, number of buffered forwards:%d", pPeer->id, pRecv->forwards);
char *offset = pRecv->buffer;
while (forwards < pRecv->forwards) {
offset = syncProcessOneBufferedFwd(pPeer, offset);
forwards++;
}
pthread_mutex_lock(&pNode->mutex);
while (forwards < pRecv->forwards && pRecv->code == 0) {
offset = syncProcessOneBufferedFwd(pPeer, offset);
forwards++;
}
nodeRole = TAOS_SYNC_ROLE_SLAVE;
sDebug("%s, finish processing buffered fwds:%d", pPeer->id, forwards);
pthread_mutex_unlock(&pNode->mutex);
return pRecv->code;
}
int syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead)
{
SSyncNode *pNode = pPeer->pSyncNode;
SRecvBuffer *pRecv = pNode->pRecv;
if (pRecv == NULL) return -1;
int len = pHead->len + sizeof(SWalHead);
if (pRecv->bufferSize - (pRecv->offset - pRecv->buffer) >= len) {
memcpy(pRecv->offset, pHead, len);
pRecv->offset += len;
pRecv->forwards++;
sDebug("%s, fwd is saved into queue, ver:%" PRIu64 " fwds:%d", pPeer->id, pHead->version, pRecv->forwards);
} else {
sError("%s, buffer size:%d is too small", pPeer->id, pRecv->bufferSize);
pRecv->code = -1; // set error code
}
return pRecv->code;
}
static void syncCloseRecvBuffer(SSyncNode *pNode)
{
if (pNode->pRecv) {
tfree(pNode->pRecv->buffer);
}
tfree(pNode->pRecv);
}
static int syncOpenRecvBuffer(SSyncNode *pNode)
{
syncCloseRecvBuffer(pNode);
SRecvBuffer *pRecv = calloc(sizeof(SRecvBuffer), 1);
if (pRecv == NULL) return -1;
pRecv->bufferSize = 5000000;
pRecv->buffer = malloc(pRecv->bufferSize);
if (pRecv->buffer == NULL) {
free(pRecv);
return -1;
}
pRecv->offset = pRecv->buffer;
pRecv->forwards = 0;
pNode->pRecv = pRecv;
return 0;
}
static int syncRestoreDataStepByStep(SSyncPeer *pPeer)
{
SSyncNode *pNode = pPeer->pSyncNode;
nodeSStatus = TAOS_SYNC_STATUS_FILE;
uint64_t fversion = 0;
sDebug("%s, start to restore file", pPeer->id);
int code = syncRestoreFile(pPeer, &fversion);
if (code < 0) {
sError("%s, failed to restore file", pPeer->id);
return -1;
}
// if code > 0, data file is changed, notify app, and pass the version
if (code > 0 && pNode->notifyFileSynced) {
if ( (*pNode->notifyFileSynced)(pNode->ahandle, fversion) < 0 ) {
sError("%s, app not in ready state", pPeer->id);
return -1;
}
}
nodeVersion = fversion;
sDebug("%s, start to restore wal", pPeer->id);
if (syncRestoreWal(pPeer) < 0) {
sError("%s, failed to restore wal", pPeer->id);
return -1;
}
nodeSStatus = TAOS_SYNC_STATUS_CACHE;
sDebug("%s, start to insert buffered points", pPeer->id);
if (syncProcessBufferedFwd(pPeer) < 0) {
sError("%s, failed to insert buffered points", pPeer->id);
return -1;
}
return 0;
}
void *syncRestoreData(void *param)
{
SSyncPeer *pPeer = (SSyncPeer *)param;
SSyncNode *pNode = pPeer->pSyncNode;
taosBlockSIGPIPE();
__sync_fetch_and_add(&tsSyncNum, 1);
if (syncOpenRecvBuffer(pNode) < 0) {
sError("%s, failed to allocate recv buffer", pPeer->id);
} else {
if ( syncRestoreDataStepByStep(pPeer) == 0) {
sInfo("%s, it is synced successfully", pPeer->id);
nodeRole = TAOS_SYNC_ROLE_SLAVE;
syncBroadcastStatus(pNode);
(*pNode->notifyRole)(pNode->ahandle, nodeRole);
} else {
sError("%s, failed to restore data, restart connection", pPeer->id);
nodeRole = TAOS_SYNC_ROLE_UNSYNCED;
syncRestartConnection(pPeer);
}
}
nodeSStatus = TAOS_SYNC_STATUS_INIT;
tclose(pPeer->syncFd)
syncCloseRecvBuffer(pNode);
__sync_fetch_and_sub(&tsSyncNum, 1);
syncDecPeerRef(pPeer);
return NULL;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdint.h>
#include <stdbool.h>
#include <sys/inotify.h>
#include <unistd.h>
#include "os.h"
#include "tlog.h"
#include "tutil.h"
#include "tglobal.h"
#include "ttimer.h"
#include "tsocket.h"
#include "twal.h"
#include "tsync.h"
#include "syncInt.h"
static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name)
{
sDebug("%s, start to monitor:%s", pPeer->id, name);
if (pPeer->notifyFd <=0) {
pPeer->watchNum = 0;
pPeer->notifyFd = inotify_init1(IN_NONBLOCK);
if (pPeer->notifyFd < 0) {
sError("%s, failed to init inotify(%s)", pPeer->id, strerror(errno));
return -1;
}
if (pPeer->watchFd == NULL) pPeer->watchFd = malloc(sizeof(int)*tsMaxWatchFiles);
if (pPeer->watchFd == NULL) {
sError("%s, failed to allocate watchFd", pPeer->id);
return -1;
}
memset(pPeer->watchFd, -1, sizeof(int)*tsMaxWatchFiles);
}
int *wd = pPeer->watchFd + pPeer->watchNum;
if (*wd >= 0) {
if (inotify_rm_watch(pPeer->notifyFd, *wd) < 0) {
sError("%s, failed to remove wd:%d(%s)", pPeer->id, *wd, strerror(errno));
return -1;
}
}
*wd = inotify_add_watch(pPeer->notifyFd, name, IN_MODIFY);
if (*wd == -1) {
sError("%s, failed to add %s(%s)", pPeer->id, name, strerror(errno));
return -1;
}
pPeer->watchNum++;
pPeer->watchNum = (pPeer->watchNum +1) % tsMaxWatchFiles;
return 0;
}
static int syncAreFilesModified(SSyncPeer *pPeer)
{
if (pPeer->notifyFd <=0) return 0;
char buf[2048];
int len = read(pPeer->notifyFd, buf, sizeof(buf));
if (len <0 && errno != EAGAIN) {
sError("%s, failed to read notify FD(%s)", pPeer->id, strerror(errno));
return -1;
}
int code = 0;
if (len >0) {
sDebug("%s, processed file is changed", pPeer->id);
code = 1;
}
return code;
}
static int syncRetrieveFile(SSyncPeer *pPeer)
{
SSyncNode *pNode = pPeer->pSyncNode;
SFileInfo fileInfo;
SFileAck fileAck;
int code = -1;
char name[TSDB_FILENAME_LEN * 2] = {0};
memset(&fileInfo, 0, sizeof(fileInfo));
memset(&fileAck, 0, sizeof(fileAck));
while (1) {
// retrieve file info
fileInfo.name[0] = 0;
fileInfo.magic = (*pNode->getFileInfo)(pNode->ahandle, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX, &fileInfo.size, &fileInfo.fversion);
//fileInfo.size = htonl(size);
// send the file info
int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(fileInfo));
if (ret < 0 ) break;
// if no file anymore, break
if (fileInfo.magic == 0 || fileInfo.name[0] == 0) {
sDebug("%s, no more files to sync", pPeer->id);
code = 0; break;
}
// wait for the ack from peer
ret = taosReadMsg(pPeer->syncFd, &(fileAck), sizeof(fileAck));
if (ret <0) break;
// set the peer sync version
pPeer->sversion = fileInfo.fversion;
// get the full path to file
snprintf(name, sizeof(name), "%s/%s", pNode->path, fileInfo.name);
// add the file into watch list
if ( syncAddIntoWatchList(pPeer, name) <0) break;
// if sync is not required, continue
if (fileAck.sync == 0) {
fileInfo.index++;
sDebug("%s, %s is the same", pPeer->id, fileInfo.name);
continue;
}
// send the file to peer
int sfd = open(name, O_RDONLY);
if ( sfd < 0 ) break;
ret = tsendfile(pPeer->syncFd, sfd, NULL, fileInfo.size);
close(sfd);
if (ret <0) break;
sDebug("%s, %s is sent, size:%d", pPeer->id, name, fileInfo.size);
fileInfo.index++;
// check if processed files are modified
if (syncAreFilesModified(pPeer) != 0) break;
}
if (code < 0) {
sError("%s, failed to retrieve file(%s)", pPeer->id, strerror(errno));
}
return code;
}
/* if only a partial record is read out, set the IN_MODIFY flag in event,
so upper layer will reload the file to get a complete record */
static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent)
{
int ret;
ret = read(sfd, pHead, sizeof(SWalHead));
if (ret < 0) return -1;
if (ret == 0) return 0;
if (ret != sizeof(SWalHead)) {
// file is not at end yet, it shall be reloaded
*pEvent = *pEvent | IN_MODIFY;
return 0;
}
ret = read(sfd, pHead->cont, pHead->len);
if (ret <0) return -1;
if (ret != pHead->len) {
// file is not at end yet, it shall be reloaded
*pEvent = *pEvent | IN_MODIFY;
return 0;
}
return sizeof(SWalHead) + pHead->len;
}
static int syncMonitorLastWal(SSyncPeer *pPeer, char *name)
{
pPeer->watchNum = 0;
tclose(pPeer->notifyFd);
pPeer->notifyFd = inotify_init1(IN_NONBLOCK);
if (pPeer->notifyFd < 0) {
sError("%s, failed to init inotify(%s)", pPeer->id, strerror(errno));
return -1;
}
if (pPeer->watchFd == NULL) pPeer->watchFd = malloc(sizeof(int)*tsMaxWatchFiles);
if (pPeer->watchFd == NULL) {
sError("%s, failed to allocate watchFd", pPeer->id);
return -1;
}
memset(pPeer->watchFd, -1, sizeof(int)*tsMaxWatchFiles);
int *wd = pPeer->watchFd;
*wd = inotify_add_watch(pPeer->notifyFd, name, IN_MODIFY | IN_CLOSE_WRITE);
if (*wd == -1) {
sError("%s, failed to watch last wal(%s)", pPeer->id, strerror(errno));
return -1;
}
return 0;
}
static uint32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent)
{
char buf[2048];
int len = read(pPeer->notifyFd, buf, sizeof(buf));
if (len <0 && errno != EAGAIN) {
sError("%s, failed to read notify FD(%s)", pPeer->id, strerror(errno));
return -1;
}
if (len == 0) return 0;
struct inotify_event *event;
for (char *ptr = buf; ptr < buf + len; ptr += sizeof(struct inotify_event) + event->len) {
event = (struct inotify_event *) ptr;
if (event->mask & IN_MODIFY) *pEvent = *pEvent | IN_MODIFY;
if (event->mask & IN_CLOSE_WRITE) *pEvent = *pEvent | IN_CLOSE_WRITE;
}
if (pEvent != 0)
sDebug("%s, last wal event:0x%x", pPeer->id, *pEvent);
return 0;
}
static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset, uint32_t *pEvent)
{
SWalHead *pHead = (SWalHead *) malloc(640000);
int code = -1;
int32_t bytes = 0;
int sfd;
sfd = open(name, O_RDONLY);
if (sfd < 0) return -1;
lseek(sfd, offset, SEEK_SET);
sDebug("%s, retrieve last wal, offset:%" PRId64 " fversion:%" PRIu64, pPeer->id, offset, fversion);
while (1) {
int wsize = syncReadOneWalRecord(sfd, pHead, pEvent);
if (wsize <0) break;
if (wsize == 0) { code = 0; break; }
sDebug("%s, last wal is forwarded, ver:%" PRIu64, pPeer->id, pHead->version);
int ret = taosWriteMsg(pPeer->syncFd, pHead, wsize);
if ( ret != wsize ) break;
pPeer->sversion = pHead->version;
bytes += wsize;
if (pHead->version >= fversion && fversion > 0) {
code = 0;
bytes = 0;
break;
}
}
free(pHead);
tclose(sfd);
if (code == 0) return bytes;
return -1;
}
static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index)
{
SSyncNode *pNode = pPeer->pSyncNode;
int code = -1;
char fname[TSDB_FILENAME_LEN * 2]; // full path to wal file
if (syncAreFilesModified(pPeer) != 0) return -1;
while (1) {
int32_t once = 0; // last WAL has once ever been processed
int64_t offset = 0;
uint64_t fversion = 0;
uint32_t event = 0;
// get full path to wal file
snprintf(fname, sizeof(fname), "%s/%s", pNode->path, wname);
sDebug("%s, start to retrieve last wal:%s", pPeer->id, fname);
// monitor last wal
if (syncMonitorLastWal(pPeer, fname) <0) break;
while (1) {
int32_t bytes = syncRetrieveLastWal(pPeer, fname, fversion, offset, &event);
if (bytes < 0) break;
// check file changes
if (syncCheckLastWalChanges(pPeer, &event) <0) break;
// if file is not updated or updated once, set the fversion and sstatus
if (((event & IN_MODIFY) == 0) || once) {
if (fversion == 0) {
pPeer->sstatus = TAOS_SYNC_STATUS_CACHE; // start to forward pkt
fversion = nodeVersion; // must read data to fversion
}
}
// if all data up to fversion is read out, it is over
if (pPeer->sversion >= fversion && fversion > 0) {
code = 0;
sDebug("%s, data up to fversion:%ld has been read out, bytes:%d", pPeer->id, fversion, bytes);
break;
}
// if all data are read out, and no update
if ((bytes == 0) && ((event & IN_MODIFY) == 0)) {
// wal file is closed, break
if (event & IN_CLOSE_WRITE) {
code = 0;
sDebug("%s, current wal is closed", pPeer->id);
break;
}
// wal not closed, it means some data not flushed to disk, wait for a while
usleep(10000);
}
// if bytes>0, file is updated, or fversion is not reached but file still open, read again
once = 1;
offset += bytes;
sDebug("%s, retrieve last wal, bytes:%d", pPeer->id, bytes);
event = event & (~IN_MODIFY); // clear IN_MODIFY flag
}
if (code < 0) break;
if (pPeer->sversion >= fversion && fversion > 0) break;
index++; wname[0] = 0;
code = (*pNode->getWalInfo)(pNode->ahandle, wname, &index);
if ( code < 0) break;
if ( wname[0] == 0 ) {code = 0; break;}
// current last wal is closed, there is a new one
sDebug("%s, last wal is closed, try new one", pPeer->id);
}
tclose(pPeer->notifyFd);
return code;
}
static int syncRetrieveWal(SSyncPeer *pPeer)
{
SSyncNode *pNode = pPeer->pSyncNode;
char fname[TSDB_FILENAME_LEN * 3];
char wname[TSDB_FILENAME_LEN * 2];
int32_t size;
struct stat fstat;
int code = -1;
uint32_t index = 0;
while (1) {
// retrieve wal info
wname[0] = 0;
code = (*pNode->getWalInfo)(pNode->ahandle, wname, &index);
if (code < 0) break; // error
if (wname[0] == 0) { // no wal file
sDebug("%s, no wal file", pPeer->id);
break;
}
if (code == 0) { // last wal
code = syncProcessLastWal(pPeer, wname, index);
break;
}
// get the full path to wal file
snprintf(fname, sizeof(fname), "%s/%s", pNode->path, wname);
// send wal file,
// inotify is not required, old wal file won't be modified, even remove is ok
if ( stat(fname, &fstat) < 0 ) break;
size = fstat.st_size;
sDebug("%s, retrieve wal:%s size:%d", pPeer->id, fname, size);
int sfd = open(fname, O_RDONLY);
if (sfd < 0) break;
code = tsendfile(pPeer->syncFd, sfd, NULL, size);
close(sfd);
if (code <0) break;
index++;
if (syncAreFilesModified(pPeer) != 0) break;
}
if (code == 0) {
sDebug("%s, wal retrieve is finished", pPeer->id);
pPeer->sstatus = TAOS_SYNC_STATUS_CACHE;
SWalHead walHead;
memset(&walHead, 0, sizeof(walHead));
code = taosWriteMsg(pPeer->syncFd, &walHead, sizeof(walHead));
} else {
sError("%s, failed to send wal(%s)", pPeer->id, strerror(errno));
}
return code;
}
static int syncRetrieveDataStepByStep(SSyncPeer *pPeer)
{
SSyncNode *pNode = pPeer->pSyncNode;
SFirstPkt firstPkt;
memset(&firstPkt, 0, sizeof(firstPkt));
firstPkt.syncHead.type = TAOS_SMSG_SYNC_DATA;
firstPkt.syncHead.vgId = pNode->vgId;
tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
firstPkt.port = tsSyncPort;
if (write(pPeer->syncFd, (char *) &firstPkt, sizeof(firstPkt)) < 0) {
sError("%s, failed to send syncCmd", pPeer->id);
return -1;
}
pPeer->sversion = 0;
pPeer->sstatus = TAOS_SYNC_STATUS_FILE;
sDebug("%s, start to retrieve file", pPeer->id);
if (syncRetrieveFile(pPeer) < 0) {
sError("%s, failed to retrieve file", pPeer->id);
return -1;
}
// if no files are synced, there must be wal to sync, sversion must be larger than one
if (pPeer->sversion == 0)
pPeer->sversion = 1;
sDebug("%s, start to retrieve wal", pPeer->id);
if (syncRetrieveWal(pPeer) < 0) {
sError("%s, failed to retrieve wal", pPeer->id);
return -1;
}
return 0;
}
void *syncRetrieveData(void *param)
{
SSyncPeer *pPeer = (SSyncPeer *)param;
taosBlockSIGPIPE();
pPeer->syncFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0);
if (pPeer->syncFd < 0) {
sError("%s, failed to open socket to sync", pPeer->id);
} else {
sInfo("%s, sync tcp is setup", pPeer->id);
if (syncRetrieveDataStepByStep(pPeer) == 0) {
sDebug("%s, sync retrieve process is successful", pPeer->id);
} else {
sError("%s, failed to retrieve data, restart connection", pPeer->id);
syncRestartConnection(pPeer);
}
}
tclose(pPeer->notifyFd);
tclose(pPeer->syncFd);
syncDecPeerRef(pPeer);
return NULL;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tulog.h"
#include "tutil.h"
#include "tsocket.h"
#include "taoserror.h"
#include "taosTcpPool.h"
typedef struct SThreadObj {
pthread_t thread;
bool stop;
int pollFd;
int numOfFds;
struct SPoolObj *pPool;
} SThreadObj;
typedef struct SPoolObj {
SPoolInfo info;
SThreadObj **pThread;
pthread_t thread;
int nextId;
int acceptFd; // FD for accept new connection
} SPoolObj;
typedef struct {
SThreadObj *pThread;
void *ahandle;
int fd;
int closedByApp;
} SConnObj;
static void *taosAcceptPeerTcpConnection(void *argv);
static void *taosProcessTcpData(void *param);
static SThreadObj *taosGetTcpThread(SPoolObj *pPool);
static void taosStopPoolThread(SThreadObj* pThread);
void *taosOpenTcpThreadPool(SPoolInfo *pInfo)
{
pthread_attr_t thattr;
SPoolObj *pPool = calloc(sizeof(SPoolObj), 1);
if (pPool == NULL) {
uError("TCP server, no enough memory");
return NULL;
}
pPool->info = *pInfo;
pPool->pThread = (SThreadObj **) calloc(sizeof(SThreadObj *), pInfo->numOfThreads);
if (pPool->pThread == NULL) {
uError("TCP server, no enough memory");
free(pPool);
return NULL;
}
pPool->acceptFd = taosOpenTcpServerSocket(pInfo->serverIp, pInfo->port);
if (pPool->acceptFd < 0) {
free(pPool->pThread); free(pPool);
uError("failed to create TCP server socket, port:%d (%s)", pInfo->port, strerror(errno));
return NULL;
}
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&(pPool->thread), &thattr, (void *) taosAcceptPeerTcpConnection, pPool) != 0) {
uError("TCP server, failed to create accept thread, reason:%s", strerror(errno));
close(pPool->acceptFd);
free(pPool->pThread); free(pPool);
return NULL;
}
pthread_attr_destroy(&thattr);
uDebug("%p TCP pool is created", pPool);
return pPool;
}
void taosCloseTcpThreadPool(void *param)
{
SPoolObj *pPool = (SPoolObj *)param;
SThreadObj *pThread;
shutdown(pPool->acceptFd, SHUT_RD);
pthread_join(pPool->thread, NULL);
for (int i = 0; i < pPool->info.numOfThreads; ++i) {
pThread = pPool->pThread[i];
if (pThread) taosStopPoolThread(pThread);
}
tfree(pPool->pThread);
free(pPool);
uDebug("%p TCP pool is closed", pPool);
}
void *taosAllocateTcpConn(void *param, void *pPeer, int connFd)
{
struct epoll_event event;
SPoolObj *pPool = (SPoolObj *)param;
SConnObj *pConn = (SConnObj *) calloc(sizeof(SConnObj), 1);
if (pConn == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
}
SThreadObj *pThread = taosGetTcpThread(pPool);
if (pThread == NULL) {
free(pConn);
return NULL;
}
pConn->fd = connFd;
pConn->pThread = pThread;
pConn->ahandle = pPeer;
pConn->closedByApp = 0;
event.events = EPOLLIN | EPOLLRDHUP;
event.data.ptr = pConn;
if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) {
uError("failed to add fd:%d(%s)", connFd, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
free(pConn);
pConn = NULL;
} else {
pThread->numOfFds++;
uDebug("%p fd:%d is added to epoll thread, num:%d", pThread, connFd, pThread->numOfFds);
}
return pConn;
}
void taosFreeTcpConn(void *param)
{
SConnObj *pConn = (SConnObj *)param;
SThreadObj *pThread = pConn->pThread;
uDebug("%p TCP connection will be closed, fd:%d", pThread, pConn->fd);
pConn->closedByApp = 1;
shutdown(pConn->fd, SHUT_WR);
}
static void taosProcessBrokenLink(SConnObj *pConn) {
SThreadObj *pThread = pConn->pThread;
SPoolObj *pPool = pThread->pPool;
SPoolInfo *pInfo = &pPool->info;
if (pConn->closedByApp == 0) shutdown(pConn->fd, SHUT_WR);
(*pInfo->processBrokenLink)(pConn->ahandle);
pThread->numOfFds--;
epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pConn->fd, NULL);
uDebug("%p fd:%d is removed from epoll thread, num:%d", pThread, pConn->fd, pThread->numOfFds);
tclose(pConn->fd);
free(pConn);
}
#define maxEvents 10
static void *taosProcessTcpData(void *param) {
SThreadObj *pThread = (SThreadObj *) param;
SPoolObj *pPool = pThread->pPool;
SPoolInfo *pInfo = &pPool->info;
SConnObj *pConn = NULL;
struct epoll_event events[maxEvents];
void *buffer = malloc(pInfo->bufferSize);
taosBlockSIGPIPE();
while (1) {
if (pThread->stop) break;
int fdNum = epoll_wait(pThread->pollFd, events, maxEvents, -1);
if (pThread->stop) {
uDebug("%p TCP epoll thread is exiting...", pThread);
break;
}
if (fdNum < 0) {
uError("epoll_wait failed (%s)", strerror(errno));
continue;
}
for (int i = 0; i < fdNum; ++i) {
pConn = events[i].data.ptr;
assert(pConn);
if (events[i].events & EPOLLERR) {
taosProcessBrokenLink(pConn);
continue;
}
if (events[i].events & EPOLLHUP) {
taosProcessBrokenLink(pConn);
continue;
}
if (events[i].events & EPOLLRDHUP) {
taosProcessBrokenLink(pConn);
continue;
}
if (pConn->closedByApp == 0) {
if ((*pInfo->processIncomingMsg)(pConn->ahandle, buffer) < 0) {
taosFreeTcpConn(pConn);
continue;
}
}
}
}
close(pThread->pollFd);
free(pThread);
free(buffer);
uDebug("%p TCP epoll thread exits", pThread);
return NULL;
}
static void *taosAcceptPeerTcpConnection(void *argv) {
SPoolObj *pPool = (SPoolObj *)argv;
SPoolInfo *pInfo = &pPool->info;
taosBlockSIGPIPE();
while (1) {
struct sockaddr_in clientAddr;
socklen_t addrlen = sizeof(clientAddr);
int connFd = accept(pPool->acceptFd, (struct sockaddr *) &clientAddr, &addrlen);
if (connFd < 0) {
if (errno == EINVAL) {
uDebug("%p TCP server accept is exiting...", pPool);
break;
} else {
uError("TCP accept failure, reason:%s", strerror(errno));
continue;
}
}
//uDebug("TCP connection from: 0x%x:%d", clientAddr.sin_addr.s_addr, clientAddr.sin_port);
taosKeepTcpAlive(connFd);
(*pInfo->processIncomingConn)(connFd, clientAddr.sin_addr.s_addr);
}
tclose(pPool->acceptFd);
return NULL;
}
static SThreadObj *taosGetTcpThread(SPoolObj *pPool) {
SThreadObj *pThread = pPool->pThread[pPool->nextId];
if (pThread) return pThread;
pThread = (SThreadObj *) calloc(1, sizeof(SThreadObj));
if (pThread == NULL) return NULL;
pThread->pPool = pPool;
pThread->pollFd = epoll_create(10); // size does not matter
if (pThread->pollFd < 0) {
free(pThread);
return NULL;
}
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
int ret = pthread_create(&(pThread->thread), &thattr, (void *) taosProcessTcpData, pThread);
pthread_attr_destroy(&thattr);
if (ret != 0) {
close(pThread->pollFd);
free(pThread);
return NULL;
}
uDebug("%p TCP epoll thread is created", pThread);
pPool->pThread[pPool->nextId] = pThread;
pPool->nextId++;
pPool->nextId = pPool->nextId % pPool->info.numOfThreads;
return pThread;
}
static void taosStopPoolThread(SThreadObj* pThread) {
pThread->stop = true;
if (pThread->thread == pthread_self()) {
pthread_detach(pthread_self());
return;
}
// save thread ID into a local variable, since pThread is freed when the thread exits
pthread_t thread = pThread->thread;
// signal the thread to stop, try graceful method first,
// and use pthread_cancel when failed
struct epoll_event event = { .events = EPOLLIN };
eventfd_t fd = eventfd(1, 0);
if (fd == -1) {
// failed to create eventfd, call pthread_cancel instead, which may result in data corruption
uError("failed to create eventfd(%s)", strerror(errno));
pthread_cancel(pThread->thread);
} else if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
// failed to call epoll_ctl, call pthread_cancel instead, which may result in data corruption
uError("failed to call epoll_ctl(%s)", strerror(errno));
pthread_cancel(pThread->thread);
}
pthread_join(thread, NULL);
tclose(fd);
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
//#define _DEFAULT_SOURCE
#include "os.h"
#include "hash.h"
#include "tlog.h"
#include "tutil.h"
#include "ttimer.h"
#include "ttime.h"
#include "tsocket.h"
#include "tglobal.h"
#include "taoserror.h"
#include "taosTcpPool.h"
#include "twal.h"
#include "tsync.h"
#include "syncInt.h"
static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context);
static void arbProcessIncommingConnection(int connFd, uint32_t sourceIp);
static void arbProcessBrokenLink(void *param);
static int arbProcessPeerMsg(void *param, void *buffer);
static sem_t tsArbSem;
static ttpool_h tsArbTcpPool;
typedef struct {
char id[TSDB_EP_LEN+24];
int nodeFd;
void *pConn;
} SNodeConn;
int main(int argc, char *argv[]) {
char arbLogPath[TSDB_FILENAME_LEN + 16] = {0};
for (int i=1; i<argc; ++i) {
if (strcmp(argv[i], "-p")==0 && i < argc-1) {
tsServerPort = atoi(argv[++i]);
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
debugFlag = atoi(argv[++i]);
} else if (strcmp(argv[i], "-g")==0 && i < argc-1) {
if (strlen(argv[++i]) > TSDB_FILENAME_LEN) continue;
tstrncpy(arbLogPath, argv[i], sizeof(arbLogPath));
} else {
printf("\nusage: %s [options] \n", argv[0]);
printf(" [-p port]: server port number, default is:%d\n", tsServerPort);
printf(" [-d debugFlag]: debug flag, default:%d\n", debugFlag);
printf(" [-g logFilePath]: log file pathe, default:%s\n", arbLogPath);
printf(" [-h help]: print out this help\n\n");
exit(0);
}
}
if (sem_init(&tsArbSem, 0, 0) != 0) {
printf("failed to create exit semphore\n");
exit(EXIT_FAILURE);
}
/* Set termination handler. */
struct sigaction act = {{0}};
act.sa_flags = SA_SIGINFO;
act.sa_sigaction = arbSignalHandler;
sigaction(SIGTERM, &act, NULL);
sigaction(SIGHUP, &act, NULL);
sigaction(SIGINT, &act, NULL);
tsAsyncLog = 0;
strcat(arbLogPath, "/arbitrator.log");
taosInitLog(arbLogPath, 1000000, 10);
taosGetFqdn(tsNodeFqdn);
tsSyncPort = tsServerPort + TSDB_PORT_SYNC;
SPoolInfo info;
info.numOfThreads = 1;
info.serverIp = 0;
info.port = tsSyncPort;
info.bufferSize = 640000;
info.processBrokenLink = arbProcessBrokenLink;
info.processIncomingMsg = arbProcessPeerMsg;
info.processIncomingConn = arbProcessIncommingConnection;
tsArbTcpPool = taosOpenTcpThreadPool(&info);
if (tsArbTcpPool == NULL) {
sDebug("failed to open TCP thread pool, exit...");
return -1;
}
sInfo("TAOS arbitrator: %s:%d is running", tsNodeFqdn, tsServerPort);
for (int res = sem_wait(&tsArbSem); res != 0; res = sem_wait(&tsArbSem)) {
if (res != EINTR) break;
}
taosCloseTcpThreadPool(tsArbTcpPool);
sInfo("TAOS arbitrator is shut down\n");
closelog();
return 0;
}
static void arbProcessIncommingConnection(int connFd, uint32_t sourceIp)
{
char ipstr[24];
tinet_ntoa(ipstr, sourceIp);
sDebug("peer TCP connection from ip:%s", ipstr);
SFirstPkt firstPkt;
if (taosReadMsg(connFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) {
sError("failed to read peer first pkt from ip:%s(%s)", ipstr, strerror(errno));
taosCloseSocket(connFd);
return;
}
SNodeConn *pNode = (SNodeConn *) calloc(sizeof(SNodeConn), 1);
if (pNode == NULL) {
sError("failed to allocate memory(%s)", strerror(errno));
taosCloseSocket(connFd);
return;
}
snprintf(pNode->id, sizeof(pNode->id), "vgId:%d peer:%s:%d", firstPkt.sourceId, firstPkt.fqdn, firstPkt.port);
if (firstPkt.syncHead.vgId) {
sDebug("%s, vgId in head is not zero, close the connection", pNode->id);
tfree(pNode);
taosCloseSocket(connFd);
return;
}
sDebug("%s, arbitrator request is accepted", pNode->id);
pNode->nodeFd = connFd;
pNode->pConn = taosAllocateTcpConn(tsArbTcpPool, pNode, connFd);
return;
}
static void arbProcessBrokenLink(void *param) {
SNodeConn *pNode = param;
sDebug("%s, TCP link is broken(%s), close connection", pNode->id, strerror(errno));
tfree(pNode);
}
static int arbProcessPeerMsg(void *param, void *buffer)
{
SNodeConn *pNode = param;
SSyncHead head;
int bytes = 0;
char *cont = (char *)buffer;
int hlen = taosReadMsg(pNode->nodeFd, &head, sizeof(head));
if (hlen != sizeof(head)) {
sDebug("%s, failed to read msg, hlen:%d", pNode->id, hlen);
return -1;
}
bytes = taosReadMsg(pNode->nodeFd, cont, head.len);
if (bytes != head.len) {
sDebug("%s, failed to read, bytes:%d len:%d", pNode->id, bytes, head.len);
return -1;
}
sDebug("%s, msg is received, len:%d", pNode->id, head.len);
return 0;
}
static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context) {
struct sigaction act = {{0}};
act.sa_handler = SIG_IGN;
sigaction(SIGTERM, &act, NULL);
sigaction(SIGHUP, &act, NULL);
sigaction(SIGINT, &act, NULL);
sInfo("shut down signal is %d, sender PID:%d", signum, sigInfo->si_pid);
// inform main thread to exit
sem_post(&tsArbSem);
}
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc)
INCLUDE_DIRECTORIES(../inc)
LIST(APPEND CLIENT_SRC ./syncClient.c)
ADD_EXECUTABLE(syncClient ${CLIENT_SRC})
TARGET_LINK_LIBRARIES(syncClient sync trpc common)
LIST(APPEND SERVER_SRC ./syncServer.c)
ADD_EXECUTABLE(syncServer ${SERVER_SRC})
TARGET_LINK_LIBRARIES(syncServer sync trpc common)
ENDIF ()
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tglobal.h"
#include "tulog.h"
#include "trpc.h"
#include "taoserror.h"
typedef struct {
int index;
SRpcEpSet epSet;
int num;
int numOfReqs;
int msgSize;
sem_t rspSem;
sem_t *pOverSem;
pthread_t thread;
void *pRpc;
} SInfo;
void processResponse(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
SInfo *pInfo = (SInfo *)pMsg->ahandle;
uDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, pMsg->code);
if (pEpSet) pInfo->epSet = *pEpSet;
rpcFreeCont(pMsg->pCont);
sem_post(&pInfo->rspSem);
}
int tcount = 0;
void *sendRequest(void *param) {
SInfo *pInfo = (SInfo *)param;
SRpcMsg rpcMsg = {0};
uDebug("thread:%d, start to send request", pInfo->index);
while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
pInfo->num++;
rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
rpcMsg.contLen = pInfo->msgSize;
rpcMsg.ahandle = pInfo;
rpcMsg.msgType = 1;
uDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg);
if ( pInfo->num % 20000 == 0 )
uInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
sem_wait(&pInfo->rspSem);
}
uDebug("thread:%d, it is over", pInfo->index);
tcount++;
return NULL;
}
int main(int argc, char *argv[]) {
SRpcInit rpcInit;
SRpcEpSet epSet;
char secret[TSDB_KEY_LEN] = "mypassword";
int msgSize = 128;
int numOfReqs = 0;
int appThreads = 1;
char serverIp[40] = "127.0.0.1";
struct timeval systemTime;
int64_t startTime, endTime;
pthread_attr_t thattr;
// server info
epSet.numOfEps = 1;
epSet.inUse = 0;
epSet.port[0] = 7000;
epSet.port[1] = 7000;
strcpy(epSet.fqdn[0], serverIp);
strcpy(epSet.fqdn[1], "192.168.0.1");
// client info
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = 0;
rpcInit.label = "APP";
rpcInit.numOfThreads = 1;
rpcInit.cfp = processResponse;
rpcInit.sessions = 100;
rpcInit.idleTime = tsShellActivityTimer*1000;
rpcInit.user = "michael";
rpcInit.secret = secret;
rpcInit.ckey = "key";
rpcInit.spi = 1;
rpcInit.connType = TAOS_CONN_CLIENT;
for (int i=1; i<argc; ++i) {
if (strcmp(argv[i], "-p")==0 && i < argc-1) {
epSet.port[0] = atoi(argv[++i]);
} else if (strcmp(argv[i], "-i") ==0 && i < argc-1) {
strcpy(epSet.fqdn[0], argv[++i]);
} else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
rpcInit.numOfThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
msgSize = atoi(argv[++i]);
} else if (strcmp(argv[i], "-s")==0 && i < argc-1) {
rpcInit.sessions = atoi(argv[++i]);
} else if (strcmp(argv[i], "-n")==0 && i < argc-1) {
numOfReqs = atoi(argv[++i]);
} else if (strcmp(argv[i], "-a")==0 && i < argc-1) {
appThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-o")==0 && i < argc-1) {
tsCompressMsgSize = atoi(argv[++i]);
} else if (strcmp(argv[i], "-u")==0 && i < argc-1) {
rpcInit.user = argv[++i];
} else if (strcmp(argv[i], "-k")==0 && i < argc-1) {
rpcInit.secret = argv[++i];
} else if (strcmp(argv[i], "-spi")==0 && i < argc-1) {
rpcInit.spi = atoi(argv[++i]);
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
rpcDebugFlag = atoi(argv[++i]);
} else {
printf("\nusage: %s [options] \n", argv[0]);
printf(" [-i ip]: first server IP address, default is:%s\n", serverIp);
printf(" [-p port]: server port number, default is:%d\n", epSet.port[0]);
printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
printf(" [-s sessions]: number of rpc sessions, default is:%d\n", rpcInit.sessions);
printf(" [-m msgSize]: message body size, default is:%d\n", msgSize);
printf(" [-a threads]: number of app threads, default is:%d\n", appThreads);
printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
printf(" [-u user]: user name for the connection, default is:%s\n", rpcInit.user);
printf(" [-k secret]: password for the connection, default is:%s\n", rpcInit.secret);
printf(" [-spi SPI]: security parameter index, default is:%d\n", rpcInit.spi);
printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
printf(" [-h help]: print out this help\n\n");
exit(0);
}
}
taosInitLog("client.log", 100000, 10);
void *pRpc = rpcOpen(&rpcInit);
if (pRpc == NULL) {
uError("failed to initialize RPC");
return -1;
}
uInfo("client is initialized");
gettimeofday(&systemTime, NULL);
startTime = systemTime.tv_sec*1000000 + systemTime.tv_usec;
SInfo *pInfo = (SInfo *)calloc(1, sizeof(SInfo)*appThreads);
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
for (int i=0; i<appThreads; ++i) {
pInfo->index = i;
pInfo->epSet = epSet;
pInfo->numOfReqs = numOfReqs;
pInfo->msgSize = msgSize;
sem_init(&pInfo->rspSem, 0, 0);
pInfo->pRpc = pRpc;
pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo);
pInfo++;
}
do {
usleep(1);
} while ( tcount < appThreads);
gettimeofday(&systemTime, NULL);
endTime = systemTime.tv_sec*1000000 + systemTime.tv_usec;
float usedTime = (endTime - startTime)/1000.0; // mseconds
uInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs*appThreads);
uInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0*numOfReqs*appThreads/usedTime, msgSize);
taosCloseLog();
return 0;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
//#define _DEFAULT_SOURCE
#include <stdint.h>
#include "os.h"
#include "tulog.h"
#include "tglobal.h"
#include "tsocket.h"
#include "trpc.h"
#include "tqueue.h"
#include "twal.h"
#include "tsync.h"
int msgSize = 128;
int commit = 0;
int dataFd = -1;
void *qhandle = NULL;
int walNum = 0;
uint64_t tversion = 0;
void *syncHandle;
int role;
int nodeId;
char path[256];
int numOfWrites ;
SSyncInfo syncInfo;
SSyncCfg *pCfg;
int writeIntoWal(SWalHead *pHead)
{
if (dataFd < 0) {
char walName[280];
snprintf(walName, sizeof(walName), "%s/wal/wal.%d", path, walNum);
remove(walName);
dataFd = open(walName, O_CREAT | O_WRONLY, S_IRWXU | S_IRWXG | S_IRWXO);
if (dataFd < 0) {
uInfo("failed to open wal file:%s(%s)", walName, strerror(errno));
return -1;
} else {
walNum++;
uInfo("file:%s is opened to write, walNum:%d", walName, walNum);
}
}
if (write(dataFd, pHead, sizeof(SWalHead) + pHead->len) < 0) {
uError("ver:%" PRIu64 ", failed to write wal file(%s)", pHead->version, strerror(errno));
} else {
uDebug("ver:%" PRIu64 ", written to wal", pHead->version);
}
numOfWrites++;
if (numOfWrites >= 10000) {
uInfo("%d request have been written into disk", numOfWrites);
close(dataFd);
dataFd = -1;
numOfWrites = 0;
}
return 0;
}
void confirmForward(void *ahandle, void *mhandle, int32_t code)
{
SRpcMsg *pMsg = (SRpcMsg *)mhandle;
SWalHead *pHead = (SWalHead *)(((char *)pMsg->pCont) - sizeof(SWalHead));
uDebug("ver:%" PRIu64 ", confirm is received", pHead->version);
rpcFreeCont(pMsg->pCont);
SRpcMsg rpcMsg;
rpcMsg.pCont = rpcMallocCont(msgSize);
rpcMsg.contLen = msgSize;
rpcMsg.handle = pMsg->handle;
rpcMsg.code = code;
rpcSendResponse(&rpcMsg);
taosFreeQitem(mhandle);
}
int processRpcMsg(void *item) {
SRpcMsg *pMsg = (SRpcMsg *)item;
SWalHead *pHead = (SWalHead *)(((char *)pMsg->pCont) - sizeof(SWalHead));
int code = -1;
if (role != TAOS_SYNC_ROLE_MASTER) {
uError("not master, write failed, role:%s", syncRole[role]);
} else {
pHead->version = ++tversion;
pHead->msgType = pMsg->msgType;
pHead->len = pMsg->contLen;
uDebug("ver:%" PRIu64 ", pkt from client processed", pHead->version);
writeIntoWal(pHead);
syncForwardToPeer(syncHandle, pHead, item, TAOS_QTYPE_RPC);
code = 0;
}
if (pCfg->quorum <= 1) {
taosFreeQitem(item);
rpcFreeCont(pMsg->pCont);
SRpcMsg rpcMsg;
rpcMsg.pCont = rpcMallocCont(msgSize);
rpcMsg.contLen = msgSize;
rpcMsg.handle = pMsg->handle;
rpcMsg.code = code;
rpcSendResponse(&rpcMsg);
}
return code;
}
int processFwdMsg(void *item) {
SWalHead *pHead = (SWalHead *)item;
if (pHead->version <= tversion) {
uError("ver:%" PRIu64 ", forward is even lower than local:%" PRIu64, pHead->version, tversion);
return -1;
}
uDebug("ver:%" PRIu64 ", forward from peer is received", pHead->version);
writeIntoWal(pHead);
tversion = pHead->version;
if (pCfg->quorum > 1) syncConfirmForward(syncHandle, pHead->version, 0);
// write into cache
/*
if (pHead->handle) {
syncSendFwdAck(syncHandle, pHead->handle, 0);
}
*/
taosFreeQitem(item);
return 0;
}
int processWalMsg(void *item) {
SWalHead *pHead = (SWalHead *)item;
if (pHead->version <= tversion) {
uError("ver:%" PRIu64 ", wal is even lower than local:%" PRIu64, pHead->version, tversion);
return -1;
};
uDebug("ver:%" PRIu64 ", wal from peer is received", pHead->version);
writeIntoWal(pHead);
tversion = pHead->version;
// write into cache
/*
if (pHead->handle) {
syncSendFwdAck(syncHandle, pHead->handle, 0);
}
*/
taosFreeQitem(item);
return 0;
}
void *processWriteQueue(void *param) {
int type;
void *item;
while (1) {
int ret = taosReadQitem(qhandle, &type, &item);
if (ret <= 0) {
usleep(1000);
continue;
}
if (type == TAOS_QTYPE_RPC) {
processRpcMsg(item);
} else if (type == TAOS_QTYPE_WAL) {
processWalMsg(item);
} else if (type == TAOS_QTYPE_FWD) {
processFwdMsg(item);
}
}
return NULL;
}
int retrieveAuthInfo(char *meterId, char *spi, char *encrypt, char *secret, char *ckey) {
// app shall retrieve the auth info based on meterID from DB or a data file
// demo code here only for simple demo
int ret = 0;
if (strcmp(meterId, "michael") == 0) {
*spi = 1;
*encrypt = 0;
strcpy(secret, "mypassword");
strcpy(ckey, "key");
} else if (strcmp(meterId, "jeff") == 0) {
*spi = 0;
*encrypt = 0;
} else {
ret = -1; // user not there
}
return ret;
}
void processRequestMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg));
memcpy(pTemp, pMsg, sizeof(SRpcMsg));
uDebug("request is received, type:%d, len:%d", pMsg->msgType, pMsg->contLen);
taosWriteQitem(qhandle, TAOS_QTYPE_RPC, pTemp);
}
uint32_t getFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion)
{
uint32_t magic;
struct stat fstat;
char aname[280];
if (*index == 2) {
uInfo("wait for a while .....");
sleep(3);
}
if (name[0] == 0) {
// find the file
snprintf(aname, sizeof(aname), "%s/data/data.%d", path, *index);
sprintf(name, "data/data.%d", *index);
} else {
snprintf(aname, sizeof(aname), "%s/%s", path, name);
}
uInfo("get file info:%s", aname);
if ( stat(aname, &fstat) < 0 ) return 0;
*size = fstat.st_size;
magic = fstat.st_size;
return magic;
}
int getWalInfo(void *ahandle, char *name, uint32_t *index) {
struct stat fstat;
char aname[280];
name[0] = 0;
if (*index + 1> walNum) return 0;
snprintf(aname, sizeof(aname), "%s/wal/wal.%d", path, *index);
sprintf(name, "wal/wal.%d", *index);
uInfo("get wal info:%s", aname);
if ( stat(aname, &fstat) < 0 ) return -1;
if (*index >= walNum-1) return 0; // no more
return 1;
}
int writeToCache(void *ahandle, void *data, int type) {
SWalHead *pHead = data;
uDebug("pkt from peer is received, ver:%" PRIu64 " len:%d type:%d", pHead->version, pHead->len, type);
int msgSize = pHead->len + sizeof(SWalHead);
void *pMsg = taosAllocateQitem(msgSize);
memcpy(pMsg, pHead, msgSize);
taosWriteQitem(qhandle, type, pMsg);
return 0;
}
void confirmFwd(void *ahandle, int64_t version) {
return;
}
void notifyRole(void *ahandle, int8_t r) {
role = r;
printf("current role:%s\n", syncRole[role]);
}
void initSync() {
pCfg->replica = 1;
pCfg->quorum = 1;
syncInfo.vgId = 1;
syncInfo.ahandle = &syncInfo;
syncInfo.getFileInfo = getFileInfo;
syncInfo.getWalInfo = getWalInfo;
syncInfo.writeToCache = writeToCache;
syncInfo.confirmForward = confirmForward;
syncInfo.notifyRole = notifyRole;
pCfg->nodeInfo[0].nodeId = 1;
pCfg->nodeInfo[0].nodePort = 7010;
taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
pCfg->nodeInfo[1].nodeId = 2;
pCfg->nodeInfo[1].nodePort = 7110;
taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn);
pCfg->nodeInfo[2].nodeId = 3;
pCfg->nodeInfo[2].nodePort = 7210;
taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn);
pCfg->nodeInfo[3].nodeId = 4;
pCfg->nodeInfo[3].nodePort = 7310;
taosGetFqdn(pCfg->nodeInfo[3].nodeFqdn);
pCfg->nodeInfo[4].nodeId = 5;
pCfg->nodeInfo[4].nodePort = 7410;
taosGetFqdn(pCfg->nodeInfo[4].nodeFqdn);
}
void doSync()
{
for (int i=0; i<5; ++i) {
if (tsSyncPort == pCfg->nodeInfo[i].nodePort)
nodeId = pCfg->nodeInfo[i].nodeId;
}
snprintf(path, sizeof(path), "/root/test/d%d", nodeId);
strcpy(syncInfo.path, path);
if ( syncHandle == NULL) {
syncHandle = syncStart(&syncInfo);
} else {
if (syncReconfig(syncHandle, pCfg) < 0) syncHandle = NULL;
}
uInfo("nodeId:%d path:%s syncPort:%d", nodeId, path, tsSyncPort);
}
int main(int argc, char *argv[]) {
SRpcInit rpcInit;
char dataName[20] = "server.data";
pCfg = &syncInfo.syncCfg;
initSync();
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = 7000;
rpcInit.label = "SER";
rpcInit.numOfThreads = 1;
rpcInit.cfp = processRequestMsg;
rpcInit.sessions = 1000;
rpcInit.idleTime = tsShellActivityTimer*1500;
rpcInit.afp = retrieveAuthInfo;
for (int i=1; i<argc; ++i) {
if (strcmp(argv[i], "-p")==0 && i < argc-1) {
rpcInit.localPort = atoi(argv[++i]);
} else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
rpcInit.numOfThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
msgSize = atoi(argv[++i]);
} else if (strcmp(argv[i], "-s")==0 && i < argc-1) {
rpcInit.sessions = atoi(argv[++i]);
} else if (strcmp(argv[i], "-o")==0 && i < argc-1) {
tsCompressMsgSize = atoi(argv[++i]);
} else if (strcmp(argv[i], "-w")==0 && i < argc-1) {
commit = atoi(argv[++i]);
} else if (strcmp(argv[i], "-v")==0 && i < argc-1) {
syncInfo.version = atoi(argv[++i]);
} else if (strcmp(argv[i], "-r")==0 && i < argc-1) {
pCfg->replica = atoi(argv[++i]);
} else if (strcmp(argv[i], "-q")==0 && i < argc-1) {
pCfg->quorum = atoi(argv[++i]);
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
rpcDebugFlag = atoi(argv[++i]);
} else {
printf("\nusage: %s [options] \n", argv[0]);
printf(" [-p port]: server port number, default is:%d\n", rpcInit.localPort);
printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
printf(" [-s sessions]: number of sessions, default is:%d\n", rpcInit.sessions);
printf(" [-m msgSize]: message body size, default is:%d\n", msgSize);
printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
printf(" [-w write]: write received data to file(0, 1, 2), default is:%d\n", commit);
printf(" [-v version]: initial node version, default is:%ld\n", syncInfo.version);
printf(" [-r replica]: replicacation number, default is:%d\n", pCfg->replica);
printf(" [-q quorum]: quorum, default is:%d\n", pCfg->quorum);
printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
printf(" [-h help]: print out this help\n\n");
exit(0);
}
}
uDebugFlag = rpcDebugFlag;
dDebugFlag = rpcDebugFlag;
//tmrDebugFlag = rpcDebugFlag;
tsAsyncLog = 0;
taosInitLog("server.log", 1000000, 10);
rpcInit.connType = TAOS_CONN_SERVER;
void *pRpc = rpcOpen(&rpcInit);
if (pRpc == NULL) {
uError("failed to start RPC server");
return -1;
}
tsSyncPort = rpcInit.localPort + 10;
qhandle = taosOpenQueue();
doSync();
pthread_attr_t thattr;
pthread_t thread;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&thread, &thattr, processWriteQueue, NULL) != 0) {
uError("failed to create thread, reason:%s", strerror(errno));
return -1;
}
printf("server is running, localPort:%d\n", rpcInit.localPort);
SNodesRole nroles;
while (1) {
char c = getchar();
switch(c) {
case '1':
pCfg->replica = 1; doSync();
break;
case '2':
pCfg->replica = 2; doSync();
break;
case '3':
pCfg->replica = 3; doSync();
break;
case '4':
pCfg->replica = 4; doSync();
break;
case '5':
pCfg->replica = 5; doSync();
break;
case 's':
syncGetNodesRole(syncHandle, &nroles);
for (int i=0; i<pCfg->replica; ++i)
printf("=== nodeId:%d role:%s\n", nroles.nodeId[i], syncRole[nroles.role[i]]);
break;
default:
break;
}
if (c=='q') break;
}
syncStop(syncHandle);
if (dataFd >= 0) {
close(dataFd);
remove(dataName);
}
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册