From 7a47508c0e804985e2470a01e588b071db87601b Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 23 Feb 2022 16:58:47 +0800 Subject: [PATCH] add sync io --- source/libs/sync/inc/syncIO.h | 7 ++++- source/libs/sync/src/syncIO.c | 49 +++++++++++++++++++++++------------ 2 files changed, 39 insertions(+), 17 deletions(-) diff --git a/source/libs/sync/inc/syncIO.h b/source/libs/sync/inc/syncIO.h index bf3b3d34c1..8775326bdd 100644 --- a/source/libs/sync/inc/syncIO.h +++ b/source/libs/sync/inc/syncIO.h @@ -37,6 +37,11 @@ typedef struct SSyncIO { pthread_t tid; int8_t isStart; + SEpSet epSet; + + void *syncTimer; +void *syncTimerManager; + int32_t (*start)(struct SSyncIO *ths); int32_t (*stop)(struct SSyncIO *ths); int32_t (*ping)(struct SSyncIO *ths); @@ -45,7 +50,7 @@ typedef struct SSyncIO { } SSyncIO; -SSyncIO * syncIOCreate(); +SSyncIO *syncIOCreate(); static int32_t syncIOStart(SSyncIO *io); static int32_t syncIOStop(SSyncIO *io); diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 023836f74f..20c0f8038c 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -17,8 +17,30 @@ #include #include "syncOnMessage.h" #include "tglobal.h" +#include "ttimer.h" #include "tutil.h" +static void syncTick(void *param, void *tmrId) { + SSyncIO *io = (SSyncIO *)param; + sDebug("syncTick ... "); + + SRpcMsg rpcMsg; + rpcMsg.pCont = rpcMallocCont(10); + snprintf(rpcMsg.pCont, 10, "TICK"); + rpcMsg.contLen = 10; + rpcMsg.handle = io; + rpcMsg.msgType = 2; + + SRpcMsg *pTemp; + + pTemp = taosAllocateQitem(sizeof(SRpcMsg)); + memcpy(pTemp, &rpcMsg, sizeof(SRpcMsg)); + + taosWriteQitem(io->pMsgQ, pTemp); + + io->syncTimer = taosTmrStart(syncTick, 1000, io, io->syncTimerManager); +} + void *syncConsumer(void *param) { SSyncIO *io = param; @@ -58,6 +80,7 @@ void *syncConsumer(void *param) { } taosFreeQall(qall); + return NULL; } static int retrieveAuthInfo(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey) { @@ -68,17 +91,8 @@ static int retrieveAuthInfo(void *parent, char *meterId, char *spi, char *encryp } static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { - /* -// SInfo *pInfo = (SInfo *)pMsg->ahandle; -sDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, - pMsg->code); - -if (pEpSet) pInfo->epSet = *pEpSet; - -rpcFreeCont(pMsg->pCont); -// tsem_post(&pInfo->rspSem); -tsem_post(&pInfo->rspSem); -*/ + sDebug("processResponse ... "); + rpcFreeCont(pMsg->pCont); } static void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { @@ -158,6 +172,9 @@ static int32_t syncIOStart(SSyncIO *io) { } } + io->epSet.inUse = 0; + addEpIntoEpSet(&io->epSet, "127.0.0.1", 38000); + // start consumer thread { if (pthread_create(&io->tid, NULL, syncConsumer, io) != 0) { @@ -167,6 +184,10 @@ static int32_t syncIOStart(SSyncIO *io) { } } + // start tmr thread + io->syncTimerManager = taosTmrInit(1000, 50, 10000, "SYNC"); + io->syncTimer = taosTmrStart(syncTick, 1000, io, io->syncTimerManager); + return 0; } @@ -185,11 +206,7 @@ static int32_t syncIOPing(SSyncIO *io) { rpcMsg.handle = io; rpcMsg.msgType = 1; - SEpSet epSet; - epSet.inUse = 0; - addEpIntoEpSet(&epSet, "127.0.0.1", 38000); - - rpcSendRequest(io->clientRpc, &epSet, &rpcMsg, NULL); + rpcSendRequest(io->clientRpc, &io->epSet, &rpcMsg, NULL); return 0; } -- GitLab