提交 83ecb8dd 编写于 作者: M Minghao Li

sync refactor

上级 61f22781
...@@ -29,7 +29,7 @@ static int32_t syncIODestroy(SSyncIO *io); ...@@ -29,7 +29,7 @@ static int32_t syncIODestroy(SSyncIO *io);
static int32_t syncIOStartInternal(SSyncIO *io); static int32_t syncIOStartInternal(SSyncIO *io);
static int32_t syncIOStopInternal(SSyncIO *io); static int32_t syncIOStopInternal(SSyncIO *io);
static void * syncIOConsumerFunc(void *param); static void *syncIOConsumerFunc(void *param);
static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static int32_t syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey); static int32_t syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey);
...@@ -75,6 +75,7 @@ int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) { ...@@ -75,6 +75,7 @@ int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) {
syncRpcMsgPrint2(logBuf, pMsg); syncRpcMsgPrint2(logBuf, pMsg);
pMsg->handle = NULL; pMsg->handle = NULL;
pMsg->noResp = 1;
rpcSendRequest(clientRpc, pEpSet, pMsg, NULL); rpcSendRequest(clientRpc, pEpSet, pMsg, NULL);
return ret; return ret;
} }
...@@ -234,9 +235,9 @@ static int32_t syncIOStopInternal(SSyncIO *io) { ...@@ -234,9 +235,9 @@ static int32_t syncIOStopInternal(SSyncIO *io) {
} }
static void *syncIOConsumerFunc(void *param) { static void *syncIOConsumerFunc(void *param) {
SSyncIO * io = param; SSyncIO *io = param;
STaosQall *qall; STaosQall *qall;
SRpcMsg * pRpcMsg, rpcMsg; SRpcMsg *pRpcMsg, rpcMsg;
qall = taosAllocateQall(); qall = taosAllocateQall();
while (1) { while (1) {
...@@ -324,19 +325,21 @@ static void *syncIOConsumerFunc(void *param) { ...@@ -324,19 +325,21 @@ static void *syncIOConsumerFunc(void *param) {
taosGetQitem(qall, (void **)&pRpcMsg); taosGetQitem(qall, (void **)&pRpcMsg);
rpcFreeCont(pRpcMsg->pCont); rpcFreeCont(pRpcMsg->pCont);
if (pRpcMsg->handle != NULL) { /*
int msgSize = 32; if (pRpcMsg->handle != NULL) {
memset(&rpcMsg, 0, sizeof(rpcMsg)); int msgSize = 32;
rpcMsg.msgType = SYNC_RESPONSE; memset(&rpcMsg, 0, sizeof(rpcMsg));
rpcMsg.pCont = rpcMallocCont(msgSize); rpcMsg.msgType = SYNC_RESPONSE;
rpcMsg.contLen = msgSize; rpcMsg.pCont = rpcMallocCont(msgSize);
snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", "give a reply"); rpcMsg.contLen = msgSize;
rpcMsg.handle = pRpcMsg->handle; snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", "give a reply");
rpcMsg.code = 0; rpcMsg.handle = pRpcMsg->handle;
rpcMsg.code = 0;
syncRpcMsgPrint2((char *)"syncIOConsumerFunc rpcSendResponse --> ", &rpcMsg);
rpcSendResponse(&rpcMsg); syncRpcMsgPrint2((char *)"syncIOConsumerFunc rpcSendResponse --> ", &rpcMsg);
} rpcSendResponse(&rpcMsg);
}
*/
taosFreeQitem(pRpcMsg); taosFreeQitem(pRpcMsg);
} }
......
...@@ -111,7 +111,7 @@ SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) { ...@@ -111,7 +111,7 @@ SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) {
int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) { int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
SSyncLogStoreData* pData = pLogStore->data; SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal; SWal* pWal = pData->pWal;
walCommit(pWal, index); assert(walCommit(pWal, index) == 0);
return 0; // to avoid compiler error return 0; // to avoid compiler error
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册