提交 5a4fbcf2 编写于 作者: M Minghao Li

sync io

上级 ee43a70c
......@@ -30,35 +30,37 @@ extern "C" {
#include "trpc.h"
typedef struct SSyncIO {
void * serverRpc;
void * clientRpc;
STaosQueue *pMsgQ;
STaosQset * pQset;
pthread_t tid;
int8_t isStart;
STaosQset *pQset;
pthread_t consumerTid;
SEpSet epSet;
void *serverRpc;
void *clientRpc;
SEpSet myAddr;
void *syncTimer;
void *syncTimerManager;
int32_t (*start)(struct SSyncIO *ths);
int32_t (*stop)(struct SSyncIO *ths);
int32_t (*ping)(struct SSyncIO *ths);
int32_t (*onMsg)(struct SSyncIO *ths, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t (*destroy)(struct SSyncIO *ths);
void *ioTimerTickQ;
void *ioTimerTickPing;
void *ioTimerManager;
void *pSyncNode;
int32_t (*FpOnPing)(struct SSyncNode *ths, SyncPing *pMsg);
int32_t (*FpOnSyncPing)(SSyncNode *pSyncNode, SyncPing *pMsg);
int32_t (*FpOnSyncPingReply)(SSyncNode *pSyncNode, SyncPingReply *pMsg);
int32_t (*FpOnSyncRequestVote)(SSyncNode *pSyncNode, SyncRequestVote *pMsg);
int32_t (*FpOnSyncRequestVoteReply)(SSyncNode *pSyncNode, SyncRequestVoteReply *pMsg);
int32_t (*FpOnSyncAppendEntries)(SSyncNode *pSyncNode, SyncAppendEntries *pMsg);
int32_t (*FpOnSyncAppendEntriesReply)(SSyncNode *pSyncNode, SyncAppendEntriesReply *pMsg);
int8_t isStart;
} SSyncIO;
extern SSyncIO *gSyncIO;
int32_t syncIOStart();
int32_t syncIOStart(char *host, uint16_t port);
int32_t syncIOStop();
int32_t syncIOSendMsg(void *handle, const SEpSet *pEpSet, SRpcMsg *pMsg);
int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg);
int32_t syncIOTickQ();
int32_t syncIOTickPing();
#ifdef __cplusplus
}
......
......@@ -76,4 +76,3 @@ static tmr_h doSyncEnvStartTimer(SSyncEnv *pSyncEnv, TAOS_TMR_CALLBACK fp, int m
}
static void doSyncEnvStopTimer(SSyncEnv *pSyncEnv, tmr_h *pTimer) {}
// --------------------------------
\ No newline at end of file
......@@ -23,146 +23,66 @@
SSyncIO *gSyncIO = NULL;
// local function ------------
static int32_t doSyncIOStart(SSyncIO *io);
static int32_t doSyncIOStop(SSyncIO *io);
static int32_t doSyncIOPing(SSyncIO *io);
static int32_t doSyncIOOnMsg(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static int32_t doSyncIODestroy(SSyncIO *io);
static SSyncIO *syncIOCreate();
static void *syncIOConsumer(void *param);
static int syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey);
static void syncIODoReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static void syncIODoRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static void syncIOTick(void *param, void *tmrId);
static int32_t syncIOStartInternal(SSyncIO *io);
static int32_t syncIOStopInternal(SSyncIO *io);
static SSyncIO *syncIOCreate(char *host, uint16_t port);
static int32_t syncIODestroy(SSyncIO *io);
static void *syncIOConsumerFunc(void *param);
static int syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey);
static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static int32_t syncIOTickQInternal(SSyncIO *io);
static void syncIOTickQFunc(void *param, void *tmrId);
static int32_t syncIOTickPingInternal(SSyncIO *io);
static void syncIOTickPingFunc(void *param, void *tmrId);
// ----------------------------
int32_t syncIOSendMsg(void *handle, const SEpSet *pEpSet, SRpcMsg *pMsg) {
// public function ------------
int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) {
sTrace("syncIOSendMsg ... ");
pMsg->handle = NULL;
rpcSendRequest(handle, pEpSet, pMsg, NULL);
rpcSendRequest(clientRpc, pEpSet, pMsg, NULL);
return 0;
}
int32_t syncIOStart() {
gSyncIO = syncIOCreate();
int32_t syncIOStart(char *host, uint16_t port) {
gSyncIO = syncIOCreate(host, port);
assert(gSyncIO != NULL);
int32_t ret = doSyncIOStart(gSyncIO);
int32_t ret = syncIOStartInternal(gSyncIO);
assert(ret == 0);
return 0;
}
int32_t syncIOStop() { return 0; }
// local function ------------
static void syncIOTick(void *param, void *tmrId) {
SSyncIO *io = (SSyncIO *)param;
sDebug("syncIOTick ... ");
SRpcMsg rpcMsg;
rpcMsg.pCont = rpcMallocCont(10);
snprintf(rpcMsg.pCont, 10, "TICK");
rpcMsg.contLen = 10;
rpcMsg.handle = NULL;
rpcMsg.msgType = 2;
SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg));
memcpy(pTemp, &rpcMsg, sizeof(SRpcMsg));
taosWriteQitem(io->pMsgQ, pTemp);
taosTmrReset(syncIOTick, 1000, io, io->syncTimerManager, &io->syncTimer);
}
static void *syncIOConsumer(void *param) {
SSyncIO *io = param;
STaosQall *qall;
SRpcMsg *pRpcMsg, rpcMsg;
int type;
qall = taosAllocateQall();
while (1) {
int numOfMsgs = taosReadAllQitemsFromQset(io->pQset, qall, NULL, NULL);
sDebug("%d sync-io msgs are received", numOfMsgs);
if (numOfMsgs <= 0) break;
for (int i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pRpcMsg);
sDebug("sync-io recv type:%d msg:%s", pRpcMsg->msgType, (char *)(pRpcMsg->pCont));
}
taosResetQitems(qall);
for (int i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pRpcMsg);
rpcFreeCont(pRpcMsg->pCont);
if (pRpcMsg->handle != NULL) {
int msgSize = 128;
memset(&rpcMsg, 0, sizeof(rpcMsg));
rpcMsg.pCont = rpcMallocCont(msgSize);
rpcMsg.contLen = msgSize;
rpcMsg.handle = pRpcMsg->handle;
rpcMsg.code = 0;
rpcSendResponse(&rpcMsg);
}
taosFreeQitem(pRpcMsg);
}
}
taosFreeQall(qall);
return NULL;
}
int32_t syncIOStop() {
int32_t ret = syncIOStopInternal(gSyncIO);
assert(ret == 0);
static int syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey) {
// app shall retrieve the auth info based on meterID from DB or a data file
// demo code here only for simple demo
int ret = 0;
ret = syncIODestroy(gSyncIO);
assert(ret == 0);
return ret;
}
static void syncIODoReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
sDebug("syncIODoReply ... ");
rpcFreeCont(pMsg->pCont);
}
static void syncIODoRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SSyncIO *io = pParent;
SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg));
memcpy(pTemp, pMsg, sizeof(SRpcMsg));
sDebug("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp);
taosWriteQitem(io->pMsgQ, pTemp);
int32_t syncIOTickQ() {
int32_t ret = syncIOTickQInternal(gSyncIO);
assert(ret == 0);
return ret;
}
static SSyncIO *syncIOCreate() {
SSyncIO *io = (SSyncIO *)malloc(sizeof(SSyncIO));
memset(io, 0, sizeof(*io));
io->pMsgQ = taosOpenQueue();
io->pQset = taosOpenQset();
taosAddIntoQset(io->pQset, io->pMsgQ, NULL);
io->start = doSyncIOStart;
io->stop = doSyncIOStop;
io->ping = doSyncIOPing;
io->onMsg = doSyncIOOnMsg;
io->destroy = doSyncIODestroy;
return io;
int32_t syncIOTickPing() {
int32_t ret = syncIOTickPingInternal(gSyncIO);
assert(ret == 0);
return ret;
}
static int32_t doSyncIOStart(SSyncIO *io) {
// local function ------------
static int32_t syncIOStartInternal(SSyncIO *io) {
taosBlockSIGPIPE();
rpcInit();
tsRpcForceTcp = 1;
// cient rpc init
......@@ -172,7 +92,7 @@ static int32_t doSyncIOStart(SSyncIO *io) {
rpcInit.localPort = 0;
rpcInit.label = "SYNC-IO-CLIENT";
rpcInit.numOfThreads = 1;
rpcInit.cfp = syncIODoReply;
rpcInit.cfp = syncIOProcessReply;
rpcInit.sessions = 100;
rpcInit.idleTime = 100;
rpcInit.user = "sync-io";
......@@ -195,7 +115,7 @@ static int32_t doSyncIOStart(SSyncIO *io) {
rpcInit.localPort = 7010;
rpcInit.label = "SYNC-IO-SERVER";
rpcInit.numOfThreads = 1;
rpcInit.cfp = syncIODoRequest;
rpcInit.cfp = syncIOProcessRequest;
rpcInit.sessions = 1000;
rpcInit.idleTime = 2 * 1500;
rpcInit.afp = syncIOAuth;
......@@ -209,12 +129,9 @@ static int32_t doSyncIOStart(SSyncIO *io) {
}
}
io->epSet.inUse = 0;
addEpIntoEpSet(&io->epSet, "127.0.0.1", 7010);
// start consumer thread
{
if (pthread_create(&io->tid, NULL, syncIOConsumer, io) != 0) {
if (pthread_create(&io->consumerTid, NULL, syncIOConsumerFunc, io) != 0) {
sError("failed to create sync consumer thread since %s", strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
......@@ -222,35 +139,32 @@ static int32_t doSyncIOStart(SSyncIO *io) {
}
// start tmr thread
io->syncTimerManager = taosTmrInit(1000, 50, 10000, "SYNC");
io->syncTimer = taosTmrStart(syncIOTick, 1000, io, io->syncTimerManager);
io->ioTimerManager = taosTmrInit(1000, 50, 10000, "SYNC");
return 0;
}
static int32_t doSyncIOStop(SSyncIO *io) {
static int32_t syncIOStopInternal(SSyncIO *io) {
atomic_store_8(&io->isStart, 0);
pthread_join(io->tid, NULL);
pthread_join(io->consumerTid, NULL);
return 0;
}
static int32_t doSyncIOPing(SSyncIO *io) {
SRpcMsg rpcMsg, rspMsg;
static SSyncIO *syncIOCreate(char *host, uint16_t port) {
SSyncIO *io = (SSyncIO *)malloc(sizeof(SSyncIO));
memset(io, 0, sizeof(*io));
rpcMsg.pCont = rpcMallocCont(10);
snprintf(rpcMsg.pCont, 10, "ping");
rpcMsg.contLen = 10;
rpcMsg.handle = NULL;
rpcMsg.msgType = 1;
io->pMsgQ = taosOpenQueue();
io->pQset = taosOpenQset();
taosAddIntoQset(io->pQset, io->pMsgQ, NULL);
rpcSendRequest(io->clientRpc, &io->epSet, &rpcMsg, NULL);
io->myAddr.inUse = 0;
addEpIntoEpSet(&io->myAddr, host, port);
return 0;
return io;
}
static int32_t doSyncIOOnMsg(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { return 0; }
static int32_t doSyncIODestroy(SSyncIO *io) {
static int32_t syncIODestroy(SSyncIO *io) {
int8_t start = atomic_load_8(&io->isStart);
assert(start == 0);
......@@ -264,15 +178,136 @@ static int32_t doSyncIODestroy(SSyncIO *io) {
io->clientRpc = NULL;
}
if (io->pMsgQ != NULL) {
free(io->pMsgQ);
io->pMsgQ = NULL;
}
taosCloseQueue(io->pMsgQ);
taosCloseQset(io->pQset);
return 0;
}
static void *syncIOConsumerFunc(void *param) {
SSyncIO *io = param;
if (io->pQset != NULL) {
free(io->pQset);
io->pQset = NULL;
STaosQall *qall;
SRpcMsg *pRpcMsg, rpcMsg;
int type;
qall = taosAllocateQall();
while (1) {
int numOfMsgs = taosReadAllQitemsFromQset(io->pQset, qall, NULL, NULL);
sTrace("syncIOConsumerFunc %d msgs are received", numOfMsgs);
if (numOfMsgs <= 0) break;
for (int i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pRpcMsg);
sTrace("syncIOConsumerFunc get item from queue: msgType:%d contLen:%d msg:%s", pRpcMsg->msgType, pRpcMsg->contLen,
(char *)(pRpcMsg->pCont));
if (pRpcMsg->msgType == SYNC_PING) {
if (io->FpOnSyncPing != NULL) {
SyncPing *pSyncMsg = syncPingBuild(pRpcMsg->contLen);
syncPingFromRpcMsg(pRpcMsg, pSyncMsg);
io->FpOnSyncPing(io->pSyncNode, pSyncMsg);
}
} else if (pRpcMsg->msgType == SYNC_PING_REPLY) {
SyncPingReply *pSyncMsg = syncPingReplyBuild(pRpcMsg->contLen);
syncPingReplyFromRpcMsg(pRpcMsg, pSyncMsg);
io->FpOnSyncPingReply(io->pSyncNode, pSyncMsg);
} else {
;
}
}
taosResetQitems(qall);
for (int i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pRpcMsg);
rpcFreeCont(pRpcMsg->pCont);
if (pRpcMsg->handle != NULL) {
int msgSize = 128;
memset(&rpcMsg, 0, sizeof(rpcMsg));
rpcMsg.pCont = rpcMallocCont(msgSize);
rpcMsg.contLen = msgSize;
snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", "give a reply");
rpcMsg.handle = pRpcMsg->handle;
rpcMsg.code = 0;
sTrace("syncIOConsumerFunc rpcSendResponse ... msgType:%d contLen:%d", pRpcMsg->msgType, rpcMsg.contLen);
rpcSendResponse(&rpcMsg);
}
taosFreeQitem(pRpcMsg);
}
}
taosFreeQall(qall);
return NULL;
}
static int syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey) {
// app shall retrieve the auth info based on meterID from DB or a data file
// demo code here only for simple demo
int ret = 0;
return ret;
}
static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
sTrace("syncIOProcessRequest: type:%d, contLen:%d, cont:%s", pMsg->msgType, pMsg->contLen, (char *)pMsg->pCont);
SSyncIO *io = pParent;
SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg));
memcpy(pTemp, pMsg, sizeof(SRpcMsg));
taosWriteQitem(io->pMsgQ, pTemp);
}
static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
sTrace("syncIOProcessReply: type:%d, contLen:%d msg:%s", pMsg->msgType, pMsg->contLen, (char *)pMsg->pCont);
rpcFreeCont(pMsg->pCont);
}
static int32_t syncIOTickQInternal(SSyncIO *io) {
io->ioTimerTickQ = taosTmrStart(syncIOTickQFunc, 1000, io, io->ioTimerManager);
return 0;
}
static void syncIOTickQFunc(void *param, void *tmrId) {
SSyncIO *io = (SSyncIO *)param;
sTrace("<-- syncIOTickQFunc -->");
SRpcMsg rpcMsg;
rpcMsg.contLen = 64;
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", "syncIOTickQ");
rpcMsg.handle = NULL;
rpcMsg.msgType = 55;
SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg));
memcpy(pTemp, &rpcMsg, sizeof(SRpcMsg));
taosWriteQitem(io->pMsgQ, pTemp);
taosTmrReset(syncIOTickQFunc, 1000, io, io->ioTimerManager, &io->ioTimerTickQ);
}
static int32_t syncIOTickPingInternal(SSyncIO *io) {
io->ioTimerTickPing = taosTmrStart(syncIOTickPingFunc, 1000, io, io->ioTimerManager);
return 0;
}
static void syncIOTickPingFunc(void *param, void *tmrId) {
SSyncIO *io = (SSyncIO *)param;
sTrace("<-- syncIOTickPingFunc -->");
SRpcMsg rpcMsg;
rpcMsg.contLen = 64;
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", "syncIOTickPing");
rpcMsg.handle = NULL;
rpcMsg.msgType = 77;
rpcSendRequest(io->clientRpc, &io->myAddr, &rpcMsg, NULL);
taosTmrReset(syncIOTickPingFunc, 1000, io, io->ioTimerManager, &io->ioTimerTickPing);
}
......@@ -2,6 +2,8 @@ add_executable(syncTest "")
add_executable(syncEnvTest "")
add_executable(syncPingTest "")
add_executable(syncEncodeTest "")
add_executable(syncIOTickQ "")
add_executable(syncIOTickPing "")
target_sources(syncTest
......@@ -20,6 +22,14 @@ target_sources(syncEncodeTest
PRIVATE
"syncEncodeTest.cpp"
)
target_sources(syncIOTickQ
PRIVATE
"syncIOTickQ.cpp"
)
target_sources(syncIOTickPing
PRIVATE
"syncIOTickPing.cpp"
)
target_include_directories(syncTest
......@@ -42,6 +52,16 @@ target_include_directories(syncEncodeTest
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncIOTickQ
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncIOTickPing
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries(syncTest
......@@ -60,6 +80,14 @@ target_link_libraries(syncEncodeTest
sync
gtest_main
)
target_link_libraries(syncIOTickQ
sync
gtest_main
)
target_link_libraries(syncIOTickPing
sync
gtest_main
)
enable_testing()
......
#include <stdio.h>
#include "gtest/gtest.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
int main() {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
logTest();
int32_t ret;
ret = syncIOStart((char*)"127.0.0.1", 7010);
assert(ret == 0);
ret = syncIOTickPing();
assert(ret == 0);
while (1) {
sleep(1);
}
return 0;
}
#include <stdio.h>
#include "gtest/gtest.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
int main() {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
logTest();
int32_t ret;
ret = syncIOStart((char*)"127.0.0.1", 7010);
assert(ret == 0);
ret = syncIOTickQ();
assert(ret == 0);
while (1) {
sleep(1);
}
return 0;
}
......@@ -39,7 +39,7 @@ SSyncNode* doSync() {
SSyncNode* pSyncNode = syncNodeOpen(&syncInfo);
assert(pSyncNode != NULL);
gSyncIO->FpOnPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->pSyncNode = pSyncNode;
return pSyncNode;
......@@ -57,23 +57,23 @@ int main() {
logTest();
int32_t ret = syncIOStart();
int32_t ret = syncIOStart((char*)"127.0.0.1", 7010);
assert(ret == 0);
ret = syncEnvStart();
assert(ret == 0);
/*
SSyncNode* pSyncNode = doSync();
/*
SSyncNode* pSyncNode = doSync();
ret = syncNodeStartPingTimer(pSyncNode);
assert(ret == 0);
ret = syncNodeStartPingTimer(pSyncNode);
assert(ret == 0);
taosMsleep(5000);
taosMsleep(5000);
ret = syncNodeStopPingTimer(pSyncNode);
assert(ret == 0);
*/
ret = syncNodeStopPingTimer(pSyncNode);
assert(ret == 0);
*/
while (1) {
taosMsleep(1000);
......
......@@ -8,7 +8,7 @@ void *pingFunc(void *param) {
SSyncIO *io = (SSyncIO *)param;
while (1) {
sDebug("io->ping");
io->ping(io);
// io->ping(io);
sleep(1);
}
return NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册