提交 fca35ceb 编写于 作者: L lichuang

[TD-10645][raft]<feature>add sync rpc client and server

上级 0214eda6
......@@ -4,6 +4,7 @@ add_library(sync ${SYNC_SRC})
PUBLIC common
PUBLIC transport
......@@ -39,12 +39,17 @@ struct SSyncNode {
typedef struct SSyncManager {
pthread_mutex_t mutex;
// sync server rpc
void* serverRpc;
// rpc server hash table base on FQDN:port key
SHashObj* rpcServerTable;
// sync client rpc
void* clientRpc;
// worker threads
SSyncWorker worker[TAOS_SYNC_MAX_WORKER];
// sync net worker
SSyncWorker netWorker;
// vgroup hash table
SHashObj* vgroupTable;
......@@ -14,12 +14,20 @@
#include "syncInt.h"
#include "trpc.h"
#include "ttimer.h"
SSyncManager* gSyncManager = NULL;
#define SYNC_TICK_TIMER 50
static void syncProcessRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet);
static void syncProcessReqMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet);
static int syncInitRpcServer(SSyncManager* syncManager, const SSyncCluster* pSyncCfg);
static int syncInitRpcClient(SSyncManager* syncManager);
static int syncOpenWorkerPool(SSyncManager* syncManager);
static int syncCloseWorkerPool(SSyncManager* syncManager);
static void *syncWorkerMain(void *argv);
......@@ -30,7 +38,7 @@ int32_t syncInit() {
return 0;
gSyncManager = (SSyncManager*)malloc(sizeof(SSyncManager));
gSyncManager = (SSyncManager*)calloc(sizeof(SSyncManager), 0);
if (gSyncManager == NULL) {
syncError("malloc SSyncManager fail");
return -1;
......@@ -38,6 +46,12 @@ int32_t syncInit() {
pthread_mutex_init(&gSyncManager->mutex, NULL);
// init client rpc
if (syncInitRpcClient(gSyncManager) != 0) {
return -1;
// init sync timer manager
gSyncManager->syncTimerManager = taosTmrInit(1000, 50, 10000, "SYNC");
if (gSyncManager->syncTimerManager == NULL) {
......@@ -68,7 +82,13 @@ void syncCleanUp() {
if (gSyncManager->vgroupTable) {
if (gSyncManager->clientRpc) {
syncInfo("sync inter-sync rpc client is closed");
if (gSyncManager->syncTimerManager) {
......@@ -86,6 +106,12 @@ SSyncNode* syncStart(const SSyncInfo* pInfo) {
return *ppNode;
// init rpc server
if (syncInitRpcServer(gSyncManager, &pInfo->syncCfg) != 0) {
return NULL;
SSyncNode *pNode = (SSyncNode*)malloc(sizeof(SSyncNode));
if (pNode == NULL) {
syncError("malloc vgroup %d node fail", pInfo->vgId);
......@@ -141,6 +167,82 @@ int32_t syncPropose(SSyncNode* syncNode, const SSyncBuffer* pBuf, void* pData, b
void syncReconfig(const SSyncNode* pNode, const SSyncCluster* pCfg) {}
// process rpc rsp message from other sync server
static void syncProcessRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
// process rpc message from other sync server
static void syncProcessReqMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
static int syncInitRpcServer(SSyncManager* syncManager, const SSyncCluster* pSyncCfg) {
if (gSyncManager->rpcServerTable == NULL) {
gSyncManager->rpcServerTable = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (gSyncManager->rpcServerTable == NULL) {
syncError("init sync rpc server hash table error");
return -1;
assert(pSyncCfg->selfIndex < pSyncCfg->replica && pSyncCfg->selfIndex >= 0);
const SNodeInfo* pNode = &(pSyncCfg->nodeInfo[pSyncCfg->replica]);
char buffer[20] = {'\0'};
snprintf(buffer, sizeof(buffer), "%s:%d", &(pNode->nodeFqdn[0]), pNode->nodePort);
size_t len = strlen(buffer);
void** ppRpcServer = taosHashGet(gSyncManager->rpcServerTable, buffer, len);
if (ppRpcServer != NULL) {
// already inited
syncInfo("sync rpc server for %s already exist", buffer);
return 0;
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = pNode->nodePort;
rpcInit.label = "sync-server";
rpcInit.numOfThreads = SYNC_SERVER_WORKER;
rpcInit.cfp = syncProcessReqMsg;
rpcInit.sessions = TSDB_MAX_VNODES << 4;
rpcInit.connType = TAOS_CONN_SERVER;
rpcInit.idleTime = SYNC_ACTIVITY_TIMER * 1000;
void* rpcServer = rpcOpen(&rpcInit);
if (rpcServer == NULL) {
syncInfo("rpcOpen for sync rpc server for %s fail", buffer);
return -1;
taosHashPut(gSyncManager->rpcServerTable, buffer, strlen(buffer), rpcServer, len);
syncInfo("sync rpc server for %s init success", buffer);
return 0;
static int syncInitRpcClient(SSyncManager* syncManager) {
char secret[TSDB_KEY_LEN] = "secret";
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.label = "sync-client";
rpcInit.numOfThreads = 1;
rpcInit.cfp = syncProcessRsp;
rpcInit.sessions = TSDB_MAX_VNODES << 4;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = SYNC_ACTIVITY_TIMER * 1000;
rpcInit.user = "t";
rpcInit.ckey = "key";
rpcInit.secret = secret;
syncManager->clientRpc = rpcOpen(&rpcInit);
if (syncManager->clientRpc == NULL) {
syncError("failed to init sync rpc client");
return -1;
syncInfo("sync inter-sync rpc client is initialized");
return 0;
static int syncOpenWorkerPool(SSyncManager* syncManager) {
int i;
pthread_attr_t thattr;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册