From eb8c0bd72ddcf0c296f2bdfd64a4b1546105a737 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Tue, 28 Jul 2020 02:33:40 +0000 Subject: [PATCH] publish sync module --- src/sync/CMakeLists.txt | 23 + src/sync/inc/syncInt.h | 176 +++++ src/sync/inc/syncMain.h | 33 + src/sync/inc/taosTcpPool.h | 47 ++ src/sync/src/syncMain.c | 1207 ++++++++++++++++++++++++++++++++++ src/sync/src/syncRestore.c | 326 +++++++++ src/sync/src/syncRetrieve.c | 479 ++++++++++++++ src/sync/src/taosTcpPool.c | 325 +++++++++ src/sync/src/tarbitrator.c | 191 ++++++ src/sync/test/CMakeLists.txt | 20 + src/sync/test/syncClient.c | 194 ++++++ src/sync/test/syncServer.c | 487 ++++++++++++++ 12 files changed, 3508 insertions(+) create mode 100644 src/sync/CMakeLists.txt create mode 100644 src/sync/inc/syncInt.h create mode 100644 src/sync/inc/syncMain.h create mode 100644 src/sync/inc/taosTcpPool.h create mode 100644 src/sync/src/syncMain.c create mode 100644 src/sync/src/syncRestore.c create mode 100644 src/sync/src/syncRetrieve.c create mode 100644 src/sync/src/taosTcpPool.c create mode 100644 src/sync/src/tarbitrator.c create mode 100644 src/sync/test/CMakeLists.txt create mode 100644 src/sync/test/syncClient.c create mode 100644 src/sync/test/syncServer.c diff --git a/src/sync/CMakeLists.txt b/src/sync/CMakeLists.txt new file mode 100644 index 0000000000..0a5d18022b --- /dev/null +++ b/src/sync/CMakeLists.txt @@ -0,0 +1,23 @@ +CMAKE_MINIMUM_REQUIRED(VERSION 2.8) +PROJECT(TDengine) + +IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) + INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc) + INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc) + INCLUDE_DIRECTORIES(inc) + AUX_SOURCE_DIRECTORY(src SRC) + LIST(REMOVE_ITEM SRC ./src/tarbitrator.c) + + ADD_LIBRARY(sync ${SRC}) + TARGET_LINK_LIBRARIES(sync tutil pthread common) + + LIST(APPEND BIN_SRC ./src/tarbitrator.c) + LIST(APPEND BIN_SRC ./src/taosTcpPool.c) + ADD_EXECUTABLE(tarbitrator ${BIN_SRC}) + TARGET_LINK_LIBRARIES(tarbitrator tutil sync common) + + ADD_SUBDIRECTORY(test) +ENDIF () diff --git a/src/sync/inc/syncInt.h b/src/sync/inc/syncInt.h new file mode 100644 index 0000000000..d6d86064d6 --- /dev/null +++ b/src/sync/inc/syncInt.h @@ -0,0 +1,176 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_SYNCINT_H +#define TDENGINE_SYNCINT_H + +#ifdef __cplusplus +extern "C" { +#endif + +#define sFatal(...) { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("SYN FATAL ", sDebugFlag, __VA_ARGS__); }} +#define sError(...) { if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("SYN ERROR ", sDebugFlag, __VA_ARGS__); }} +#define sWarn(...) { if (sDebugFlag & DEBUG_WARN) { taosPrintLog("SYN WARN ", sDebugFlag, __VA_ARGS__); }} +#define sInfo(...) { if (sDebugFlag & DEBUG_INFO) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }} +#define sDebug(...) { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }} +#define sTrace(...) { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }} + +#define TAOS_SMSG_SYNC_DATA 1 +#define TAOS_SMSG_FORWARD 2 +#define TAOS_SMSG_FORWARD_RSP 3 +#define TAOS_SMSG_SYNC_REQ 4 +#define TAOS_SMSG_SYNC_RSP 5 +#define TAOS_SMSG_SYNC_MUST 6 +#define TAOS_SMSG_STATUS 7 + +#define nodeRole pNode->peerInfo[pNode->selfIndex]->role +#define nodeVersion pNode->peerInfo[pNode->selfIndex]->version +#define nodeSStatus pNode->peerInfo[pNode->selfIndex]->sstatus + +#pragma pack(push, 1) + +typedef struct { + char type; // msg type + char pversion; // protocol version + char reserved[6]; // not used + int32_t vgId; // vg ID + int32_t len; // content length, does not include head + // char cont[]; // message content starts from here +} SSyncHead; + +typedef struct { + SSyncHead syncHead; + uint16_t port; + char fqdn[TSDB_FQDN_LEN]; + int32_t sourceId; // only for arbitrator +} SFirstPkt; + +typedef struct { + int8_t role; + uint64_t version; +} SPeerStatus; + +typedef struct { + int8_t role; + int8_t ack; + uint64_t version; + SPeerStatus peersStatus[]; +} SPeersStatus; + +typedef struct { + char name[TSDB_FILENAME_LEN]; + uint32_t magic; + uint32_t index; + uint64_t fversion; + int32_t size; +} SFileInfo; + +typedef struct { + int8_t sync; +} SFileAck; + +typedef struct { + uint64_t version; + int32_t code; +} SFwdRsp; + +#pragma pack(pop) + +typedef struct { + char *buffer; + int bufferSize; + char *offset; + int forwards; + int code; +} SRecvBuffer; + +typedef struct { + uint64_t version; + void *mhandle; + int8_t acks; + int8_t nacks; + int8_t confirmed; + int32_t code; + uint64_t time; +} SFwdInfo; + +typedef struct { + int first; + int last; + int fwds; // number of forwards + SFwdInfo fwdInfo[]; +} SSyncFwds; + +typedef struct SsyncPeer { + int32_t nodeId; + uint32_t ip; + uint16_t port; + char fqdn[TSDB_FQDN_LEN]; // peer ip string + char id[TSDB_EP_LEN+16]; // peer vgId + end point + int8_t role; + int8_t sstatus; // sync status + uint64_t version; + uint64_t sversion; // track the peer version in retrieve process + int syncFd; + int peerFd; // forward FD + void *timer; + void *pConn; + int notifyFd; + int watchNum; + int *watchFd; + int8_t refCount; // reference count + struct SSyncNode *pSyncNode; +} SSyncPeer; + +typedef struct SSyncNode { + char path[TSDB_FILENAME_LEN]; + int8_t replica; + int8_t quorum; + uint32_t vgId; + void *ahandle; + int8_t selfIndex; + SSyncPeer *peerInfo[TAOS_SYNC_MAX_REPLICA+1]; // extra one for arbitrator + SSyncPeer *pMaster; + int8_t refCount; + SRecvBuffer *pRecv; + SSyncFwds *pSyncFwds; // saved forward info if quorum >1 + void *pFwdTimer; + FGetFileInfo getFileInfo; + FGetWalInfo getWalInfo; + FWriteToCache writeToCache; + FConfirmForward confirmForward; + FNotifyRole notifyRole; + FNotifyFileSynced notifyFileSynced; + pthread_mutex_t mutex; +} SSyncNode; + +// sync module global +extern int tsSyncNum; +extern char tsNodeFqdn[TSDB_FQDN_LEN]; + +void *syncRetrieveData(void *param); +void *syncRestoreData(void *param); +int syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead); +void syncRestartConnection(SSyncPeer *pPeer); +void syncBroadcastStatus(SSyncNode *pNode); +void syncAddPeerRef(SSyncPeer *pPeer); +int syncDecPeerRef(SSyncPeer *pPeer); + + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_VNODEPEER_H diff --git a/src/sync/inc/syncMain.h b/src/sync/inc/syncMain.h new file mode 100644 index 0000000000..d4ddb12733 --- /dev/null +++ b/src/sync/inc/syncMain.h @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_PLUGINS_SYNC_H +#define TDENGINE_PLUGINS_SYNC_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include + +int32_t syncTest1(); +int32_t syncTest2(); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/sync/inc/taosTcpPool.h b/src/sync/inc/taosTcpPool.h new file mode 100644 index 0000000000..1e410acc26 --- /dev/null +++ b/src/sync/inc/taosTcpPool.h @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_TCP_POOL_H +#define TDENGINE_TCP_POOL_H + +#ifdef __cplusplus +extern "C" { +#endif + +typedef void* ttpool_h; +typedef void* tthread_h; + +typedef struct { + int numOfThreads; + uint32_t serverIp; + short port; + int bufferSize; + void (*processBrokenLink)(void *ahandle); + int (*processIncomingMsg)(void *ahandle, void *buffer); + void (*processIncomingConn)(int fd, uint32_t ip); +} SPoolInfo; + +ttpool_h taosOpenTcpThreadPool(SPoolInfo *pInfo); +void taosCloseTcpThreadPool(ttpool_h); +void *taosAllocateTcpConn(void *, void *ahandle, int connFd); +void taosFreeTcpConn(void *); + + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_TCP_POOL_H + diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c new file mode 100644 index 0000000000..93c4a9402f --- /dev/null +++ b/src/sync/src/syncMain.c @@ -0,0 +1,1207 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +//#include +//#include +#include "os.h" +#include "hash.h" +#include "tlog.h" +#include "tutil.h" +#include "ttimer.h" +#include "ttime.h" +#include "tsocket.h" +#include "tglobal.h" +#include "taoserror.h" +#include "taosTcpPool.h" +#include "tqueue.h" +#include "twal.h" +#include "tsync.h" +#include "syncInt.h" + +// global configurable +int tsMaxSyncNum = 4; +int tsSyncTcpThreads = 2; +int tsMaxWatchFiles = 100; +int tsMaxFwdInfo = 200; +int tsSyncTimer = 1; +//int sDebugFlag = 135; +//char tsArbitrator[TSDB_FQDN_LEN] = {0}; + +// module global, not configurable +int tsSyncNum; // number of sync in process in whole system +char tsNodeFqdn[TSDB_FQDN_LEN]; + +static int tsNodeNum; // number of nodes in system +static ttpool_h tsTcpPool; +static void *syncTmrCtrl = NULL; +static void *vgIdHash; +static pthread_once_t syncModuleInit = PTHREAD_ONCE_INIT; + +// local functions +static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer); +static void syncRecoverFromMaster(SSyncPeer *pPeer); +static void syncCheckPeerConnection(void *param, void *tmrId); +static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack); +static void syncProcessBrokenLink(void *param); +static int syncProcessPeerMsg(void *param, void *buffer); +static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp); +static void syncRemovePeer(SSyncPeer *pPeer); +static void syncAddArbitrator(SSyncNode *pNode); +static void syncAddNodeRef(SSyncNode *pNode); +static void syncDecNodeRef(SSyncNode *pNode); +static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode); +static void syncMonitorFwdInfos(void *param, void *tmrId); +static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code); +static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle); +static void syncRestartPeer(SSyncPeer *pPeer); +static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo); + +char* syncRole[] = { + "offline", + "unsynced", + "slave", + "master" +}; + +static void syncModuleInitFunc() { + SPoolInfo info; + + info.numOfThreads = tsSyncTcpThreads; + info.serverIp = 0; + info.port = tsSyncPort; + info.bufferSize = 640000; + info.processBrokenLink = syncProcessBrokenLink; + info.processIncomingMsg = syncProcessPeerMsg; + info.processIncomingConn = syncProcessIncommingConnection; + + tsTcpPool = taosOpenTcpThreadPool(&info); + if (tsTcpPool == NULL) return; + + syncTmrCtrl = taosTmrInit(1000, 50, 10000, "SYNC"); + if (syncTmrCtrl == NULL) { + taosCloseTcpThreadPool(tsTcpPool); + tsTcpPool = NULL; + return; + } + + vgIdHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true); + if (vgIdHash == NULL) { + taosTmrCleanUp(syncTmrCtrl); + taosCloseTcpThreadPool(tsTcpPool); + tsTcpPool = NULL; + syncTmrCtrl = NULL; + return; + } + + tstrncpy(tsNodeFqdn, tsLocalFqdn, sizeof(tsNodeFqdn)); +} + +void *syncStart(const SSyncInfo *pInfo) +{ + const SSyncCfg *pCfg = &pInfo->syncCfg; + + SSyncNode *pNode = (SSyncNode *) calloc(sizeof(SSyncNode), 1); + if (pNode == NULL) { + sError("no memory to allocate syncNode"); + terrno = TAOS_SYSTEM_ERROR(errno); + return NULL; + } + + pthread_once(&syncModuleInit, syncModuleInitFunc); + if (tsTcpPool == NULL) { + free(pNode); + syncModuleInit = PTHREAD_ONCE_INIT; + sError("failed to init sync module(%s)", tstrerror(errno)); + return NULL; + } + + atomic_add_fetch_32(&tsNodeNum, 1); + tstrncpy(pNode->path, pInfo->path, sizeof(pNode->path)); + pthread_mutex_init(&pNode->mutex, NULL); + + pNode->ahandle = pInfo->ahandle; + pNode->getFileInfo = pInfo->getFileInfo; + pNode->getWalInfo = pInfo->getWalInfo; + pNode->writeToCache = pInfo->writeToCache; + pNode->notifyRole = pInfo->notifyRole; + pNode->confirmForward = pInfo->confirmForward; + pNode->notifyFileSynced = pInfo->notifyFileSynced; + + pNode->selfIndex = -1; + pNode->vgId = pInfo->vgId; + pNode->replica = pCfg->replica; + pNode->quorum = pCfg->quorum; + for (int i = 0; i < pCfg->replica; ++i) { + const SNodeInfo *pNodeInfo = pCfg->nodeInfo + i; + pNode->peerInfo[i] = syncAddPeer(pNode, pNodeInfo); + if ((strcmp(pNodeInfo->nodeFqdn, tsNodeFqdn) == 0) && (pNodeInfo->nodePort == tsSyncPort)) + pNode->selfIndex = i; + } + + if (pNode->selfIndex < 0) { + sInfo("vgId:%d, this node is not configured", pNode->vgId); + terrno = TSDB_CODE_SYN_INVALID_CONFIG; + syncStop(pNode); + return NULL; + } + + nodeVersion = pInfo->version; // set the initial version + nodeRole = (pNode->replica > 1) ? TAOS_SYNC_ROLE_UNSYNCED : TAOS_SYNC_ROLE_MASTER; + sInfo("vgId:%d, %d replicas are configured, quorum:%d role:%s", pNode->vgId, pNode->replica, pNode->quorum, syncRole[nodeRole]); + + pNode->pSyncFwds = calloc(sizeof(SSyncFwds) + tsMaxFwdInfo*sizeof(SFwdInfo), 1); + if (pNode->pSyncFwds == NULL) { + sError("vgId:%d, no memory to allocate syncFwds", pNode->vgId); + terrno = TAOS_SYSTEM_ERROR(errno); + syncStop(pNode); + return NULL; + } + + pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl); + if (pNode->pFwdTimer == NULL) { + sError("vgId:%d, failed to allocate timer", pNode->vgId); + syncStop(pNode); + return NULL; + } + + syncAddArbitrator(pNode); + syncAddNodeRef(pNode); + taosHashPut(vgIdHash, (const char *)&pNode->vgId, sizeof(int32_t), (char *)(&pNode), sizeof(SSyncNode *)); + + if (pNode->notifyRole) + (*pNode->notifyRole)(pNode->ahandle, nodeRole); + + return pNode; +} + +void syncStop(void *param) +{ + SSyncNode *pNode = param; + SSyncPeer *pPeer; + + if (pNode == NULL) return; + sInfo("vgId:%d, cleanup sync", pNode->vgId); + + pthread_mutex_lock(&(pNode->mutex)); + + for (int i = 0; i < pNode->replica; ++i) { + pPeer = pNode->peerInfo[i]; + if (pPeer) syncRemovePeer(pPeer); + } + + pPeer = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA]; + if (pPeer) syncRemovePeer(pPeer); + + if (vgIdHash) taosHashRemove(vgIdHash, (const char *)&pNode->vgId, sizeof(int32_t)); + if (pNode->pFwdTimer) taosTmrStop(pNode->pFwdTimer); + + pthread_mutex_unlock(&(pNode->mutex)); + + syncDecNodeRef(pNode); +} + +int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) +{ + SSyncNode *pNode = param; + int i, j; + + if (pNode == NULL) return TSDB_CODE_SYN_INVALID_CONFIG; + sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole], + pNewCfg->replica, pNode->replica); + + pthread_mutex_lock(&(pNode->mutex)); + + for (i = 0; i < pNode->replica; ++i) { + for (j = 0; j < pNewCfg->replica; ++j) { + if ((strcmp(pNode->peerInfo[i]->fqdn, pNewCfg->nodeInfo[j].nodeFqdn) == 0) && + (pNode->peerInfo[i]->port == pNewCfg->nodeInfo[j].nodePort)) + break; + } + + if (j >= pNewCfg->replica) { + syncRemovePeer(pNode->peerInfo[i]); + pNode->peerInfo[i] = NULL; + } + } + + SSyncPeer *newPeers[TAOS_SYNC_MAX_REPLICA]; + for (i = 0; i < pNewCfg->replica; ++i) { + const SNodeInfo *pNewNode = &pNewCfg->nodeInfo[i]; + + for (j = 0; j < pNode->replica; ++j) { + if (pNode->peerInfo[j] && (strcmp(pNode->peerInfo[j]->fqdn, pNewNode->nodeFqdn) == 0) && + (pNode->peerInfo[j]->port == pNewNode->nodePort)) + break; + } + + if (j >= pNode->replica) { + newPeers[i] = syncAddPeer(pNode, pNewNode); + } else { + newPeers[i] = pNode->peerInfo[j]; + } + + if ((strcmp(pNewNode->nodeFqdn, tsNodeFqdn) == 0) && (pNewNode->nodePort == tsSyncPort)) + pNode->selfIndex = i; + } + + pNode->replica = pNewCfg->replica; + pNode->quorum = pNewCfg->quorum; + memcpy(pNode->peerInfo, newPeers, sizeof(SSyncPeer *) * pNewCfg->replica); + + for (i = pNewCfg->replica; i < TAOS_SYNC_MAX_REPLICA; ++i) + pNode->peerInfo[i] = NULL; + + syncAddArbitrator(pNode); + + if (pNewCfg->replica <= 1) { + sInfo("vgId:%d, no peers are configured, work as master!", pNode->vgId); + nodeRole = TAOS_SYNC_ROLE_MASTER; + (*pNode->notifyRole)(pNode->ahandle, nodeRole); + } + + pthread_mutex_unlock(&(pNode->mutex)); + + sInfo("vgId:%d, %d replicas are configured, quorum:%d role:%s", pNode->vgId, pNode->replica, pNode->quorum, syncRole[nodeRole]); + syncBroadcastStatus(pNode); + + return 0; +} + +int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) +{ + SSyncNode *pNode = param; + SSyncPeer *pPeer; + SSyncHead *pSyncHead; + SWalHead *pWalHead = data; + int fwdLen; + int code = 0; + + if (pNode == NULL) return 0; + + // always update version + nodeVersion = pWalHead->version; + if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER ) return 0; + + // only pkt from RPC or CQ can be forwarded + if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0; + + // a hacker way to improve the performance + pSyncHead = (SSyncHead *) ( ((char *)pWalHead) - sizeof(SSyncHead)); + pSyncHead->type = TAOS_SMSG_FORWARD; + pSyncHead->pversion = 0; + pSyncHead->len = sizeof(SWalHead) + pWalHead->len; + fwdLen = pSyncHead->len + sizeof(SSyncHead); //include the WAL and SYNC head + + pthread_mutex_lock(&(pNode->mutex)); + + if (pNode->quorum > 1) { + syncSaveFwdInfo(pNode, pWalHead->version, mhandle); + code = 1; + } + + for (int i = 0; i < pNode->replica; ++i) { + pPeer = pNode->peerInfo[i]; + if (pPeer == NULL || pPeer->peerFd <0) continue; + if (pPeer->role != TAOS_SYNC_ROLE_SLAVE && pPeer->sstatus != TAOS_SYNC_STATUS_CACHE) continue; + + int retLen = write(pPeer->peerFd, pSyncHead, fwdLen); + if (retLen == fwdLen) { + sDebug("%s, forward is sent, ver:%" PRIu64 " contLen:%d", pPeer->id, pWalHead->version, pWalHead->len); + } else { + sError("%s, failed to forward, ver:%" PRIu64 " retLen:%d", pPeer->id, pWalHead->version, retLen); + syncRestartConnection(pPeer); + } + } + + pthread_mutex_unlock(&(pNode->mutex)); + + return code; +} + +void syncConfirmForward(void *param, uint64_t version, int32_t code) +{ + SSyncNode *pNode = param; + if (pNode == NULL) return; + if (pNode->quorum <= 1) return; + + SSyncPeer *pPeer = pNode->pMaster; + if (pPeer == NULL) return; + + char msg[sizeof(SSyncHead) + sizeof(SFwdRsp)] = {0}; + + SSyncHead *pHead = (SSyncHead *) msg; + pHead->type = TAOS_SMSG_FORWARD_RSP; + pHead->len = sizeof(SFwdRsp); + + SFwdRsp *pFwdRsp = (SFwdRsp *)(msg + sizeof(SSyncHead)); + pFwdRsp->version = version; + pFwdRsp->code = code; + + int msgLen = sizeof(SSyncHead) + sizeof(SFwdRsp); + int retLen = write(pPeer->peerFd, msg, msgLen); + + if (retLen == msgLen) { + sDebug("%s, forward-rsp is sent, ver:%" PRIu64, pPeer->id, version); + } else { + sDebug("%s, failed to send forward ack, restart", pPeer->id); + syncRestartConnection(pPeer); + } +} + +void syncRecover(void *param) { + SSyncNode *pNode = param; + SSyncPeer *pPeer; + + // to do: add a few lines to check if recover is OK + // if take this node to unsync state, the whole system may not work + + nodeRole = TAOS_SYNC_ROLE_UNSYNCED; + (*pNode->notifyRole)(pNode->ahandle, nodeRole); + nodeVersion = 0; + + pthread_mutex_lock(&(pNode->mutex)); + + for (int i = 0; i < pNode->replica; ++i) { + pPeer = (SSyncPeer *) pNode->peerInfo[i]; + if (pPeer->peerFd >= 0) { + syncRestartConnection(pPeer); + } + } + + pthread_mutex_unlock(&(pNode->mutex)); +} + +int syncGetNodesRole(void *param, SNodesRole *pNodesRole) +{ + SSyncNode *pNode = param; + + pNodesRole->selfIndex = pNode->selfIndex; + for (int i=0; ireplica; ++i) { + pNodesRole->nodeId[i] = pNode->peerInfo[i]->nodeId; + pNodesRole->role[i] = pNode->peerInfo[i]->role; + } + + return 0; +} + +static void syncAddArbitrator(SSyncNode *pNode) +{ + SSyncPeer *pPeer = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA]; + + // if not configured, return right away + if (tsArbitrator[0] == 0) { + if (pPeer) syncRemovePeer(pPeer); + pNode->peerInfo[TAOS_SYNC_MAX_REPLICA] = NULL; + return; + } + + SNodeInfo nodeInfo; + nodeInfo.nodeId = 0; + taosGetFqdnPortFromEp(tsArbitrator, nodeInfo.nodeFqdn, &nodeInfo.nodePort); + nodeInfo.nodePort += TSDB_PORT_SYNC; + + if (pPeer) { + if ((strcmp(nodeInfo.nodeFqdn, pPeer->fqdn) == 0) && (nodeInfo.nodePort == pPeer->port)) { + return; + } else { + syncRemovePeer(pPeer); + pNode->peerInfo[TAOS_SYNC_MAX_REPLICA] = NULL; + } + } + + pNode->peerInfo[TAOS_SYNC_MAX_REPLICA] = syncAddPeer(pNode, &nodeInfo); +} + +static void syncAddNodeRef(SSyncNode *pNode) +{ + atomic_add_fetch_8(&pNode->refCount, 1); +} + +static void syncDecNodeRef(SSyncNode *pNode) +{ + if (atomic_sub_fetch_8(&pNode->refCount, 1) == 0) { + pthread_mutex_destroy(&pNode->mutex); + tfree(pNode->pRecv); + tfree(pNode->pSyncFwds); + tfree(pNode); + + if (atomic_sub_fetch_32(&tsNodeNum, 1) == 0) { + if (tsTcpPool) taosCloseTcpThreadPool(tsTcpPool); + if (syncTmrCtrl) taosTmrCleanUp(syncTmrCtrl); + if (vgIdHash) taosHashCleanup(vgIdHash); + syncTmrCtrl = NULL; + tsTcpPool = NULL; + vgIdHash = NULL; + syncModuleInit = PTHREAD_ONCE_INIT; + sDebug("sync module is cleaned up"); + } + } +} + +void syncAddPeerRef(SSyncPeer *pPeer) +{ + atomic_add_fetch_8(&pPeer->refCount, 1); +} + +int syncDecPeerRef(SSyncPeer *pPeer) +{ + if (atomic_sub_fetch_8(&pPeer->refCount, 1) == 0) { + syncDecNodeRef(pPeer->pSyncNode); + + sDebug("%s, resource is freed", pPeer->id); + tfree(pPeer->watchFd); + tfree(pPeer); + return 0; + } + + return 1; +} + +static void syncClosePeerConn(SSyncPeer *pPeer) +{ + taosTmrStopA(&pPeer->timer); + tclose(pPeer->syncFd); + if (pPeer->peerFd >=0) { + pPeer->peerFd = -1; + taosFreeTcpConn(pPeer->pConn); + } +} + +static void syncRemovePeer(SSyncPeer *pPeer) +{ + sInfo("%s, it is removed", pPeer->id); + + pPeer->ip = 0; + syncClosePeerConn(pPeer); + syncDecPeerRef(pPeer); +} + +static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) +{ + uint32_t ip = taosGetIpFromFqdn(pInfo->nodeFqdn); + if (ip == -1) return NULL; + + SSyncPeer *pPeer = (SSyncPeer *) calloc(1, sizeof(SSyncPeer)); + if (pPeer == NULL) return NULL; + + pPeer->nodeId = pInfo->nodeId; + tstrncpy(pPeer->fqdn, pInfo->nodeFqdn, sizeof(pPeer->fqdn)); + pPeer->ip = ip; + pPeer->port = pInfo->nodePort; + snprintf(pPeer->id, sizeof(pPeer->id), "vgId:%d peer:%s:%d", pNode->vgId, pPeer->fqdn, pPeer->port); + + pPeer->peerFd = -1; + pPeer->syncFd = -1; + pPeer->role = TAOS_SYNC_ROLE_OFFLINE; + pPeer->pSyncNode = pNode; + pPeer->refCount = 1; + + sInfo("%s, it is configured", pPeer->id); + int ret = strcmp(pPeer->fqdn, tsNodeFqdn); + if (pPeer->nodeId == 0 || (ret > 0) || (ret == 0 && pPeer->port > tsSyncPort)) { + sDebug("%s, start to check peer connection", pPeer->id); + taosTmrReset(syncCheckPeerConnection, 100, pPeer, syncTmrCtrl, &pPeer->timer); + } + + syncAddNodeRef(pNode); + return pPeer; +} + +void syncBroadcastStatus(SSyncNode *pNode) +{ + SSyncPeer *pPeer; + + for (int i = 0; i < pNode->replica; ++i) { + if ( i == pNode->selfIndex ) continue; + pPeer = pNode->peerInfo[i]; + syncSendPeersStatusMsgToPeer(pPeer, 1); + } +} + +static void syncChooseMaster(SSyncNode *pNode) { + SSyncPeer *pPeer; + int onlineNum = 0; + int index = -1; + int replica = pNode->replica; + + sDebug("vgId:%d, choose master", pNode->vgId); + + for (int i = 0; i < pNode->replica; ++i) { + if (pNode->peerInfo[i]->role != TAOS_SYNC_ROLE_OFFLINE) + onlineNum++; + } + + if (onlineNum == pNode->replica) { + // if all peers are online, peer with highest version shall be master + index = 0; + for (int i = 1; i < pNode->replica; ++i) { + if (pNode->peerInfo[i]->version > pNode->peerInfo[index]->version) + index = i; + } + } + + // add arbitrator connection + SSyncPeer *pArb = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA]; + if (pArb && pArb->role != TAOS_SYNC_ROLE_OFFLINE) { + onlineNum++; + replica = pNode->replica + 1; + } + + if (index < 0 && onlineNum > replica/2.0) { + // over half of nodes are online + for (int i = 0; i < pNode->replica; ++i) { + //slave with highest version shall be master + pPeer = pNode->peerInfo[i]; + if (pPeer->role == TAOS_SYNC_ROLE_SLAVE || pPeer->role == TAOS_SYNC_ROLE_MASTER) { + if (index < 0 || pPeer->version > pNode->peerInfo[index]->version) + index = i; + } + } + } + + if (index >= 0) { + if (index == pNode->selfIndex) { + sInfo("vgId:%d, start to work as master", pNode->vgId); + nodeRole = TAOS_SYNC_ROLE_MASTER; + (*pNode->notifyRole)(pNode->ahandle, nodeRole); + } else { + pPeer = pNode->peerInfo[index]; + sInfo("%s, it shall work as master", pPeer->id); + } + } else { + sDebug("vgId:%d, failed to choose master", pNode->vgId); + } +} + +static SSyncPeer *syncCheckMaster(SSyncNode *pNode ) { + int onlineNum = 0; + int index = -1; + int replica = pNode->replica; + + for (int i = 0; i < pNode->replica; ++i) { + if (pNode->peerInfo[i]->role != TAOS_SYNC_ROLE_OFFLINE) + onlineNum++; + } + + // add arbitrator connection + SSyncPeer *pArb = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA]; + if (pArb && pArb->role != TAOS_SYNC_ROLE_OFFLINE) { + onlineNum++; + replica = pNode->replica + 1; + } + + if (onlineNum <= replica*0.5) { + if (nodeRole != TAOS_SYNC_ROLE_UNSYNCED) { + nodeRole = TAOS_SYNC_ROLE_UNSYNCED; + pNode->peerInfo[pNode->selfIndex]->role = nodeRole; + (*pNode->notifyRole)(pNode->ahandle, nodeRole); + sInfo("vgId:%d, change to unsynced state, online:%d replica:%d", pNode->vgId, onlineNum, replica); + } + } else { + for (int i=0; ireplica; ++i) { + SSyncPeer *pTemp = pNode->peerInfo[i]; + if ( pTemp->role != TAOS_SYNC_ROLE_MASTER ) continue; + if ( index < 0 ) { + index = i; + } else { // multiple masters, it shall not happen + if ( i == pNode->selfIndex ) { + sError("%s, peer is master, work as slave instead", pTemp->id); + nodeRole = TAOS_SYNC_ROLE_SLAVE; + (*pNode->notifyRole)(pNode->ahandle, nodeRole); + } + } + } + } + + SSyncPeer *pMaster = (index>=0) ? pNode->peerInfo[index]:NULL; + return pMaster; +} + +static int syncValidateMaster(SSyncPeer *pPeer) { + SSyncNode *pNode = pPeer->pSyncNode; + int code = 0; + + if (nodeRole == TAOS_SYNC_ROLE_MASTER && nodeVersion < pPeer->version) { + sDebug("%s, slave has higher version, restart all connections!!!", pPeer->id); + nodeRole = TAOS_SYNC_ROLE_UNSYNCED; + (*pNode->notifyRole)(pNode->ahandle, nodeRole); + code = -1; + + for (int i = 0; i < pNode->replica; ++i) { + if ( i == pNode->selfIndex ) continue; + syncRestartPeer(pNode->peerInfo[i]); + } + } + + return code; +} + +static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t newRole) +{ + SSyncNode *pNode = pPeer->pSyncNode; + int8_t peerOldRole = pPeer->role; + int8_t selfOldRole = nodeRole; + int8_t i, syncRequired = 0; + + pNode->peerInfo[pNode->selfIndex]->version = nodeVersion; + pPeer->role = newRole; + + sDebug("%s, own role:%s, new peer role:%s", pPeer->id, + syncRole[nodeRole], syncRole[pPeer->role]); + + SSyncPeer *pMaster = syncCheckMaster(pNode); + + if ( pMaster ) { + // master is there + pNode->pMaster = pMaster; + sDebug("%s, it is the master, ver:%" PRIu64, pMaster->id, pMaster->version); + + if (syncValidateMaster(pPeer) < 0) return; + + if (nodeRole == TAOS_SYNC_ROLE_UNSYNCED) { + if ( nodeVersion < pMaster->version) { + syncRequired = 1; + } else { + sInfo("%s is master, work as slave, ver:%" PRIu64, pMaster->id, pMaster->version); + nodeRole = TAOS_SYNC_ROLE_SLAVE; + (*pNode->notifyRole)(pNode->ahandle, nodeRole); + } + } else if ( nodeRole == TAOS_SYNC_ROLE_SLAVE && pMaster == pPeer) { + // nodeVersion = pMaster->version; + } + } else { + // master not there, if all peer's state and version are consistent, choose the master + int consistent = 0; + if (peersStatus) { + for (i = 0; i < pNode->replica; ++i) { + SSyncPeer *pTemp = pNode->peerInfo[i]; + if (pTemp->role != peersStatus[i].role) break; + if ((pTemp->role != TAOS_SYNC_ROLE_OFFLINE) && (pTemp->version != peersStatus[i].version)) break; + } + + if (i >= pNode->replica) consistent = 1; + } else { + if (pNode->replica == 2) consistent = 1; + } + + if (consistent) + syncChooseMaster(pNode); + } + + if (syncRequired) { + syncRecoverFromMaster(pMaster); + } + + if (peerOldRole != newRole || nodeRole != selfOldRole) + syncBroadcastStatus(pNode); +} + +static void syncRestartPeer(SSyncPeer *pPeer) { + sDebug("%s, restart connection", pPeer->id); + + syncClosePeerConn(pPeer); + + pPeer->sstatus = TAOS_SYNC_STATUS_INIT; + + int ret = strcmp(pPeer->fqdn, tsNodeFqdn); + if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort) ) + taosTmrReset(syncCheckPeerConnection, tsSyncTimer*1000, pPeer, syncTmrCtrl, &pPeer->timer); +} + +void syncRestartConnection(SSyncPeer *pPeer) +{ + if (pPeer->ip == 0) return; + + syncRestartPeer(pPeer); + syncCheckRole(pPeer, NULL, TAOS_SYNC_ROLE_OFFLINE); +} + +static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) +{ + SSyncNode *pNode = pPeer->pSyncNode; + sDebug("%s, sync-req is received", pPeer->id); + + if (pPeer->ip == 0) return; + + if (nodeRole != TAOS_SYNC_ROLE_MASTER) { + sError("%s, I am not master anymore", pPeer->id); + tclose(pPeer->syncFd); + return; + } + + if (pPeer->sstatus != TAOS_SYNC_STATUS_INIT) { + sDebug("%s, sync is already started", pPeer->id); + return; // already started + } + + // start a new thread to retrieve the data + syncAddPeerRef(pPeer); + pthread_attr_t thattr; + pthread_t thread; + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED); + int ret = pthread_create(&thread, &thattr, syncRetrieveData, pPeer); + pthread_attr_destroy(&thattr); + + if (ret != 0) { + sError("%s, failed to create sync thread(%s)", pPeer->id, strerror(errno)); + syncDecPeerRef(pPeer); + } else { + pPeer->sstatus = TAOS_SYNC_STATUS_START; + sDebug("%s, thread is created to retrieve data", pPeer->id); + } +} + +static void syncNotStarted(void *param, void *tmrId) +{ + SSyncPeer *pPeer = param; + SSyncNode *pNode = pPeer->pSyncNode; + + pthread_mutex_lock(&(pNode->mutex)); + pPeer->timer = NULL; + sInfo("%s, sync connection is still not up, restart", pPeer->id); + syncRestartConnection(pPeer); + pthread_mutex_unlock(&(pNode->mutex)); +} + +static void syncTryRecoverFromMaster(void *param, void *tmrId) { + SSyncPeer *pPeer = param; + SSyncNode *pNode = pPeer->pSyncNode; + + pthread_mutex_lock(&(pNode->mutex)); + syncRecoverFromMaster(pPeer); + pthread_mutex_unlock(&(pNode->mutex)); +} + +static void syncRecoverFromMaster(SSyncPeer *pPeer) +{ + SSyncNode *pNode = pPeer->pSyncNode; + + if ( nodeSStatus != TAOS_SYNC_STATUS_INIT) { + sDebug("%s, sync is already started, status:%d", pPeer->id, nodeSStatus); + return; + } + + taosTmrStopA(&pPeer->timer); + if (tsSyncNum >= tsMaxSyncNum) { + sInfo("%s, %d syncs are in process, try later", pPeer->id, tsSyncNum); + taosTmrReset(syncTryRecoverFromMaster, 500, pPeer, syncTmrCtrl, &pPeer->timer); + return; + } + + sDebug("%s, try to sync", pPeer->id) + + SFirstPkt firstPkt; + memset(&firstPkt, 0, sizeof(firstPkt)); + firstPkt.syncHead.type = TAOS_SMSG_SYNC_REQ; + firstPkt.syncHead.vgId = pNode->vgId; + firstPkt.syncHead.len = sizeof(firstPkt) - sizeof(SSyncHead); + tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn)); + firstPkt.port = tsSyncPort; + taosTmrReset(syncNotStarted, tsSyncTimer*1000, pPeer, syncTmrCtrl, &pPeer->timer); + + if (write(pPeer->peerFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt) ) { + sError("%s, failed to send sync-req to peer", pPeer->id); + } else { + nodeSStatus = TAOS_SYNC_STATUS_START; + sInfo("%s, sync-req is sent", pPeer->id); + } + + return; +} + +static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) +{ + SSyncNode *pNode = pPeer->pSyncNode; + SFwdRsp *pFwdRsp = (SFwdRsp *) cont; + SSyncFwds *pSyncFwds = pNode->pSyncFwds; + SFwdInfo *pFwdInfo; + + sDebug("%s, forward-rsp is received, ver:%" PRIu64, pPeer->id, pFwdRsp->version); + SFwdInfo *pFirst = pSyncFwds->fwdInfo + pSyncFwds->first; + + if (pFirst->version <= pFwdRsp->version && pSyncFwds->fwds > 0) { + // find the forwardInfo from first + for (int i=0; ifwds; ++i) { + pFwdInfo = pSyncFwds->fwdInfo + (i+pSyncFwds->first)%tsMaxFwdInfo; + if (pFwdRsp->version == pFwdInfo->version) break; + } + + syncProcessFwdAck(pNode, pFwdInfo, pFwdRsp->code); + syncRemoveConfirmedFwdInfo(pNode); + } +} + + +static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) +{ + SSyncNode *pNode = pPeer->pSyncNode; + SWalHead *pHead = (SWalHead *)cont; + + sDebug("%s, forward is received, ver:%" PRIu64, pPeer->id, pHead->version); + + if (nodeRole == TAOS_SYNC_ROLE_SLAVE) { + //nodeVersion = pHead->version; + (*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD); + } else { + if (nodeSStatus != TAOS_SYNC_STATUS_INIT) { + syncSaveIntoBuffer(pPeer, pHead); + } else { + sError("%s, forward discarded, ver:%" PRIu64, pPeer->id, pHead->version); + } + } + + return; +} + +static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) +{ + SSyncNode *pNode = pPeer->pSyncNode; + SPeersStatus *pPeersStatus = (SPeersStatus *)cont; + + sDebug("%s, status msg received, self:%s ver:%" PRIu64 " peer:%s ver:%" PRIu64 ", ack:%d", pPeer->id, + syncRole[nodeRole], nodeVersion, syncRole[pPeersStatus->role], pPeersStatus->version, pPeersStatus->ack); + + pPeer->version = pPeersStatus->version; + syncCheckRole(pPeer, pPeersStatus->peersStatus, pPeersStatus->role); + + if (pPeersStatus->ack) + syncSendPeersStatusMsgToPeer(pPeer, 0); +} + +static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) { + if (pPeer->peerFd <0) return -1; + + int hlen = taosReadMsg(pPeer->peerFd, pHead, sizeof(SSyncHead)); + if (hlen != sizeof(SSyncHead)) { + sDebug("%s, failed to read msg, hlen:%d", pPeer->id, hlen); + return -1; + } + + // head.len = htonl(head.len); + if (pHead->len <0) { + sError("%s, invalid pkt length, len:%d", pPeer->id, pHead->len); + return -1; + } + + int bytes = taosReadMsg(pPeer->peerFd, cont, pHead->len); + if (bytes != pHead->len) { + sError("%s, failed to read, bytes:%d len:%d", pPeer->id, bytes, pHead->len); + return -1; + } + + return 0; +} + +static int syncProcessPeerMsg(void *param, void *buffer) +{ + SSyncPeer *pPeer = param; + SSyncHead head; + char *cont = (char *)buffer; + + SSyncNode *pNode = pPeer->pSyncNode; + pthread_mutex_lock(&(pNode->mutex)); + + int code = syncReadPeerMsg(pPeer, &head, cont); + + if (code == 0) { + if (head.type == TAOS_SMSG_FORWARD) { + syncProcessForwardFromPeer(cont, pPeer); + } else if (head.type == TAOS_SMSG_FORWARD_RSP) { + syncProcessFwdResponse(cont, pPeer); + } else if (head.type == TAOS_SMSG_SYNC_REQ) { + syncProcessSyncRequest(cont, pPeer); + } else if (head.type == TAOS_SMSG_STATUS) { + syncProcessPeersStatusMsg(cont, pPeer); + } + } + + pthread_mutex_unlock(&(pNode->mutex)); + + return code; +} + +#define statusMsgLen sizeof(SSyncHead)+sizeof(SPeersStatus)+sizeof(SPeerStatus)*TAOS_SYNC_MAX_REPLICA + +static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) +{ + SSyncNode *pNode = pPeer->pSyncNode; + char msg[statusMsgLen] = {0}; + + if (pPeer->peerFd <0 || pPeer->ip ==0) return; + + SSyncHead *pHead = (SSyncHead *) msg; + SPeersStatus *pPeersStatus = (SPeersStatus *) (msg + sizeof(SSyncHead)); + + pHead->type = TAOS_SMSG_STATUS; + pHead->len = statusMsgLen - sizeof(SSyncHead); + + pPeersStatus->version = nodeVersion; + pPeersStatus->role = nodeRole; + pPeersStatus->ack = ack; + + for (int i = 0; i < pNode->replica; ++i) { + pPeersStatus->peersStatus[i].role = pNode->peerInfo[i]->role; + pPeersStatus->peersStatus[i].version = pNode->peerInfo[i]->version; + } + + int retLen = write(pPeer->peerFd, msg, statusMsgLen); + if (retLen == statusMsgLen) { + sDebug("%s, status msg is sent", pPeer->id); + } else { + sDebug("%s, failed to send status msg, restart", pPeer->id); + syncRestartConnection(pPeer); + } + + return; +} + +static void syncSetupPeerConnection(SSyncPeer *pPeer) { + SSyncNode *pNode = pPeer->pSyncNode; + + taosTmrStopA(&pPeer->timer); + if (pPeer->peerFd >= 0) { + sDebug("%s, send role version to peer", pPeer->id); + syncSendPeersStatusMsgToPeer(pPeer, 1); + return; + } + + int connFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0); + if (connFd < 0) { + sDebug("%s, failed to open tcp socket(%s)", pPeer->id, strerror(errno)); + taosTmrReset(syncCheckPeerConnection, tsSyncTimer *1000, pPeer, syncTmrCtrl, &pPeer->timer); + return; + } + + SFirstPkt firstPkt; + memset(&firstPkt, 0, sizeof(firstPkt)); + firstPkt.syncHead.vgId = pPeer->nodeId ? pNode->vgId:0; + firstPkt.syncHead.type = TAOS_SMSG_STATUS; + tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn)); + firstPkt.port = tsSyncPort; + firstPkt.sourceId = pNode->vgId; // tell arbitrator its vgId + + if ( write(connFd, &firstPkt, sizeof(firstPkt)) == sizeof(firstPkt)) { + sDebug("%s, connection to peer server is setup", pPeer->id); + pPeer->peerFd = connFd; + pPeer->role = TAOS_SYNC_ROLE_UNSYNCED; + pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd); + syncAddPeerRef(pPeer); + } else { + sDebug("try later"); + close(connFd); + taosTmrReset(syncCheckPeerConnection, tsSyncTimer *1000, pPeer, syncTmrCtrl, &pPeer->timer); + } +} + +static void syncCheckPeerConnection(void *param, void *tmrId) +{ + SSyncPeer *pPeer = param; + SSyncNode *pNode = pPeer->pSyncNode; + + pthread_mutex_lock(&(pNode->mutex)); + + sDebug("%s, check peer connection", pPeer->id); + syncSetupPeerConnection(pPeer); + + pthread_mutex_unlock(&(pNode->mutex)); +} + +static void syncCreateRestoreDataThread(SSyncPeer *pPeer) +{ + taosTmrStopA(&pPeer->timer); + + pthread_attr_t thattr; + pthread_t thread; + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED); + + syncAddPeerRef(pPeer); + int ret = pthread_create(&(thread), &thattr, (void *)syncRestoreData, pPeer); + pthread_attr_destroy(&thattr); + + if (ret < 0) { + sError("%s, failed to create sync thread", pPeer->id); + tclose(pPeer->syncFd); + syncDecPeerRef(pPeer); + } else { + sInfo("%s, sync connection is up", pPeer->id); + } +} + +static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) +{ + char ipstr[24]; + int i; + + tinet_ntoa(ipstr, sourceIp); + sDebug("peer TCP connection from ip:%s", ipstr); + + SFirstPkt firstPkt; + if (taosReadMsg(connFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) { + sError("failed to read peer first pkt from ip:%s(%s)", ipstr, strerror(errno)); + taosCloseSocket(connFd); + return; + } + + int32_t vgId = firstPkt.syncHead.vgId; + SSyncNode **ppNode = (SSyncNode **)taosHashGet(vgIdHash, (const char *)&vgId, sizeof(int32_t)); + if (ppNode == NULL || *ppNode == NULL) { + sError("vgId:%d, vgId could not be found", vgId); + taosCloseSocket(connFd); + return; + } + + SSyncNode *pNode = *ppNode; + pthread_mutex_lock(&(pNode->mutex)); + + SSyncPeer *pPeer; + for (i = 0; i < pNode->replica; ++i) { + pPeer = pNode->peerInfo[i]; + if (pPeer && (strcmp(pPeer->fqdn, firstPkt.fqdn) == 0) && (pPeer->port == firstPkt.port)) + break; + } + + pPeer = (i < pNode->replica) ? pNode->peerInfo[i] : NULL; + if (pPeer == NULL) { + sError("vgId:%d, peer:%s not configured", pNode->vgId, firstPkt.fqdn); + taosCloseSocket(connFd); + // syncSendVpeerCfgMsg(sync); + } else { + // first packet tells what kind of link + if (firstPkt.syncHead.type == TAOS_SMSG_SYNC_DATA) { + pPeer->syncFd = connFd; + syncCreateRestoreDataThread(pPeer); + } else { + sDebug("%s, TCP connection is already up, close one", pPeer->id); + syncClosePeerConn(pPeer); + pPeer->peerFd = connFd; + pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd); + syncAddPeerRef(pPeer); + sDebug("%s, ready to exchange data", pPeer->id); + syncSendPeersStatusMsgToPeer(pPeer, 1); + } + } + + pthread_mutex_unlock(&(pNode->mutex)); + + return; +} + +static void syncProcessBrokenLink(void *param) { + if (param == NULL) return; // the connection for arbitrator + SSyncPeer *pPeer = param; + SSyncNode *pNode = pPeer->pSyncNode; + + syncAddNodeRef(pNode); + pthread_mutex_lock(&(pNode->mutex)); + + sDebug("%s, TCP link is broken(%s)", pPeer->id, strerror(errno)); + pPeer->peerFd = -1; + + if (syncDecPeerRef(pPeer) != 0) { + syncRestartConnection(pPeer); + } + + pthread_mutex_unlock(&(pNode->mutex)); + syncDecNodeRef(pNode); +} + +static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) +{ + SSyncFwds *pSyncFwds = pNode->pSyncFwds; + uint64_t time = taosGetTimestampMs(); + + if (pSyncFwds->fwds >= tsMaxFwdInfo) { + pSyncFwds->first = (pSyncFwds->first + 1) % tsMaxFwdInfo; + pSyncFwds->fwds--; + } + + if (pSyncFwds->fwds > 0) + pSyncFwds->last = (pSyncFwds->last+1) % tsMaxFwdInfo; + SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->last; + pFwdInfo->version = version; + pFwdInfo->mhandle = mhandle; + pFwdInfo->acks = 0; + pFwdInfo->confirmed = 0; + pFwdInfo->time = time; + + pSyncFwds->fwds++; + sDebug("vgId:%d, fwd info is saved, ver:%" PRIu64 " fwds:%d ", pNode->vgId, version, pSyncFwds->fwds); +} + +static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) +{ + SSyncFwds *pSyncFwds = pNode->pSyncFwds; + + int fwds = pSyncFwds->fwds; + for (int i=0; ifwdInfo + pSyncFwds->first; + if (pFwdInfo->confirmed == 0) break; + + pSyncFwds->first = (pSyncFwds->first+1) % tsMaxFwdInfo; + pSyncFwds->fwds--; + if (pSyncFwds->fwds == 0) pSyncFwds->first = pSyncFwds->last; + //sDebug("vgId:%d, fwd info is removed, ver:%d, fwds:%d", + // pNode->vgId, pFwdInfo->version, pSyncFwds->fwds); + memset(pFwdInfo, 0, sizeof(SFwdInfo)); + } +} + +static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code) +{ + int confirm = 0; + if (pFwdInfo->code == 0) pFwdInfo->code = code; + + if (code == 0) { + pFwdInfo->acks++; + if (pFwdInfo->acks >= pNode->quorum-1) + confirm = 1; + } else { + pFwdInfo->nacks++; + if (pFwdInfo->nacks > pNode->replica-pNode->quorum) + confirm = 1; + } + + if (confirm && pFwdInfo->confirmed == 0) { + sDebug("vgId:%d, forward is confirmed, ver:%" PRIu64 " code:%x", pNode->vgId, pFwdInfo->version, pFwdInfo->code); + (*pNode->confirmForward)(pNode->ahandle, pFwdInfo->mhandle, pFwdInfo->code); + pFwdInfo->confirmed = 1; + } +} + +static void syncMonitorFwdInfos(void *param, void *tmrId) +{ + SSyncNode *pNode = param; + SSyncFwds *pSyncFwds = pNode->pSyncFwds; + uint64_t time = taosGetTimestampMs(); + + if (pSyncFwds->fwds > 0) { + pthread_mutex_lock(&(pNode->mutex)); + for (int i=0; ifwds; ++i) { + SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first+i) % tsMaxFwdInfo; + if (time - pFwdInfo->time < 2000) break; + syncProcessFwdAck(pNode, pFwdInfo, TSDB_CODE_RPC_NETWORK_UNAVAIL); + } + + syncRemoveConfirmedFwdInfo(pNode); + pthread_mutex_unlock(&(pNode->mutex)); + } + + pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl); +} + + + diff --git a/src/sync/src/syncRestore.c b/src/sync/src/syncRestore.c new file mode 100644 index 0000000000..34afbc4db6 --- /dev/null +++ b/src/sync/src/syncRestore.c @@ -0,0 +1,326 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "os.h" +#include "tlog.h" +#include "tutil.h" +#include "ttimer.h" +#include "tsocket.h" +#include "tqueue.h" +#include "twal.h" +#include "tsync.h" +#include "syncInt.h" + +static void syncRemoveExtraFile(SSyncPeer *pPeer, uint32_t sindex, uint32_t eindex) { + char name[TSDB_FILENAME_LEN*2] = {0}; + char fname[TSDB_FILENAME_LEN*3] = {0}; + uint32_t magic; + uint64_t fversion; + int32_t size; + uint32_t index = sindex; + SSyncNode *pNode = pPeer->pSyncNode; + + if (sindex < 0 || eindex < sindex) return; + + while (1) { + name[0] = 0; + magic = (*pNode->getFileInfo)(pNode->ahandle, name, &index, eindex, &size, &fversion); + if (magic == 0) break; + + snprintf(fname, sizeof(fname), "%s/%s", pNode->path, name); + remove(fname); + sDebug("%s, %s is removed", pPeer->id, fname); + + index++; + if (index > eindex) break; + } +} + +static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) +{ + SSyncNode *pNode = pPeer->pSyncNode; + SFileInfo minfo; memset(&minfo, 0, sizeof(minfo)); /* = {0}; */ // master file info + SFileInfo sinfo; memset(&sinfo, 0, sizeof(sinfo)); /* = {0}; */ // slave file info + SFileAck fileAck; + int code = -1; + char name[TSDB_FILENAME_LEN * 2] = {0}; + uint32_t pindex = 0; // index in last restore + + *fversion = 0; + sinfo.index = 0; + while (1) { + // read file info + int ret = taosReadMsg(pPeer->syncFd, &(minfo), sizeof(minfo)); + if (ret < 0 ) break; + + // if no more file from master, break; + if (minfo.name[0] == 0 || minfo.magic == 0) { + sDebug("%s, no more files to restore", pPeer->id); + + // remove extra files after the current index + syncRemoveExtraFile(pPeer, sinfo.index+1, TAOS_SYNC_MAX_INDEX); + code = 0; + break; + } + + // remove extra files on slave between the current and last index + syncRemoveExtraFile(pPeer, pindex+1, minfo.index-1); + pindex = minfo.index; + + // check the file info + sinfo = minfo; + sDebug("%s, get file info:%s", pPeer->id, minfo.name); + sinfo.magic = (*pNode->getFileInfo)(pNode->ahandle, sinfo.name, &sinfo.index, TAOS_SYNC_MAX_INDEX, &sinfo.size, &sinfo.fversion); + + // if file not there or magic is not the same, file shall be synced + memset(&fileAck, 0, sizeof(fileAck)); + fileAck.sync = (sinfo.magic != minfo.magic || sinfo.name[0] == 0) ? 1:0; + + // send file ack + ret = taosWriteMsg(pPeer->syncFd, &(fileAck), sizeof(fileAck)); + if (ret <0) break; + + // if sync is not required, continue + if (fileAck.sync == 0) { + sDebug("%s, %s is the same", pPeer->id, minfo.name); + continue; + } + + // if sync is required, open file, receive from master, and write to file + // get the full path to file + snprintf(name, sizeof(name), "%s/%s", pNode->path, minfo.name); + + int dfd = open(name, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO); + if ( dfd < 0 ) { + sError("%s, failed to open file:%s", pPeer->id, name); + break; + } + + ret = taosCopyFds(pPeer->syncFd, dfd, minfo.size); + fsync(dfd); + close(dfd); + if (ret<0) break; + + sDebug("%s, %s is received, size:%d", pPeer->id, minfo.name, minfo.size); + + } + + if (code == 0 && (minfo.fversion != sinfo.fversion)) { + // data file is changed, code shall be set to 1 + *fversion = minfo.fversion; + code = 1; + } + + if (code < 0) { + sError("%s, failed to restore %s(%s)", pPeer->id, name, strerror(errno)); + } + + return code; +} + +static int syncRestoreWal(SSyncPeer *pPeer) +{ + SSyncNode *pNode = pPeer->pSyncNode; + int ret, code = -1; + + void *buffer = calloc(1024000, 1); // size for one record + if (buffer == NULL) return -1; + + SWalHead *pHead = (SWalHead *)buffer; + + while (1) { + ret = taosReadMsg(pPeer->syncFd, pHead, sizeof(SWalHead)); + if (ret <0) break; + + if (pHead->len == 0) {code = 0; break;} // wal sync over + + ret = taosReadMsg(pPeer->syncFd, pHead->cont, pHead->len); + if (ret <0) break; + + sDebug("%s, restore a record, ver:%" PRIu64, pPeer->id, pHead->version); + (*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_WAL); + } + + if (code<0) { + sError("%s, failed to restore wal(%s)", pPeer->id, strerror(errno)); + } + + free(buffer); + return code; +} + +static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset) +{ + SSyncNode *pNode = pPeer->pSyncNode; + SWalHead *pHead = (SWalHead *) offset; + + (*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD); + offset += pHead->len + sizeof(SWalHead); + + return offset; +} + +static int syncProcessBufferedFwd(SSyncPeer *pPeer) +{ + SSyncNode *pNode = pPeer->pSyncNode; + SRecvBuffer *pRecv = pNode->pRecv; + int forwards = 0; + + sDebug("%s, number of buffered forwards:%d", pPeer->id, pRecv->forwards); + + char *offset = pRecv->buffer; + while (forwards < pRecv->forwards) { + offset = syncProcessOneBufferedFwd(pPeer, offset); + forwards++; + } + + pthread_mutex_lock(&pNode->mutex); + + while (forwards < pRecv->forwards && pRecv->code == 0) { + offset = syncProcessOneBufferedFwd(pPeer, offset); + forwards++; + } + + nodeRole = TAOS_SYNC_ROLE_SLAVE; + sDebug("%s, finish processing buffered fwds:%d", pPeer->id, forwards); + + pthread_mutex_unlock(&pNode->mutex); + + return pRecv->code; +} + +int syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) +{ + SSyncNode *pNode = pPeer->pSyncNode; + SRecvBuffer *pRecv = pNode->pRecv; + + if (pRecv == NULL) return -1; + int len = pHead->len + sizeof(SWalHead); + + if (pRecv->bufferSize - (pRecv->offset - pRecv->buffer) >= len) { + memcpy(pRecv->offset, pHead, len); + pRecv->offset += len; + pRecv->forwards++; + sDebug("%s, fwd is saved into queue, ver:%" PRIu64 " fwds:%d", pPeer->id, pHead->version, pRecv->forwards); + } else { + sError("%s, buffer size:%d is too small", pPeer->id, pRecv->bufferSize); + pRecv->code = -1; // set error code + } + + return pRecv->code; +} + +static void syncCloseRecvBuffer(SSyncNode *pNode) +{ + if (pNode->pRecv) { + tfree(pNode->pRecv->buffer); + } + + tfree(pNode->pRecv); +} + +static int syncOpenRecvBuffer(SSyncNode *pNode) +{ + syncCloseRecvBuffer(pNode); + + SRecvBuffer *pRecv = calloc(sizeof(SRecvBuffer), 1); + if (pRecv == NULL) return -1; + + pRecv->bufferSize = 5000000; + pRecv->buffer = malloc(pRecv->bufferSize); + if (pRecv->buffer == NULL) { + free(pRecv); + return -1; + } + + pRecv->offset = pRecv->buffer; + pRecv->forwards = 0; + + pNode->pRecv = pRecv; + + return 0; +} + +static int syncRestoreDataStepByStep(SSyncPeer *pPeer) +{ + SSyncNode *pNode = pPeer->pSyncNode; + nodeSStatus = TAOS_SYNC_STATUS_FILE; + uint64_t fversion = 0; + + sDebug("%s, start to restore file", pPeer->id); + int code = syncRestoreFile(pPeer, &fversion); + if (code < 0) { + sError("%s, failed to restore file", pPeer->id); + return -1; + } + + // if code > 0, data file is changed, notify app, and pass the version + if (code > 0 && pNode->notifyFileSynced) { + if ( (*pNode->notifyFileSynced)(pNode->ahandle, fversion) < 0 ) { + sError("%s, app not in ready state", pPeer->id); + return -1; + } + } + + nodeVersion = fversion; + + sDebug("%s, start to restore wal", pPeer->id); + if (syncRestoreWal(pPeer) < 0) { + sError("%s, failed to restore wal", pPeer->id); + return -1; + } + + nodeSStatus = TAOS_SYNC_STATUS_CACHE; + sDebug("%s, start to insert buffered points", pPeer->id); + if (syncProcessBufferedFwd(pPeer) < 0) { + sError("%s, failed to insert buffered points", pPeer->id); + return -1; + } + + return 0; +} + +void *syncRestoreData(void *param) +{ + SSyncPeer *pPeer = (SSyncPeer *)param; + SSyncNode *pNode = pPeer->pSyncNode; + + taosBlockSIGPIPE(); + __sync_fetch_and_add(&tsSyncNum, 1); + + if (syncOpenRecvBuffer(pNode) < 0) { + sError("%s, failed to allocate recv buffer", pPeer->id); + } else { + if ( syncRestoreDataStepByStep(pPeer) == 0) { + sInfo("%s, it is synced successfully", pPeer->id); + nodeRole = TAOS_SYNC_ROLE_SLAVE; + syncBroadcastStatus(pNode); + (*pNode->notifyRole)(pNode->ahandle, nodeRole); + } else { + sError("%s, failed to restore data, restart connection", pPeer->id); + nodeRole = TAOS_SYNC_ROLE_UNSYNCED; + syncRestartConnection(pPeer); + } + } + + nodeSStatus = TAOS_SYNC_STATUS_INIT; + tclose(pPeer->syncFd) + syncCloseRecvBuffer(pNode); + __sync_fetch_and_sub(&tsSyncNum, 1); + syncDecPeerRef(pPeer); + + return NULL; +} + diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c new file mode 100644 index 0000000000..c7f136ed9d --- /dev/null +++ b/src/sync/src/syncRetrieve.c @@ -0,0 +1,479 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include +#include "os.h" +#include "tlog.h" +#include "tutil.h" +#include "tglobal.h" +#include "ttimer.h" +#include "tsocket.h" +#include "twal.h" +#include "tsync.h" +#include "syncInt.h" + +static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) +{ + sDebug("%s, start to monitor:%s", pPeer->id, name); + + if (pPeer->notifyFd <=0) { + pPeer->watchNum = 0; + pPeer->notifyFd = inotify_init1(IN_NONBLOCK); + if (pPeer->notifyFd < 0) { + sError("%s, failed to init inotify(%s)", pPeer->id, strerror(errno)); + return -1; + } + + if (pPeer->watchFd == NULL) pPeer->watchFd = malloc(sizeof(int)*tsMaxWatchFiles); + if (pPeer->watchFd == NULL) { + sError("%s, failed to allocate watchFd", pPeer->id); + return -1; + } + + memset(pPeer->watchFd, -1, sizeof(int)*tsMaxWatchFiles); + } + + int *wd = pPeer->watchFd + pPeer->watchNum; + + if (*wd >= 0) { + if (inotify_rm_watch(pPeer->notifyFd, *wd) < 0) { + sError("%s, failed to remove wd:%d(%s)", pPeer->id, *wd, strerror(errno)); + return -1; + } + } + + *wd = inotify_add_watch(pPeer->notifyFd, name, IN_MODIFY); + if (*wd == -1) { + sError("%s, failed to add %s(%s)", pPeer->id, name, strerror(errno)); + return -1; + } + + pPeer->watchNum++; + pPeer->watchNum = (pPeer->watchNum +1) % tsMaxWatchFiles; + + return 0; +} + +static int syncAreFilesModified(SSyncPeer *pPeer) +{ + if (pPeer->notifyFd <=0) return 0; + + char buf[2048]; + int len = read(pPeer->notifyFd, buf, sizeof(buf)); + if (len <0 && errno != EAGAIN) { + sError("%s, failed to read notify FD(%s)", pPeer->id, strerror(errno)); + return -1; + } + + int code = 0; + if (len >0) { + sDebug("%s, processed file is changed", pPeer->id); + code = 1; + } + + return code; +} + +static int syncRetrieveFile(SSyncPeer *pPeer) +{ + SSyncNode *pNode = pPeer->pSyncNode; + SFileInfo fileInfo; + SFileAck fileAck; + int code = -1; + char name[TSDB_FILENAME_LEN * 2] = {0}; + + memset(&fileInfo, 0, sizeof(fileInfo)); + memset(&fileAck, 0, sizeof(fileAck)); + + while (1) { + // retrieve file info + fileInfo.name[0] = 0; + fileInfo.magic = (*pNode->getFileInfo)(pNode->ahandle, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX, &fileInfo.size, &fileInfo.fversion); + //fileInfo.size = htonl(size); + + // send the file info + int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(fileInfo)); + if (ret < 0 ) break; + + // if no file anymore, break + if (fileInfo.magic == 0 || fileInfo.name[0] == 0) { + sDebug("%s, no more files to sync", pPeer->id); + code = 0; break; + } + + // wait for the ack from peer + ret = taosReadMsg(pPeer->syncFd, &(fileAck), sizeof(fileAck)); + if (ret <0) break; + + // set the peer sync version + pPeer->sversion = fileInfo.fversion; + + // get the full path to file + snprintf(name, sizeof(name), "%s/%s", pNode->path, fileInfo.name); + + // add the file into watch list + if ( syncAddIntoWatchList(pPeer, name) <0) break; + + // if sync is not required, continue + if (fileAck.sync == 0) { + fileInfo.index++; + sDebug("%s, %s is the same", pPeer->id, fileInfo.name); + continue; + } + + // send the file to peer + int sfd = open(name, O_RDONLY); + if ( sfd < 0 ) break; + + ret = tsendfile(pPeer->syncFd, sfd, NULL, fileInfo.size); + close(sfd); + if (ret <0) break; + + sDebug("%s, %s is sent, size:%d", pPeer->id, name, fileInfo.size); + fileInfo.index++; + + // check if processed files are modified + if (syncAreFilesModified(pPeer) != 0) break; + } + + if (code < 0) { + sError("%s, failed to retrieve file(%s)", pPeer->id, strerror(errno)); + } + + return code; +} + +/* if only a partial record is read out, set the IN_MODIFY flag in event, + so upper layer will reload the file to get a complete record */ +static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent) +{ + int ret; + + ret = read(sfd, pHead, sizeof(SWalHead)); + if (ret < 0) return -1; + if (ret == 0) return 0; + + if (ret != sizeof(SWalHead)) { + // file is not at end yet, it shall be reloaded + *pEvent = *pEvent | IN_MODIFY; + return 0; + } + + ret = read(sfd, pHead->cont, pHead->len); + if (ret <0) return -1; + + if (ret != pHead->len) { + // file is not at end yet, it shall be reloaded + *pEvent = *pEvent | IN_MODIFY; + return 0; + } + + return sizeof(SWalHead) + pHead->len; +} + +static int syncMonitorLastWal(SSyncPeer *pPeer, char *name) +{ + pPeer->watchNum = 0; + tclose(pPeer->notifyFd); + pPeer->notifyFd = inotify_init1(IN_NONBLOCK); + if (pPeer->notifyFd < 0) { + sError("%s, failed to init inotify(%s)", pPeer->id, strerror(errno)); + return -1; + } + + if (pPeer->watchFd == NULL) pPeer->watchFd = malloc(sizeof(int)*tsMaxWatchFiles); + if (pPeer->watchFd == NULL) { + sError("%s, failed to allocate watchFd", pPeer->id); + return -1; + } + + memset(pPeer->watchFd, -1, sizeof(int)*tsMaxWatchFiles); + int *wd = pPeer->watchFd; + + *wd = inotify_add_watch(pPeer->notifyFd, name, IN_MODIFY | IN_CLOSE_WRITE); + if (*wd == -1) { + sError("%s, failed to watch last wal(%s)", pPeer->id, strerror(errno)); + return -1; + } + + return 0; +} + +static uint32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) +{ + char buf[2048]; + int len = read(pPeer->notifyFd, buf, sizeof(buf)); + if (len <0 && errno != EAGAIN) { + sError("%s, failed to read notify FD(%s)", pPeer->id, strerror(errno)); + return -1; + } + + if (len == 0) return 0; + + struct inotify_event *event; + for (char *ptr = buf; ptr < buf + len; ptr += sizeof(struct inotify_event) + event->len) { + event = (struct inotify_event *) ptr; + if (event->mask & IN_MODIFY) *pEvent = *pEvent | IN_MODIFY; + if (event->mask & IN_CLOSE_WRITE) *pEvent = *pEvent | IN_CLOSE_WRITE; + } + + if (pEvent != 0) + sDebug("%s, last wal event:0x%x", pPeer->id, *pEvent); + + return 0; +} + +static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset, uint32_t *pEvent) +{ + SWalHead *pHead = (SWalHead *) malloc(640000); + int code = -1; + int32_t bytes = 0; + int sfd; + + sfd = open(name, O_RDONLY); + if (sfd < 0) return -1; + lseek(sfd, offset, SEEK_SET); + sDebug("%s, retrieve last wal, offset:%" PRId64 " fversion:%" PRIu64, pPeer->id, offset, fversion); + + while (1) { + int wsize = syncReadOneWalRecord(sfd, pHead, pEvent); + if (wsize <0) break; + if (wsize == 0) { code = 0; break; } + + sDebug("%s, last wal is forwarded, ver:%" PRIu64, pPeer->id, pHead->version); + int ret = taosWriteMsg(pPeer->syncFd, pHead, wsize); + if ( ret != wsize ) break; + pPeer->sversion = pHead->version; + + bytes += wsize; + + if (pHead->version >= fversion && fversion > 0) { + code = 0; + bytes = 0; + break; + } + } + + free(pHead); + tclose(sfd); + + if (code == 0) return bytes; + return -1; +} + +static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) +{ + SSyncNode *pNode = pPeer->pSyncNode; + int code = -1; + char fname[TSDB_FILENAME_LEN * 2]; // full path to wal file + + if (syncAreFilesModified(pPeer) != 0) return -1; + + while (1) { + int32_t once = 0; // last WAL has once ever been processed + int64_t offset = 0; + uint64_t fversion = 0; + uint32_t event = 0; + + // get full path to wal file + snprintf(fname, sizeof(fname), "%s/%s", pNode->path, wname); + sDebug("%s, start to retrieve last wal:%s", pPeer->id, fname); + + // monitor last wal + if (syncMonitorLastWal(pPeer, fname) <0) break; + + while (1) { + int32_t bytes = syncRetrieveLastWal(pPeer, fname, fversion, offset, &event); + if (bytes < 0) break; + + // check file changes + if (syncCheckLastWalChanges(pPeer, &event) <0) break; + + // if file is not updated or updated once, set the fversion and sstatus + if (((event & IN_MODIFY) == 0) || once) { + if (fversion == 0) { + pPeer->sstatus = TAOS_SYNC_STATUS_CACHE; // start to forward pkt + fversion = nodeVersion; // must read data to fversion + } + } + + // if all data up to fversion is read out, it is over + if (pPeer->sversion >= fversion && fversion > 0) { + code = 0; + sDebug("%s, data up to fversion:%ld has been read out, bytes:%d", pPeer->id, fversion, bytes); + break; + } + + // if all data are read out, and no update + if ((bytes == 0) && ((event & IN_MODIFY) == 0)) { + // wal file is closed, break + if (event & IN_CLOSE_WRITE) { + code = 0; + sDebug("%s, current wal is closed", pPeer->id); + break; + } + + // wal not closed, it means some data not flushed to disk, wait for a while + usleep(10000); + } + + // if bytes>0, file is updated, or fversion is not reached but file still open, read again + once = 1; + offset += bytes; + sDebug("%s, retrieve last wal, bytes:%d", pPeer->id, bytes); + event = event & (~IN_MODIFY); // clear IN_MODIFY flag + } + + if (code < 0) break; + if (pPeer->sversion >= fversion && fversion > 0) break; + + index++; wname[0] = 0; + code = (*pNode->getWalInfo)(pNode->ahandle, wname, &index); + if ( code < 0) break; + if ( wname[0] == 0 ) {code = 0; break;} + + // current last wal is closed, there is a new one + sDebug("%s, last wal is closed, try new one", pPeer->id); + } + + tclose(pPeer->notifyFd); + + return code; +} + +static int syncRetrieveWal(SSyncPeer *pPeer) +{ + SSyncNode *pNode = pPeer->pSyncNode; + char fname[TSDB_FILENAME_LEN * 3]; + char wname[TSDB_FILENAME_LEN * 2]; + int32_t size; + struct stat fstat; + int code = -1; + uint32_t index = 0; + + while (1) { + // retrieve wal info + wname[0] = 0; + code = (*pNode->getWalInfo)(pNode->ahandle, wname, &index); + if (code < 0) break; // error + if (wname[0] == 0) { // no wal file + sDebug("%s, no wal file", pPeer->id); + break; + } + + if (code == 0) { // last wal + code = syncProcessLastWal(pPeer, wname, index); + break; + } + + // get the full path to wal file + snprintf(fname, sizeof(fname), "%s/%s", pNode->path, wname); + + // send wal file, + // inotify is not required, old wal file won't be modified, even remove is ok + if ( stat(fname, &fstat) < 0 ) break; + size = fstat.st_size; + + sDebug("%s, retrieve wal:%s size:%d", pPeer->id, fname, size); + int sfd = open(fname, O_RDONLY); + if (sfd < 0) break; + + code = tsendfile(pPeer->syncFd, sfd, NULL, size); + close(sfd); + if (code <0) break; + + index++; + + if (syncAreFilesModified(pPeer) != 0) break; + } + + if (code == 0) { + sDebug("%s, wal retrieve is finished", pPeer->id); + pPeer->sstatus = TAOS_SYNC_STATUS_CACHE; + SWalHead walHead; + memset(&walHead, 0, sizeof(walHead)); + code = taosWriteMsg(pPeer->syncFd, &walHead, sizeof(walHead)); + } else { + sError("%s, failed to send wal(%s)", pPeer->id, strerror(errno)); + } + + return code; +} + +static int syncRetrieveDataStepByStep(SSyncPeer *pPeer) +{ + SSyncNode *pNode = pPeer->pSyncNode; + + SFirstPkt firstPkt; + memset(&firstPkt, 0, sizeof(firstPkt)); + firstPkt.syncHead.type = TAOS_SMSG_SYNC_DATA; + firstPkt.syncHead.vgId = pNode->vgId; + tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn)); + firstPkt.port = tsSyncPort; + + if (write(pPeer->syncFd, (char *) &firstPkt, sizeof(firstPkt)) < 0) { + sError("%s, failed to send syncCmd", pPeer->id); + return -1; + } + + pPeer->sversion = 0; + pPeer->sstatus = TAOS_SYNC_STATUS_FILE; + sDebug("%s, start to retrieve file", pPeer->id); + if (syncRetrieveFile(pPeer) < 0) { + sError("%s, failed to retrieve file", pPeer->id); + return -1; + } + + // if no files are synced, there must be wal to sync, sversion must be larger than one + if (pPeer->sversion == 0) + pPeer->sversion = 1; + + sDebug("%s, start to retrieve wal", pPeer->id); + if (syncRetrieveWal(pPeer) < 0) { + sError("%s, failed to retrieve wal", pPeer->id); + return -1; + } + + return 0; +} + +void *syncRetrieveData(void *param) +{ + SSyncPeer *pPeer = (SSyncPeer *)param; + taosBlockSIGPIPE(); + + pPeer->syncFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0); + if (pPeer->syncFd < 0) { + sError("%s, failed to open socket to sync", pPeer->id); + } else { + sInfo("%s, sync tcp is setup", pPeer->id); + + if (syncRetrieveDataStepByStep(pPeer) == 0) { + sDebug("%s, sync retrieve process is successful", pPeer->id); + } else { + sError("%s, failed to retrieve data, restart connection", pPeer->id); + syncRestartConnection(pPeer); + } + } + + tclose(pPeer->notifyFd); + tclose(pPeer->syncFd); + syncDecPeerRef(pPeer); + + return NULL; +} diff --git a/src/sync/src/taosTcpPool.c b/src/sync/src/taosTcpPool.c new file mode 100644 index 0000000000..fa94caeed7 --- /dev/null +++ b/src/sync/src/taosTcpPool.c @@ -0,0 +1,325 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "os.h" +#include "tulog.h" +#include "tutil.h" +#include "tsocket.h" +#include "taoserror.h" +#include "taosTcpPool.h" + +typedef struct SThreadObj { + pthread_t thread; + bool stop; + int pollFd; + int numOfFds; + struct SPoolObj *pPool; +} SThreadObj; + +typedef struct SPoolObj { + SPoolInfo info; + SThreadObj **pThread; + pthread_t thread; + int nextId; + int acceptFd; // FD for accept new connection +} SPoolObj; + +typedef struct { + SThreadObj *pThread; + void *ahandle; + int fd; + int closedByApp; +} SConnObj; + +static void *taosAcceptPeerTcpConnection(void *argv); +static void *taosProcessTcpData(void *param); +static SThreadObj *taosGetTcpThread(SPoolObj *pPool); +static void taosStopPoolThread(SThreadObj* pThread); + +void *taosOpenTcpThreadPool(SPoolInfo *pInfo) +{ + pthread_attr_t thattr; + + SPoolObj *pPool = calloc(sizeof(SPoolObj), 1); + if (pPool == NULL) { + uError("TCP server, no enough memory"); + return NULL; + } + + pPool->info = *pInfo; + + pPool->pThread = (SThreadObj **) calloc(sizeof(SThreadObj *), pInfo->numOfThreads); + if (pPool->pThread == NULL) { + uError("TCP server, no enough memory"); + free(pPool); + return NULL; + } + + pPool->acceptFd = taosOpenTcpServerSocket(pInfo->serverIp, pInfo->port); + if (pPool->acceptFd < 0) { + free(pPool->pThread); free(pPool); + uError("failed to create TCP server socket, port:%d (%s)", pInfo->port, strerror(errno)); + return NULL; + } + + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + if (pthread_create(&(pPool->thread), &thattr, (void *) taosAcceptPeerTcpConnection, pPool) != 0) { + uError("TCP server, failed to create accept thread, reason:%s", strerror(errno)); + close(pPool->acceptFd); + free(pPool->pThread); free(pPool); + return NULL; + } + + pthread_attr_destroy(&thattr); + + uDebug("%p TCP pool is created", pPool); + return pPool; +} + +void taosCloseTcpThreadPool(void *param) +{ + SPoolObj *pPool = (SPoolObj *)param; + SThreadObj *pThread; + + shutdown(pPool->acceptFd, SHUT_RD); + pthread_join(pPool->thread, NULL); + + for (int i = 0; i < pPool->info.numOfThreads; ++i) { + pThread = pPool->pThread[i]; + if (pThread) taosStopPoolThread(pThread); + } + + tfree(pPool->pThread); + free(pPool); + uDebug("%p TCP pool is closed", pPool); +} + +void *taosAllocateTcpConn(void *param, void *pPeer, int connFd) +{ + struct epoll_event event; + SPoolObj *pPool = (SPoolObj *)param; + + SConnObj *pConn = (SConnObj *) calloc(sizeof(SConnObj), 1); + if (pConn == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + return NULL; + } + + SThreadObj *pThread = taosGetTcpThread(pPool); + if (pThread == NULL) { + free(pConn); + return NULL; + } + + pConn->fd = connFd; + pConn->pThread = pThread; + pConn->ahandle = pPeer; + pConn->closedByApp = 0; + + event.events = EPOLLIN | EPOLLRDHUP; + event.data.ptr = pConn; + + if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) { + uError("failed to add fd:%d(%s)", connFd, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + free(pConn); + pConn = NULL; + } else { + pThread->numOfFds++; + uDebug("%p fd:%d is added to epoll thread, num:%d", pThread, connFd, pThread->numOfFds); + } + + return pConn; +} + +void taosFreeTcpConn(void *param) +{ + SConnObj *pConn = (SConnObj *)param; + SThreadObj *pThread = pConn->pThread; + + uDebug("%p TCP connection will be closed, fd:%d", pThread, pConn->fd); + pConn->closedByApp = 1; + shutdown(pConn->fd, SHUT_WR); +} + +static void taosProcessBrokenLink(SConnObj *pConn) { + SThreadObj *pThread = pConn->pThread; + SPoolObj *pPool = pThread->pPool; + SPoolInfo *pInfo = &pPool->info; + + if (pConn->closedByApp == 0) shutdown(pConn->fd, SHUT_WR); + (*pInfo->processBrokenLink)(pConn->ahandle); + + pThread->numOfFds--; + epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pConn->fd, NULL); + uDebug("%p fd:%d is removed from epoll thread, num:%d", pThread, pConn->fd, pThread->numOfFds); + tclose(pConn->fd); + free(pConn); +} + +#define maxEvents 10 + +static void *taosProcessTcpData(void *param) { + SThreadObj *pThread = (SThreadObj *) param; + SPoolObj *pPool = pThread->pPool; + SPoolInfo *pInfo = &pPool->info; + SConnObj *pConn = NULL; + struct epoll_event events[maxEvents]; + + void *buffer = malloc(pInfo->bufferSize); + taosBlockSIGPIPE(); + + while (1) { + if (pThread->stop) break; + int fdNum = epoll_wait(pThread->pollFd, events, maxEvents, -1); + if (pThread->stop) { + uDebug("%p TCP epoll thread is exiting...", pThread); + break; + } + + if (fdNum < 0) { + uError("epoll_wait failed (%s)", strerror(errno)); + continue; + } + + for (int i = 0; i < fdNum; ++i) { + pConn = events[i].data.ptr; + assert(pConn); + + if (events[i].events & EPOLLERR) { + taosProcessBrokenLink(pConn); + continue; + } + + if (events[i].events & EPOLLHUP) { + taosProcessBrokenLink(pConn); + continue; + } + + if (events[i].events & EPOLLRDHUP) { + taosProcessBrokenLink(pConn); + continue; + } + + if (pConn->closedByApp == 0) { + if ((*pInfo->processIncomingMsg)(pConn->ahandle, buffer) < 0) { + taosFreeTcpConn(pConn); + continue; + } + } + } + } + + close(pThread->pollFd); + free(pThread); + free(buffer); + uDebug("%p TCP epoll thread exits", pThread); + return NULL; +} + +static void *taosAcceptPeerTcpConnection(void *argv) { + SPoolObj *pPool = (SPoolObj *)argv; + SPoolInfo *pInfo = &pPool->info; + + taosBlockSIGPIPE(); + + while (1) { + struct sockaddr_in clientAddr; + socklen_t addrlen = sizeof(clientAddr); + int connFd = accept(pPool->acceptFd, (struct sockaddr *) &clientAddr, &addrlen); + if (connFd < 0) { + if (errno == EINVAL) { + uDebug("%p TCP server accept is exiting...", pPool); + break; + } else { + uError("TCP accept failure, reason:%s", strerror(errno)); + continue; + } + } + + //uDebug("TCP connection from: 0x%x:%d", clientAddr.sin_addr.s_addr, clientAddr.sin_port); + taosKeepTcpAlive(connFd); + (*pInfo->processIncomingConn)(connFd, clientAddr.sin_addr.s_addr); + } + + tclose(pPool->acceptFd); + return NULL; +} + +static SThreadObj *taosGetTcpThread(SPoolObj *pPool) { + SThreadObj *pThread = pPool->pThread[pPool->nextId]; + + if (pThread) return pThread; + + pThread = (SThreadObj *) calloc(1, sizeof(SThreadObj)); + if (pThread == NULL) return NULL; + + pThread->pPool = pPool; + pThread->pollFd = epoll_create(10); // size does not matter + if (pThread->pollFd < 0) { + free(pThread); + return NULL; + } + + pthread_attr_t thattr; + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + int ret = pthread_create(&(pThread->thread), &thattr, (void *) taosProcessTcpData, pThread); + pthread_attr_destroy(&thattr); + + if (ret != 0) { + close(pThread->pollFd); + free(pThread); + return NULL; + } + + uDebug("%p TCP epoll thread is created", pThread); + pPool->pThread[pPool->nextId] = pThread; + pPool->nextId++; + pPool->nextId = pPool->nextId % pPool->info.numOfThreads; + + return pThread; +} + +static void taosStopPoolThread(SThreadObj* pThread) { + pThread->stop = true; + + if (pThread->thread == pthread_self()) { + pthread_detach(pthread_self()); + return; + } + + // save thread ID into a local variable, since pThread is freed when the thread exits + pthread_t thread = pThread->thread; + + // signal the thread to stop, try graceful method first, + // and use pthread_cancel when failed + struct epoll_event event = { .events = EPOLLIN }; + eventfd_t fd = eventfd(1, 0); + if (fd == -1) { + // failed to create eventfd, call pthread_cancel instead, which may result in data corruption + uError("failed to create eventfd(%s)", strerror(errno)); + pthread_cancel(pThread->thread); + } else if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) { + // failed to call epoll_ctl, call pthread_cancel instead, which may result in data corruption + uError("failed to call epoll_ctl(%s)", strerror(errno)); + pthread_cancel(pThread->thread); + } + + pthread_join(thread, NULL); + tclose(fd); +} + diff --git a/src/sync/src/tarbitrator.c b/src/sync/src/tarbitrator.c new file mode 100644 index 0000000000..c308c2a454 --- /dev/null +++ b/src/sync/src/tarbitrator.c @@ -0,0 +1,191 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +//#define _DEFAULT_SOURCE +#include "os.h" +#include "hash.h" +#include "tlog.h" +#include "tutil.h" +#include "ttimer.h" +#include "ttime.h" +#include "tsocket.h" +#include "tglobal.h" +#include "taoserror.h" +#include "taosTcpPool.h" +#include "twal.h" +#include "tsync.h" +#include "syncInt.h" + +static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context); +static void arbProcessIncommingConnection(int connFd, uint32_t sourceIp); +static void arbProcessBrokenLink(void *param); +static int arbProcessPeerMsg(void *param, void *buffer); +static sem_t tsArbSem; +static ttpool_h tsArbTcpPool; + +typedef struct { + char id[TSDB_EP_LEN+24]; + int nodeFd; + void *pConn; +} SNodeConn; + +int main(int argc, char *argv[]) { + char arbLogPath[TSDB_FILENAME_LEN + 16] = {0}; + + for (int i=1; i TSDB_FILENAME_LEN) continue; + tstrncpy(arbLogPath, argv[i], sizeof(arbLogPath)); + } else { + printf("\nusage: %s [options] \n", argv[0]); + printf(" [-p port]: server port number, default is:%d\n", tsServerPort); + printf(" [-d debugFlag]: debug flag, default:%d\n", debugFlag); + printf(" [-g logFilePath]: log file pathe, default:%s\n", arbLogPath); + printf(" [-h help]: print out this help\n\n"); + exit(0); + } + } + + if (sem_init(&tsArbSem, 0, 0) != 0) { + printf("failed to create exit semphore\n"); + exit(EXIT_FAILURE); + } + + /* Set termination handler. */ + struct sigaction act = {{0}}; + act.sa_flags = SA_SIGINFO; + act.sa_sigaction = arbSignalHandler; + sigaction(SIGTERM, &act, NULL); + sigaction(SIGHUP, &act, NULL); + sigaction(SIGINT, &act, NULL); + + tsAsyncLog = 0; + strcat(arbLogPath, "/arbitrator.log"); + taosInitLog(arbLogPath, 1000000, 10); + + taosGetFqdn(tsNodeFqdn); + tsSyncPort = tsServerPort + TSDB_PORT_SYNC; + + SPoolInfo info; + info.numOfThreads = 1; + info.serverIp = 0; + info.port = tsSyncPort; + info.bufferSize = 640000; + info.processBrokenLink = arbProcessBrokenLink; + info.processIncomingMsg = arbProcessPeerMsg; + info.processIncomingConn = arbProcessIncommingConnection; + tsArbTcpPool = taosOpenTcpThreadPool(&info); + + if (tsArbTcpPool == NULL) { + sDebug("failed to open TCP thread pool, exit..."); + return -1; + } + + sInfo("TAOS arbitrator: %s:%d is running", tsNodeFqdn, tsServerPort); + + for (int res = sem_wait(&tsArbSem); res != 0; res = sem_wait(&tsArbSem)) { + if (res != EINTR) break; + } + + taosCloseTcpThreadPool(tsArbTcpPool); + sInfo("TAOS arbitrator is shut down\n"); + closelog(); + + return 0; +} + +static void arbProcessIncommingConnection(int connFd, uint32_t sourceIp) +{ + char ipstr[24]; + tinet_ntoa(ipstr, sourceIp); + sDebug("peer TCP connection from ip:%s", ipstr); + + SFirstPkt firstPkt; + if (taosReadMsg(connFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) { + sError("failed to read peer first pkt from ip:%s(%s)", ipstr, strerror(errno)); + taosCloseSocket(connFd); + return; + } + + SNodeConn *pNode = (SNodeConn *) calloc(sizeof(SNodeConn), 1); + if (pNode == NULL) { + sError("failed to allocate memory(%s)", strerror(errno)); + taosCloseSocket(connFd); + return; + } + + snprintf(pNode->id, sizeof(pNode->id), "vgId:%d peer:%s:%d", firstPkt.sourceId, firstPkt.fqdn, firstPkt.port); + if (firstPkt.syncHead.vgId) { + sDebug("%s, vgId in head is not zero, close the connection", pNode->id); + tfree(pNode); + taosCloseSocket(connFd); + return; + } + + sDebug("%s, arbitrator request is accepted", pNode->id); + pNode->nodeFd = connFd; + pNode->pConn = taosAllocateTcpConn(tsArbTcpPool, pNode, connFd); + + return; +} + +static void arbProcessBrokenLink(void *param) { + SNodeConn *pNode = param; + + sDebug("%s, TCP link is broken(%s), close connection", pNode->id, strerror(errno)); + tfree(pNode); +} + +static int arbProcessPeerMsg(void *param, void *buffer) +{ + SNodeConn *pNode = param; + SSyncHead head; + int bytes = 0; + char *cont = (char *)buffer; + + int hlen = taosReadMsg(pNode->nodeFd, &head, sizeof(head)); + if (hlen != sizeof(head)) { + sDebug("%s, failed to read msg, hlen:%d", pNode->id, hlen); + return -1; + } + + bytes = taosReadMsg(pNode->nodeFd, cont, head.len); + if (bytes != head.len) { + sDebug("%s, failed to read, bytes:%d len:%d", pNode->id, bytes, head.len); + return -1; + } + + sDebug("%s, msg is received, len:%d", pNode->id, head.len); + return 0; +} + +static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context) { + + struct sigaction act = {{0}}; + act.sa_handler = SIG_IGN; + sigaction(SIGTERM, &act, NULL); + sigaction(SIGHUP, &act, NULL); + sigaction(SIGINT, &act, NULL); + + sInfo("shut down signal is %d, sender PID:%d", signum, sigInfo->si_pid); + + // inform main thread to exit + sem_post(&tsArbSem); +} + diff --git a/src/sync/test/CMakeLists.txt b/src/sync/test/CMakeLists.txt new file mode 100644 index 0000000000..fd8bdd1668 --- /dev/null +++ b/src/sync/test/CMakeLists.txt @@ -0,0 +1,20 @@ +CMAKE_MINIMUM_REQUIRED(VERSION 2.8) +PROJECT(TDengine) + +IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) + INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc) + INCLUDE_DIRECTORIES(../inc) + + LIST(APPEND CLIENT_SRC ./syncClient.c) + ADD_EXECUTABLE(syncClient ${CLIENT_SRC}) + TARGET_LINK_LIBRARIES(syncClient sync trpc common) + + LIST(APPEND SERVER_SRC ./syncServer.c) + ADD_EXECUTABLE(syncServer ${SERVER_SRC}) + TARGET_LINK_LIBRARIES(syncServer sync trpc common) +ENDIF () + + diff --git a/src/sync/test/syncClient.c b/src/sync/test/syncClient.c new file mode 100644 index 0000000000..cd873b758b --- /dev/null +++ b/src/sync/test/syncClient.c @@ -0,0 +1,194 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "os.h" +#include "tglobal.h" +#include "tulog.h" +#include "trpc.h" +#include "taoserror.h" + +typedef struct { + int index; + SRpcEpSet epSet; + int num; + int numOfReqs; + int msgSize; + sem_t rspSem; + sem_t *pOverSem; + pthread_t thread; + void *pRpc; +} SInfo; + +void processResponse(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { + SInfo *pInfo = (SInfo *)pMsg->ahandle; + uDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, pMsg->code); + + if (pEpSet) pInfo->epSet = *pEpSet; + rpcFreeCont(pMsg->pCont); + + sem_post(&pInfo->rspSem); +} + +int tcount = 0; + +void *sendRequest(void *param) { + SInfo *pInfo = (SInfo *)param; + SRpcMsg rpcMsg = {0}; + + uDebug("thread:%d, start to send request", pInfo->index); + + while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) { + pInfo->num++; + rpcMsg.pCont = rpcMallocCont(pInfo->msgSize); + rpcMsg.contLen = pInfo->msgSize; + rpcMsg.ahandle = pInfo; + rpcMsg.msgType = 1; + uDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); + rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg); + if ( pInfo->num % 20000 == 0 ) + uInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); + sem_wait(&pInfo->rspSem); + } + + uDebug("thread:%d, it is over", pInfo->index); + tcount++; + + return NULL; +} + +int main(int argc, char *argv[]) { + SRpcInit rpcInit; + SRpcEpSet epSet; + char secret[TSDB_KEY_LEN] = "mypassword"; + int msgSize = 128; + int numOfReqs = 0; + int appThreads = 1; + char serverIp[40] = "127.0.0.1"; + struct timeval systemTime; + int64_t startTime, endTime; + pthread_attr_t thattr; + + // server info + epSet.numOfEps = 1; + epSet.inUse = 0; + epSet.port[0] = 7000; + epSet.port[1] = 7000; + strcpy(epSet.fqdn[0], serverIp); + strcpy(epSet.fqdn[1], "192.168.0.1"); + + // client info + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.localPort = 0; + rpcInit.label = "APP"; + rpcInit.numOfThreads = 1; + rpcInit.cfp = processResponse; + rpcInit.sessions = 100; + rpcInit.idleTime = tsShellActivityTimer*1000; + rpcInit.user = "michael"; + rpcInit.secret = secret; + rpcInit.ckey = "key"; + rpcInit.spi = 1; + rpcInit.connType = TAOS_CONN_CLIENT; + + for (int i=1; iindex = i; + pInfo->epSet = epSet; + pInfo->numOfReqs = numOfReqs; + pInfo->msgSize = msgSize; + sem_init(&pInfo->rspSem, 0, 0); + pInfo->pRpc = pRpc; + pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo); + pInfo++; + } + + do { + usleep(1); + } while ( tcount < appThreads); + + gettimeofday(&systemTime, NULL); + endTime = systemTime.tv_sec*1000000 + systemTime.tv_usec; + float usedTime = (endTime - startTime)/1000.0; // mseconds + + uInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs*appThreads); + uInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0*numOfReqs*appThreads/usedTime, msgSize); + + taosCloseLog(); + + return 0; +} + + diff --git a/src/sync/test/syncServer.c b/src/sync/test/syncServer.c new file mode 100644 index 0000000000..44019ad96e --- /dev/null +++ b/src/sync/test/syncServer.c @@ -0,0 +1,487 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +//#define _DEFAULT_SOURCE +#include +#include "os.h" +#include "tulog.h" +#include "tglobal.h" +#include "tsocket.h" +#include "trpc.h" +#include "tqueue.h" +#include "twal.h" +#include "tsync.h" + +int msgSize = 128; +int commit = 0; +int dataFd = -1; +void *qhandle = NULL; +int walNum = 0; +uint64_t tversion = 0; +void *syncHandle; +int role; +int nodeId; +char path[256]; +int numOfWrites ; +SSyncInfo syncInfo; +SSyncCfg *pCfg; + +int writeIntoWal(SWalHead *pHead) +{ + if (dataFd < 0) { + char walName[280]; + snprintf(walName, sizeof(walName), "%s/wal/wal.%d", path, walNum); + remove(walName); + dataFd = open(walName, O_CREAT | O_WRONLY, S_IRWXU | S_IRWXG | S_IRWXO); + if (dataFd < 0) { + uInfo("failed to open wal file:%s(%s)", walName, strerror(errno)); + return -1; + } else { + walNum++; + uInfo("file:%s is opened to write, walNum:%d", walName, walNum); + } + } + + if (write(dataFd, pHead, sizeof(SWalHead) + pHead->len) < 0) { + uError("ver:%" PRIu64 ", failed to write wal file(%s)", pHead->version, strerror(errno)); + } else { + uDebug("ver:%" PRIu64 ", written to wal", pHead->version); + } + + numOfWrites++; + if (numOfWrites >= 10000) { + uInfo("%d request have been written into disk", numOfWrites); + close(dataFd); + dataFd = -1; + numOfWrites = 0; + } + + return 0; +} + +void confirmForward(void *ahandle, void *mhandle, int32_t code) +{ + SRpcMsg *pMsg = (SRpcMsg *)mhandle; + SWalHead *pHead = (SWalHead *)(((char *)pMsg->pCont) - sizeof(SWalHead)); + + uDebug("ver:%" PRIu64 ", confirm is received", pHead->version); + + rpcFreeCont(pMsg->pCont); + + SRpcMsg rpcMsg; + rpcMsg.pCont = rpcMallocCont(msgSize); + rpcMsg.contLen = msgSize; + rpcMsg.handle = pMsg->handle; + rpcMsg.code = code; + rpcSendResponse(&rpcMsg); + + taosFreeQitem(mhandle); +} + +int processRpcMsg(void *item) { + SRpcMsg *pMsg = (SRpcMsg *)item; + SWalHead *pHead = (SWalHead *)(((char *)pMsg->pCont) - sizeof(SWalHead)); + int code = -1; + + if (role != TAOS_SYNC_ROLE_MASTER) { + uError("not master, write failed, role:%s", syncRole[role]); + } else { + + pHead->version = ++tversion; + pHead->msgType = pMsg->msgType; + pHead->len = pMsg->contLen; + + uDebug("ver:%" PRIu64 ", pkt from client processed", pHead->version); + writeIntoWal(pHead); + syncForwardToPeer(syncHandle, pHead, item, TAOS_QTYPE_RPC); + + code = 0; + } + + if (pCfg->quorum <= 1) { + taosFreeQitem(item); + rpcFreeCont(pMsg->pCont); + + SRpcMsg rpcMsg; + rpcMsg.pCont = rpcMallocCont(msgSize); + rpcMsg.contLen = msgSize; + rpcMsg.handle = pMsg->handle; + rpcMsg.code = code; + rpcSendResponse(&rpcMsg); + } + + return code; +} + +int processFwdMsg(void *item) { + + SWalHead *pHead = (SWalHead *)item; + + if (pHead->version <= tversion) { + uError("ver:%" PRIu64 ", forward is even lower than local:%" PRIu64, pHead->version, tversion); + return -1; + } + + uDebug("ver:%" PRIu64 ", forward from peer is received", pHead->version); + writeIntoWal(pHead); + tversion = pHead->version; + + if (pCfg->quorum > 1) syncConfirmForward(syncHandle, pHead->version, 0); + + // write into cache + +/* + if (pHead->handle) { + syncSendFwdAck(syncHandle, pHead->handle, 0); + } +*/ + + taosFreeQitem(item); + + return 0; +} + +int processWalMsg(void *item) { + + SWalHead *pHead = (SWalHead *)item; + + if (pHead->version <= tversion) { + uError("ver:%" PRIu64 ", wal is even lower than local:%" PRIu64, pHead->version, tversion); + return -1; + }; + + uDebug("ver:%" PRIu64 ", wal from peer is received", pHead->version); + writeIntoWal(pHead); + tversion = pHead->version; + + // write into cache + +/* + if (pHead->handle) { + syncSendFwdAck(syncHandle, pHead->handle, 0); + } +*/ + + taosFreeQitem(item); + + return 0; +} + +void *processWriteQueue(void *param) { + int type; + void *item; + + while (1) { + int ret = taosReadQitem(qhandle, &type, &item); + if (ret <= 0) { + usleep(1000); + continue; + } + + if (type == TAOS_QTYPE_RPC) { + processRpcMsg(item); + } else if (type == TAOS_QTYPE_WAL) { + processWalMsg(item); + } else if (type == TAOS_QTYPE_FWD) { + processFwdMsg(item); + } + + } + + return NULL; +} + +int retrieveAuthInfo(char *meterId, char *spi, char *encrypt, char *secret, char *ckey) { + // app shall retrieve the auth info based on meterID from DB or a data file + // demo code here only for simple demo + int ret = 0; + + if (strcmp(meterId, "michael") == 0) { + *spi = 1; + *encrypt = 0; + strcpy(secret, "mypassword"); + strcpy(ckey, "key"); + } else if (strcmp(meterId, "jeff") == 0) { + *spi = 0; + *encrypt = 0; + } else { + ret = -1; // user not there + } + + return ret; +} + +void processRequestMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { + + SRpcMsg *pTemp; + + pTemp = taosAllocateQitem(sizeof(SRpcMsg)); + memcpy(pTemp, pMsg, sizeof(SRpcMsg)); + + uDebug("request is received, type:%d, len:%d", pMsg->msgType, pMsg->contLen); + taosWriteQitem(qhandle, TAOS_QTYPE_RPC, pTemp); +} + +uint32_t getFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion) +{ + uint32_t magic; + struct stat fstat; + char aname[280]; + + if (*index == 2) { + uInfo("wait for a while ....."); + sleep(3); + } + + if (name[0] == 0) { + // find the file + snprintf(aname, sizeof(aname), "%s/data/data.%d", path, *index); + sprintf(name, "data/data.%d", *index); + } else { + snprintf(aname, sizeof(aname), "%s/%s", path, name); + } + + uInfo("get file info:%s", aname); + if ( stat(aname, &fstat) < 0 ) return 0; + + *size = fstat.st_size; + magic = fstat.st_size; + + return magic; +} + +int getWalInfo(void *ahandle, char *name, uint32_t *index) { + + struct stat fstat; + char aname[280]; + + name[0] = 0; + if (*index + 1> walNum) return 0; + + snprintf(aname, sizeof(aname), "%s/wal/wal.%d", path, *index); + sprintf(name, "wal/wal.%d", *index); + uInfo("get wal info:%s", aname); + + if ( stat(aname, &fstat) < 0 ) return -1; + + if (*index >= walNum-1) return 0; // no more + + return 1; + +} + +int writeToCache(void *ahandle, void *data, int type) { + SWalHead *pHead = data; + + uDebug("pkt from peer is received, ver:%" PRIu64 " len:%d type:%d", pHead->version, pHead->len, type); + + int msgSize = pHead->len + sizeof(SWalHead); + void *pMsg = taosAllocateQitem(msgSize); + memcpy(pMsg, pHead, msgSize); + taosWriteQitem(qhandle, type, pMsg); + + return 0; +} + +void confirmFwd(void *ahandle, int64_t version) { + + return; +} + +void notifyRole(void *ahandle, int8_t r) { + role = r; + printf("current role:%s\n", syncRole[role]); +} + + +void initSync() { + + pCfg->replica = 1; + pCfg->quorum = 1; + syncInfo.vgId = 1; + syncInfo.ahandle = &syncInfo; + syncInfo.getFileInfo = getFileInfo; + syncInfo.getWalInfo = getWalInfo; + syncInfo.writeToCache = writeToCache; + syncInfo.confirmForward = confirmForward; + syncInfo.notifyRole = notifyRole; + + pCfg->nodeInfo[0].nodeId = 1; + pCfg->nodeInfo[0].nodePort = 7010; + taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); + + pCfg->nodeInfo[1].nodeId = 2; + pCfg->nodeInfo[1].nodePort = 7110; + taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn); + + pCfg->nodeInfo[2].nodeId = 3; + pCfg->nodeInfo[2].nodePort = 7210; + taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn); + + pCfg->nodeInfo[3].nodeId = 4; + pCfg->nodeInfo[3].nodePort = 7310; + taosGetFqdn(pCfg->nodeInfo[3].nodeFqdn); + + pCfg->nodeInfo[4].nodeId = 5; + pCfg->nodeInfo[4].nodePort = 7410; + taosGetFqdn(pCfg->nodeInfo[4].nodeFqdn); +} + +void doSync() +{ + for (int i=0; i<5; ++i) { + if (tsSyncPort == pCfg->nodeInfo[i].nodePort) + nodeId = pCfg->nodeInfo[i].nodeId; + } + + snprintf(path, sizeof(path), "/root/test/d%d", nodeId); + strcpy(syncInfo.path, path); + + if ( syncHandle == NULL) { + syncHandle = syncStart(&syncInfo); + } else { + if (syncReconfig(syncHandle, pCfg) < 0) syncHandle = NULL; + } + + uInfo("nodeId:%d path:%s syncPort:%d", nodeId, path, tsSyncPort); +} + +int main(int argc, char *argv[]) { + SRpcInit rpcInit; + char dataName[20] = "server.data"; + pCfg = &syncInfo.syncCfg; + + initSync(); + + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.localPort = 7000; + rpcInit.label = "SER"; + rpcInit.numOfThreads = 1; + rpcInit.cfp = processRequestMsg; + rpcInit.sessions = 1000; + rpcInit.idleTime = tsShellActivityTimer*1500; + rpcInit.afp = retrieveAuthInfo; + + for (int i=1; ireplica = atoi(argv[++i]); + } else if (strcmp(argv[i], "-q")==0 && i < argc-1) { + pCfg->quorum = atoi(argv[++i]); + } else if (strcmp(argv[i], "-d")==0 && i < argc-1) { + rpcDebugFlag = atoi(argv[++i]); + } else { + printf("\nusage: %s [options] \n", argv[0]); + printf(" [-p port]: server port number, default is:%d\n", rpcInit.localPort); + printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads); + printf(" [-s sessions]: number of sessions, default is:%d\n", rpcInit.sessions); + printf(" [-m msgSize]: message body size, default is:%d\n", msgSize); + printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize); + printf(" [-w write]: write received data to file(0, 1, 2), default is:%d\n", commit); + printf(" [-v version]: initial node version, default is:%ld\n", syncInfo.version); + printf(" [-r replica]: replicacation number, default is:%d\n", pCfg->replica); + printf(" [-q quorum]: quorum, default is:%d\n", pCfg->quorum); + printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag); + printf(" [-h help]: print out this help\n\n"); + exit(0); + } + } + + uDebugFlag = rpcDebugFlag; + dDebugFlag = rpcDebugFlag; + //tmrDebugFlag = rpcDebugFlag; + tsAsyncLog = 0; + taosInitLog("server.log", 1000000, 10); + + rpcInit.connType = TAOS_CONN_SERVER; + void *pRpc = rpcOpen(&rpcInit); + if (pRpc == NULL) { + uError("failed to start RPC server"); + return -1; + } + + tsSyncPort = rpcInit.localPort + 10; + qhandle = taosOpenQueue(); + + doSync(); + + pthread_attr_t thattr; + pthread_t thread; + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + if (pthread_create(&thread, &thattr, processWriteQueue, NULL) != 0) { + uError("failed to create thread, reason:%s", strerror(errno)); + return -1; + } + + printf("server is running, localPort:%d\n", rpcInit.localPort); + SNodesRole nroles; + + while (1) { + char c = getchar(); + + switch(c) { + case '1': + pCfg->replica = 1; doSync(); + break; + case '2': + pCfg->replica = 2; doSync(); + break; + case '3': + pCfg->replica = 3; doSync(); + break; + case '4': + pCfg->replica = 4; doSync(); + break; + case '5': + pCfg->replica = 5; doSync(); + break; + case 's': + syncGetNodesRole(syncHandle, &nroles); + for (int i=0; ireplica; ++i) + printf("=== nodeId:%d role:%s\n", nroles.nodeId[i], syncRole[nroles.role[i]]); + break; + default: + break; + } + + if (c=='q') break; + + } + + syncStop(syncHandle); + + if (dataFd >= 0) { + close(dataFd); + remove(dataName); + } + + return 0; +} + + -- GitLab