提交 0e0af1de 编写于 作者: M Minghao Li

add sync code

上级 3d9d5240
...@@ -20,11 +20,38 @@ ...@@ -20,11 +20,38 @@
extern "C" { extern "C" {
#endif #endif
#include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <stdint.h> #include "os.h"
#include "syncInt.h"
#include "taosdef.h" #include "taosdef.h"
#include "tqueue.h"
#include "trpc.h"
typedef struct SSyncIO {
void * serverRpc;
void * clientRpc;
STaosQueue *pMsgQ;
STaosQset * pQset;
pthread_t tid;
int8_t isStart;
int32_t (*start)(struct SSyncIO *ths);
int32_t (*stop)(struct SSyncIO *ths);
int32_t (*ping)(struct SSyncIO *ths);
int32_t (*onMessage)(struct SSyncIO *ths, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t (*destroy)(struct SSyncIO *ths);
} SSyncIO;
SSyncIO * syncIOCreate();
static int32_t syncIOStart(SSyncIO *io);
static int32_t syncIOStop(SSyncIO *io);
static int32_t syncIOPing(SSyncIO *io);
static int32_t syncIOOnMessage(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static int32_t syncIODestroy(SSyncIO *io);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -23,16 +23,14 @@ extern "C" { ...@@ -23,16 +23,14 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include "taosdef.h"
#include "sync.h" #include "sync.h"
#include "taosdef.h"
#include "tlog.h" #include "tlog.h"
extern int32_t sDebugFlag; extern int32_t sDebugFlag;
#define sLog(...) \ #define sLog(...) \
{ \ { taosPrintLog("SYN FATAL ", sDebugFlag, __VA_ARGS__); }
taosPrintLog("SYN FATAL ", sDebugFlag, __VA_ARGS__); \
}
#define sFatal(...) \ #define sFatal(...) \
{ \ { \
......
...@@ -17,101 +17,96 @@ ...@@ -17,101 +17,96 @@
#include "sync.h" #include "sync.h"
void appendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg) { void appendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg) {
// TLA+ Spec
// TLA+ Spec // AppendEntries(i, j) ==
//AppendEntries(i, j) == // /\ i /= j
// /\ i /= j // /\ state[i] = Leader
// /\ state[i] = Leader // /\ LET prevLogIndex == nextIndex[i][j] - 1
// /\ LET prevLogIndex == nextIndex[i][j] - 1 // prevLogTerm == IF prevLogIndex > 0 THEN
// prevLogTerm == IF prevLogIndex > 0 THEN // log[i][prevLogIndex].term
// log[i][prevLogIndex].term // ELSE
// ELSE // 0
// 0 // \* Send up to 1 entry, constrained by the end of the log.
// \* Send up to 1 entry, constrained by the end of the log. // lastEntry == Min({Len(log[i]), nextIndex[i][j]})
// lastEntry == Min({Len(log[i]), nextIndex[i][j]}) // entries == SubSeq(log[i], nextIndex[i][j], lastEntry)
// entries == SubSeq(log[i], nextIndex[i][j], lastEntry) // IN Send([mtype |-> AppendEntriesRequest,
// IN Send([mtype |-> AppendEntriesRequest, // mterm |-> currentTerm[i],
// mterm |-> currentTerm[i], // mprevLogIndex |-> prevLogIndex,
// mprevLogIndex |-> prevLogIndex, // mprevLogTerm |-> prevLogTerm,
// mprevLogTerm |-> prevLogTerm, // mentries |-> entries,
// mentries |-> entries, // \* mlog is used as a history variable for the proof.
// \* mlog is used as a history variable for the proof. // \* It would not exist in a real implementation.
// \* It would not exist in a real implementation. // mlog |-> log[i],
// mlog |-> log[i], // mcommitIndex |-> Min({commitIndex[i], lastEntry}),
// mcommitIndex |-> Min({commitIndex[i], lastEntry}), // msource |-> i,
// msource |-> i, // mdest |-> j])
// mdest |-> j]) // /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
} }
void onAppendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg) { void onAppendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg) {
// TLA+ Spec
// TLA+ Spec // HandleAppendEntriesRequest(i, j, m) ==
//HandleAppendEntriesRequest(i, j, m) == // LET logOk == \/ m.mprevLogIndex = 0
// LET logOk == \/ m.mprevLogIndex = 0 // \/ /\ m.mprevLogIndex > 0
// \/ /\ m.mprevLogIndex > 0 // /\ m.mprevLogIndex <= Len(log[i])
// /\ m.mprevLogIndex <= Len(log[i]) // /\ m.mprevLogTerm = log[i][m.mprevLogIndex].term
// /\ m.mprevLogTerm = log[i][m.mprevLogIndex].term // IN /\ m.mterm <= currentTerm[i]
// IN /\ m.mterm <= currentTerm[i] // /\ \/ /\ \* reject request
// /\ \/ /\ \* reject request // \/ m.mterm < currentTerm[i]
// \/ m.mterm < currentTerm[i] // \/ /\ m.mterm = currentTerm[i]
// \/ /\ m.mterm = currentTerm[i] // /\ state[i] = Follower
// /\ state[i] = Follower // /\ \lnot logOk
// /\ \lnot logOk // /\ Reply([mtype |-> AppendEntriesResponse,
// /\ Reply([mtype |-> AppendEntriesResponse, // mterm |-> currentTerm[i],
// mterm |-> currentTerm[i], // msuccess |-> FALSE,
// msuccess |-> FALSE, // mmatchIndex |-> 0,
// mmatchIndex |-> 0, // msource |-> i,
// msource |-> i, // mdest |-> j],
// mdest |-> j], // m)
// m) // /\ UNCHANGED <<serverVars, logVars>>
// /\ UNCHANGED <<serverVars, logVars>> // \/ \* return to follower state
// \/ \* return to follower state // /\ m.mterm = currentTerm[i]
// /\ m.mterm = currentTerm[i] // /\ state[i] = Candidate
// /\ state[i] = Candidate // /\ state' = [state EXCEPT ![i] = Follower]
// /\ state' = [state EXCEPT ![i] = Follower] // /\ UNCHANGED <<currentTerm, votedFor, logVars, messages>>
// /\ UNCHANGED <<currentTerm, votedFor, logVars, messages>> // \/ \* accept request
// \/ \* accept request // /\ m.mterm = currentTerm[i]
// /\ m.mterm = currentTerm[i] // /\ state[i] = Follower
// /\ state[i] = Follower // /\ logOk
// /\ logOk // /\ LET index == m.mprevLogIndex + 1
// /\ LET index == m.mprevLogIndex + 1 // IN \/ \* already done with request
// IN \/ \* already done with request // /\ \/ m.mentries = << >>
// /\ \/ m.mentries = << >> // \/ /\ m.mentries /= << >>
// \/ /\ m.mentries /= << >> // /\ Len(log[i]) >= index
// /\ Len(log[i]) >= index // /\ log[i][index].term = m.mentries[1].term
// /\ log[i][index].term = m.mentries[1].term // \* This could make our commitIndex decrease (for
// \* This could make our commitIndex decrease (for // \* example if we process an old, duplicated request),
// \* example if we process an old, duplicated request), // \* but that doesn't really affect anything.
// \* but that doesn't really affect anything. // /\ commitIndex' = [commitIndex EXCEPT ![i] =
// /\ commitIndex' = [commitIndex EXCEPT ![i] = // m.mcommitIndex]
// m.mcommitIndex] // /\ Reply([mtype |-> AppendEntriesResponse,
// /\ Reply([mtype |-> AppendEntriesResponse, // mterm |-> currentTerm[i],
// mterm |-> currentTerm[i], // msuccess |-> TRUE,
// msuccess |-> TRUE, // mmatchIndex |-> m.mprevLogIndex +
// mmatchIndex |-> m.mprevLogIndex + // Len(m.mentries),
// Len(m.mentries), // msource |-> i,
// msource |-> i, // mdest |-> j],
// mdest |-> j], // m)
// m) // /\ UNCHANGED <<serverVars, log>>
// /\ UNCHANGED <<serverVars, log>> // \/ \* conflict: remove 1 entry
// \/ \* conflict: remove 1 entry // /\ m.mentries /= << >>
// /\ m.mentries /= << >> // /\ Len(log[i]) >= index
// /\ Len(log[i]) >= index // /\ log[i][index].term /= m.mentries[1].term
// /\ log[i][index].term /= m.mentries[1].term // /\ LET new == [index2 \in 1..(Len(log[i]) - 1) |->
// /\ LET new == [index2 \in 1..(Len(log[i]) - 1) |-> // log[i][index2]]
// log[i][index2]] // IN log' = [log EXCEPT ![i] = new]
// IN log' = [log EXCEPT ![i] = new] // /\ UNCHANGED <<serverVars, commitIndex, messages>>
// /\ UNCHANGED <<serverVars, commitIndex, messages>> // \/ \* no conflict: append entry
// \/ \* no conflict: append entry // /\ m.mentries /= << >>
// /\ m.mentries /= << >> // /\ Len(log[i]) = m.mprevLogIndex
// /\ Len(log[i]) = m.mprevLogIndex // /\ log' = [log EXCEPT ![i] =
// /\ log' = [log EXCEPT ![i] = // Append(log[i], m.mentries[1])]
// Append(log[i], m.mentries[1])] // /\ UNCHANGED <<serverVars, commitIndex, messages>>
// /\ UNCHANGED <<serverVars, commitIndex, messages>> // /\ UNCHANGED <<candidateVars, leaderVars>>
// /\ UNCHANGED <<candidateVars, leaderVars>> //
//
} }
...@@ -17,18 +17,16 @@ ...@@ -17,18 +17,16 @@
#include "sync.h" #include "sync.h"
void onAppendEntriesReply(SRaft *pRaft, const SyncAppendEntriesReply *pMsg) { void onAppendEntriesReply(SRaft *pRaft, const SyncAppendEntriesReply *pMsg) {
// TLA+ Spec
// TLA+ Spec // HandleAppendEntriesResponse(i, j, m) ==
//HandleAppendEntriesResponse(i, j, m) == // /\ m.mterm = currentTerm[i]
// /\ m.mterm = currentTerm[i] // /\ \/ /\ m.msuccess \* successful
// /\ \/ /\ m.msuccess \* successful // /\ nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1]
// /\ nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] // /\ matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex]
// /\ matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex] // \/ /\ \lnot m.msuccess \* not successful
// \/ /\ \lnot m.msuccess \* not successful // /\ nextIndex' = [nextIndex EXCEPT ![i][j] =
// /\ nextIndex' = [nextIndex EXCEPT ![i][j] = // Max({nextIndex[i][j] - 1, 1})]
// Max({nextIndex[i][j] - 1, 1})] // /\ UNCHANGED <<matchIndex>>
// /\ UNCHANGED <<matchIndex>> // /\ Discard(m)
// /\ Discard(m) // /\ UNCHANGED <<serverVars, candidateVars, logVars, elections>>
// /\ UNCHANGED <<serverVars, candidateVars, logVars, elections>>
} }
...@@ -13,4 +13,191 @@ ...@@ -13,4 +13,191 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "sync.h" #include "syncIO.h"
#include "syncOnMessage.h"
void *syncConsumer(void *param) {
SSyncIO *io = param;
STaosQall *qall;
SRpcMsg *pRpcMsg, rpcMsg;
int type;
qall = taosAllocateQall();
while (1) {
int numOfMsgs = taosReadAllQitemsFromQset(io->pQset, qall, NULL, NULL);
sDebug("%d sync-io msgs are received", numOfMsgs);
if (numOfMsgs <= 0) break;
for (int i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pRpcMsg);
sDebug("sync-io recv msg: %s", (char *)(pRpcMsg->pCont));
}
taosResetQitems(qall);
for (int i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pRpcMsg);
rpcFreeCont(pRpcMsg->pCont);
/*
int msgSize = 128;
memset(&rpcMsg, 0, sizeof(rpcMsg));
rpcMsg.pCont = rpcMallocCont(msgSize);
rpcMsg.contLen = msgSize;
rpcMsg.handle = pRpcMsg->handle;
rpcMsg.code = 0;
rpcSendResponse(&rpcMsg);
*/
taosFreeQitem(pRpcMsg);
}
}
taosFreeQall(qall);
}
static int retrieveAuthInfo(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey) {
// app shall retrieve the auth info based on meterID from DB or a data file
// demo code here only for simple demo
int ret = 0;
return ret;
}
static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
/*
// SInfo *pInfo = (SInfo *)pMsg->ahandle;
sDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen,
pMsg->code);
if (pEpSet) pInfo->epSet = *pEpSet;
rpcFreeCont(pMsg->pCont);
// tsem_post(&pInfo->rspSem);
tsem_post(&pInfo->rspSem);
*/
}
static void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SSyncIO *io = pParent;
SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg));
memcpy(pTemp, pMsg, sizeof(SRpcMsg));
sDebug("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp);
taosWriteQitem(io->pMsgQ, pTemp);
}
SSyncIO *syncIOCreate() {
SSyncIO *io = (SSyncIO *)malloc(sizeof(SSyncIO));
memset(io, 0, sizeof(*io));
io->pMsgQ = taosOpenQueue();
io->pQset = taosOpenQset();
taosAddIntoQset(io->pQset, io->pMsgQ, NULL);
io->start = syncIOStart;
io->stop = syncIOStop;
io->ping = syncIOPing;
io->onMessage = syncIOOnMessage;
io->destroy = syncIODestroy;
return io;
}
static int32_t syncIOStart(SSyncIO *io) {
taosBlockSIGPIPE();
// cient rpc init
{
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = 0;
rpcInit.label = "SYNC-IO-CLIENT";
rpcInit.numOfThreads = 1;
rpcInit.cfp = processResponse;
rpcInit.sessions = 100;
rpcInit.idleTime = 100;
rpcInit.user = "sync-io";
rpcInit.secret = "sync-io";
rpcInit.ckey = "key";
rpcInit.spi = 1;
rpcInit.connType = TAOS_CONN_CLIENT;
io->clientRpc = rpcOpen(&rpcInit);
if (io->clientRpc == NULL) {
sError("failed to initialize RPC");
return -1;
}
}
// server rpc init
{
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = 38000;
rpcInit.label = "SYNC-IO-SERVER";
rpcInit.numOfThreads = 1;
rpcInit.cfp = processRequestMsg;
rpcInit.sessions = 1000;
rpcInit.idleTime = 2 * 1500;
rpcInit.afp = retrieveAuthInfo;
rpcInit.parent = io;
rpcInit.connType = TAOS_CONN_SERVER;
void *pRpc = rpcOpen(&rpcInit);
if (pRpc == NULL) {
sError("failed to start RPC server");
return -1;
}
}
// start consumer thread
{
if (pthread_create(&io->tid, NULL, syncConsumer, NULL) != 0) {
sError("failed to create sync consumer thread since %s", strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
}
return 0;
}
static int32_t syncIOStop(SSyncIO *io) {
atomic_store_8(&io->isStart, 0);
pthread_join(io->tid, NULL);
return 0;
}
static int32_t syncIOPing(SSyncIO *io) { return 0; }
static int32_t syncIOOnMessage(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { return 0; }
static int32_t syncIODestroy(SSyncIO *io) {
int8_t start = atomic_load_8(&io->isStart);
assert(start == 0);
if (io->serverRpc != NULL) {
free(io->serverRpc);
io->serverRpc = NULL;
}
if (io->clientRpc != NULL) {
free(io->clientRpc);
io->clientRpc = NULL;
}
if (io->pMsgQ != NULL) {
free(io->pMsgQ);
io->pMsgQ = NULL;
}
if (io->pQset != NULL) {
free(io->pQset);
io->pQset = NULL;
}
return 0;
}
\ No newline at end of file
...@@ -17,43 +17,39 @@ ...@@ -17,43 +17,39 @@
#include "sync.h" #include "sync.h"
void requestVote(SRaft *pRaft, const SyncRequestVote *pMsg) { void requestVote(SRaft *pRaft, const SyncRequestVote *pMsg) {
// TLA+ Spec
// TLA+ Spec // RequestVote(i, j) ==
//RequestVote(i, j) == // /\ state[i] = Candidate
// /\ state[i] = Candidate // /\ j \notin votesResponded[i]
// /\ j \notin votesResponded[i] // /\ Send([mtype |-> RequestVoteRequest,
// /\ Send([mtype |-> RequestVoteRequest, // mterm |-> currentTerm[i],
// mterm |-> currentTerm[i], // mlastLogTerm |-> LastTerm(log[i]),
// mlastLogTerm |-> LastTerm(log[i]), // mlastLogIndex |-> Len(log[i]),
// mlastLogIndex |-> Len(log[i]), // msource |-> i,
// msource |-> i, // mdest |-> j])
// mdest |-> j]) // /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
} }
void onRequestVote(SRaft *pRaft, const SyncRequestVote *pMsg) { void onRequestVote(SRaft *pRaft, const SyncRequestVote *pMsg) {
// TLA+ Spec
// TLA+ Spec // HandleRequestVoteRequest(i, j, m) ==
//HandleRequestVoteRequest(i, j, m) == // LET logOk == \/ m.mlastLogTerm > LastTerm(log[i])
// LET logOk == \/ m.mlastLogTerm > LastTerm(log[i]) // \/ /\ m.mlastLogTerm = LastTerm(log[i])
// \/ /\ m.mlastLogTerm = LastTerm(log[i]) // /\ m.mlastLogIndex >= Len(log[i])
// /\ m.mlastLogIndex >= Len(log[i]) // grant == /\ m.mterm = currentTerm[i]
// grant == /\ m.mterm = currentTerm[i] // /\ logOk
// /\ logOk // /\ votedFor[i] \in {Nil, j}
// /\ votedFor[i] \in {Nil, j} // IN /\ m.mterm <= currentTerm[i]
// IN /\ m.mterm <= currentTerm[i] // /\ \/ grant /\ votedFor' = [votedFor EXCEPT ![i] = j]
// /\ \/ grant /\ votedFor' = [votedFor EXCEPT ![i] = j] // \/ ~grant /\ UNCHANGED votedFor
// \/ ~grant /\ UNCHANGED votedFor // /\ Reply([mtype |-> RequestVoteResponse,
// /\ Reply([mtype |-> RequestVoteResponse, // mterm |-> currentTerm[i],
// mterm |-> currentTerm[i], // mvoteGranted |-> grant,
// mvoteGranted |-> grant, // \* mlog is used just for the `elections' history variable for
// \* mlog is used just for the `elections' history variable for // \* the proof. It would not exist in a real implementation.
// \* the proof. It would not exist in a real implementation. // mlog |-> log[i],
// mlog |-> log[i], // msource |-> i,
// msource |-> i, // mdest |-> j],
// mdest |-> j], // m)
// m) // /\ UNCHANGED <<state, currentTerm, candidateVars, leaderVars, logVars>>
// /\ UNCHANGED <<state, currentTerm, candidateVars, leaderVars, logVars>>
} }
...@@ -17,22 +17,20 @@ ...@@ -17,22 +17,20 @@
#include "sync.h" #include "sync.h"
void onRequestVoteReply(SRaft *pRaft, const SyncRequestVoteReply *pMsg) { void onRequestVoteReply(SRaft *pRaft, const SyncRequestVoteReply *pMsg) {
// TLA+ Spec
// TLA+ Spec // HandleRequestVoteResponse(i, j, m) ==
//HandleRequestVoteResponse(i, j, m) == // \* This tallies votes even when the current state is not Candidate, but
// \* This tallies votes even when the current state is not Candidate, but // \* they won't be looked at, so it doesn't matter.
// \* they won't be looked at, so it doesn't matter. // /\ m.mterm = currentTerm[i]
// /\ m.mterm = currentTerm[i] // /\ votesResponded' = [votesResponded EXCEPT ![i] =
// /\ votesResponded' = [votesResponded EXCEPT ![i] = // votesResponded[i] \cup {j}]
// votesResponded[i] \cup {j}] // /\ \/ /\ m.mvoteGranted
// /\ \/ /\ m.mvoteGranted // /\ votesGranted' = [votesGranted EXCEPT ![i] =
// /\ votesGranted' = [votesGranted EXCEPT ![i] = // votesGranted[i] \cup {j}]
// votesGranted[i] \cup {j}] // /\ voterLog' = [voterLog EXCEPT ![i] =
// /\ voterLog' = [voterLog EXCEPT ![i] = // voterLog[i] @@ (j :> m.mlog)]
// voterLog[i] @@ (j :> m.mlog)] // \/ /\ ~m.mvoteGranted
// \/ /\ ~m.mvoteGranted // /\ UNCHANGED <<votesGranted, voterLog>>
// /\ UNCHANGED <<votesGranted, voterLog>> // /\ Discard(m)
// /\ Discard(m) // /\ UNCHANGED <<serverVars, votedFor, leaderVars, logVars>>
// /\ UNCHANGED <<serverVars, votedFor, leaderVars, logVars>>
} }
#include <stdio.h> #include <stdio.h>
#include "syncIO.h"
#include "syncInt.h" #include "syncInt.h"
int main() { int main() {
printf("test \n"); tsAsyncLog = 0;
taosInitLog((char*)"syncTest.log", 100000, 10);
syncStartEnv(); sDebug("sync test");
syncStartEnv();
char temp[100]; SSyncIO *syncIO = syncIOCreate();
snprintf(temp, 100, "./debug.log"); assert(syncIO != NULL);
taosInitLog(temp, 10000, 1);
tsAsyncLog = 0;
for (int i = 0; i < 100; i++) { while (1) {
sDebug("log:%d -------- \n", i); sleep(3);
} }
return 0;
fflush(NULL);
//taosCloseLog();
while(1) {
sleep(3);
}
return 0;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册