提交 392564cc 编写于 作者: S Shengliang Guan

enh: refact syncEnv code

上级 8cdbcb89
...@@ -200,10 +200,10 @@ typedef struct SSyncInfo { ...@@ -200,10 +200,10 @@ typedef struct SSyncInfo {
int32_t syncInit(); int32_t syncInit();
void syncCleanUp(); void syncCleanUp();
bool syncIsInit();
int64_t syncOpen(SSyncInfo* pSyncInfo); int64_t syncOpen(SSyncInfo* pSyncInfo);
void syncStart(int64_t rid); void syncStart(int64_t rid);
void syncStop(int64_t rid); void syncStop(int64_t rid);
int32_t syncSetStandby(int64_t rid);
ESyncState syncGetMyRole(int64_t rid); ESyncState syncGetMyRole(int64_t rid);
bool syncIsReady(int64_t rid); bool syncIsReady(int64_t rid);
const char* syncGetMyRoleStr(int64_t rid); const char* syncGetMyRoleStr(int64_t rid);
...@@ -216,7 +216,6 @@ void syncGetEpSet(int64_t rid, SEpSet* pEpSet); ...@@ -216,7 +216,6 @@ void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet); void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak); int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak);
// int32_t syncProposeBatch(int64_t rid, SRpcMsg** pMsgPArr, bool* pIsWeakArr, int32_t arrSize); // int32_t syncProposeBatch(int64_t rid, SRpcMsg** pMsgPArr, bool* pIsWeakArr, int32_t arrSize);
bool syncEnvIsStart();
const char* syncStr(ESyncState state); const char* syncStr(ESyncState state);
bool syncIsRestoreFinish(int64_t rid); bool syncIsRestoreFinish(int64_t rid);
int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot); int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot);
...@@ -234,6 +233,9 @@ int32_t syncEndSnapshot(int64_t rid); ...@@ -234,6 +233,9 @@ int32_t syncEndSnapshot(int64_t rid);
int32_t syncStepDown(int64_t rid, SyncTerm newTerm); int32_t syncStepDown(int64_t rid, SyncTerm newTerm);
SSyncNode* syncNodeAcquire(int64_t rid);
void syncNodeRelease(SSyncNode* pNode);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -28,10 +28,6 @@ typedef struct SRaftId { ...@@ -28,10 +28,6 @@ typedef struct SRaftId {
SyncGroupId vgId; SyncGroupId vgId;
} SRaftId; } SRaftId;
// ------------------ control -------------------
SSyncNode* syncNodeAcquire(int64_t rid);
void syncNodeRelease(SSyncNode* pNode);
int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg); int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg);
int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo); int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo);
void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb); void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb);
......
...@@ -25,7 +25,8 @@ extern "C" { ...@@ -25,7 +25,8 @@ extern "C" {
// open a reference set, max is the mod used by hash, fp is the pointer to free resource function // open a reference set, max is the mod used by hash, fp is the pointer to free resource function
// return rsetId which will be used by other APIs. On error, -1 is returned, and terrno is set appropriately // return rsetId which will be used by other APIs. On error, -1 is returned, and terrno is set appropriately
int32_t taosOpenRef(int32_t max, void (*fp)(void *)); typedef void (*RefFp)(void *);
int32_t taosOpenRef(int32_t max, RefFp fp);
// close the reference set, refId is the return value by taosOpenRef // close the reference set, refId is the return value by taosOpenRef
// return 0 if success. On error, -1 is returned, and terrno is set appropriately // return 0 if success. On error, -1 is returned, and terrno is set appropriately
......
...@@ -481,7 +481,7 @@ int32_t mndProcessSyncCtrlMsg(SRpcMsg *pMsg) { ...@@ -481,7 +481,7 @@ int32_t mndProcessSyncCtrlMsg(SRpcMsg *pMsg) {
mInfo("vgId:%d, process sync ctrl msg", 1); mInfo("vgId:%d, process sync ctrl msg", 1);
if (!syncEnvIsStart()) { if (!syncIsInit()) {
mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType)); mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType));
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1; return -1;
...@@ -518,7 +518,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { ...@@ -518,7 +518,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
int32_t code = 0; int32_t code = 0;
if (!syncEnvIsStart()) { if (!syncIsInit()) {
mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType)); mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType));
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1; return -1;
...@@ -581,11 +581,6 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { ...@@ -581,11 +581,6 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg); code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg);
syncSnapshotRspDestroy(pSyncMsg); syncSnapshotRspDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_SET_MNODE_STANDBY) {
code = syncSetStandby(pMgmt->sync);
SRpcMsg rsp = {.code = code, .info = pMsg->info};
tmsgSendRsp(&rsp);
} else if (pMsg->msgType == TDMT_SYNC_LOCAL_CMD) { } else if (pMsg->msgType == TDMT_SYNC_LOCAL_CMD) {
SyncLocalCmd *pSyncMsg = syncLocalCmdFromRpcMsg2(pMsg); SyncLocalCmd *pSyncMsg = syncLocalCmdFromRpcMsg2(pMsg);
code = syncNodeOnLocalCmd(pSyncNode, pSyncMsg); code = syncNodeOnLocalCmd(pSyncNode, pSyncMsg);
......
...@@ -234,7 +234,7 @@ int32_t vnodeProcessSyncCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -234,7 +234,7 @@ int32_t vnodeProcessSyncCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
int32_t code = 0; int32_t code = 0;
const STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
if (!syncEnvIsStart()) { if (!syncIsInit()) {
vGError("vgId:%d, msg:%p failed to process since sync env not start", pVnode->config.vgId, pMsg); vGError("vgId:%d, msg:%p failed to process since sync env not start", pVnode->config.vgId, pMsg);
terrno = TSDB_CODE_APP_ERROR; terrno = TSDB_CODE_APP_ERROR;
return -1; return -1;
...@@ -277,7 +277,7 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -277,7 +277,7 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
int32_t code = 0; int32_t code = 0;
const STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
if (!syncEnvIsStart()) { if (!syncIsInit()) {
vGError("vgId:%d, msg:%p failed to process since sync env not start", pVnode->config.vgId, pMsg); vGError("vgId:%d, msg:%p failed to process since sync env not start", pVnode->config.vgId, pMsg);
terrno = TSDB_CODE_APP_ERROR; terrno = TSDB_CODE_APP_ERROR;
return -1; return -1;
......
...@@ -20,13 +20,7 @@ ...@@ -20,13 +20,7 @@
extern "C" { extern "C" {
#endif #endif
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "syncInt.h" #include "syncInt.h"
#include "taosdef.h"
#include "trpc.h"
#include "ttimer.h"
#define TIMER_MAX_MS 0x7FFFFFFF #define TIMER_MAX_MS 0x7FFFFFFF
#define ENV_TICK_TIMER_MS 1000 #define ENV_TICK_TIMER_MS 1000
...@@ -57,12 +51,12 @@ typedef struct SSyncEnv { ...@@ -57,12 +51,12 @@ typedef struct SSyncEnv {
} SSyncEnv; } SSyncEnv;
extern SSyncEnv* gSyncEnv; SSyncEnv* syncEnv();
int32_t syncEnvStart(); int64_t syncNodeAdd(SSyncNode* pNode);
int32_t syncEnvStop(); void syncNodeRemove(SSyncNode* pNode);
int32_t syncEnvStartTimer(); SSyncNode* syncNodeAcquire(int64_t rid);
int32_t syncEnvStopTimer(); void syncNodeRelease(SSyncNode* pNode);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -24,6 +24,8 @@ extern "C" { ...@@ -24,6 +24,8 @@ extern "C" {
#include "syncTools.h" #include "syncTools.h"
#include "tlog.h" #include "tlog.h"
#include "ttimer.h" #include "ttimer.h"
#include "taosdef.h"
#include "ttimer.h"
// clang-format off // clang-format off
#define sFatal(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) #define sFatal(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
...@@ -255,9 +257,6 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, Sync ...@@ -255,9 +257,6 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, Sync
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode); SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode);
char* syncNodePeerState2Str(const SSyncNode* pSyncNode); char* syncNodePeerState2Str(const SSyncNode* pSyncNode);
SSyncNode* syncNodeAcquire(int64_t rid);
void syncNodeRelease(SSyncNode* pNode);
// raft state change -------------- // raft state change --------------
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term); void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term);
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term); void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term);
...@@ -302,9 +301,6 @@ bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const ...@@ -302,9 +301,6 @@ bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta); int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta);
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta); int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta);
void syncStartNormal(int64_t rid);
void syncStartStandBy(int64_t rid);
bool syncNodeCanChange(SSyncNode* pSyncNode); bool syncNodeCanChange(SSyncNode* pSyncNode);
bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pNewCfg); bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pNewCfg);
......
...@@ -13,118 +13,114 @@ ...@@ -13,118 +13,114 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#define _DEFAULT_SOURCE
#include "syncEnv.h" #include "syncEnv.h"
// #include <ASSERT.h> #include "tref.h"
SSyncEnv *gSyncEnv = NULL; static SSyncEnv gSyncEnv = {0};
static int32_t gNodeRefId = -1;
// local function ----------------- bool gRaftDetailLog = false;
static SSyncEnv *doSyncEnvStart();
static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv);
static int32_t doSyncEnvStartTimer(SSyncEnv *pSyncEnv);
static int32_t doSyncEnvStopTimer(SSyncEnv *pSyncEnv);
static void syncEnvTick(void *param, void *tmrId); static void syncEnvTick(void *param, void *tmrId);
// --------------------------------
bool syncEnvIsStart() { SSyncEnv *syncEnv() { return &gSyncEnv; }
if (gSyncEnv == NULL) {
return false;
}
return atomic_load_8(&(gSyncEnv->isStart)); bool syncIsInit() { return atomic_load_8(&gSyncEnv.isStart); }
}
int32_t syncInit() {
if (syncIsInit()) return 0;
int32_t syncEnvStart() {
int32_t ret = 0;
uint32_t seed = (uint32_t)(taosGetTimestampNs() & 0x00000000FFFFFFFF); uint32_t seed = (uint32_t)(taosGetTimestampNs() & 0x00000000FFFFFFFF);
taosSeedRand(seed); taosSeedRand(seed);
// gSyncEnv = doSyncEnvStart(gSyncEnv);
gSyncEnv = doSyncEnvStart();
ASSERT(gSyncEnv != NULL);
sTrace("sync env start ok");
return ret;
}
int32_t syncEnvStop() { memset(&gSyncEnv, 0, sizeof(SSyncEnv));
int32_t ret = doSyncEnvStop(gSyncEnv); gSyncEnv.envTickTimerCounter = 0;
return ret; gSyncEnv.envTickTimerMS = ENV_TICK_TIMER_MS;
} gSyncEnv.FpEnvTickTimer = syncEnvTick;
atomic_store_64(&gSyncEnv.envTickTimerLogicClock, 0);
atomic_store_64(&gSyncEnv.envTickTimerLogicClockUser, 0);
int32_t syncEnvStartTimer() { // start tmr thread
int32_t ret = doSyncEnvStartTimer(gSyncEnv); gSyncEnv.pTimerManager = taosTmrInit(1000, 50, 10000, "SYNC-ENV");
return ret; atomic_store_8(&gSyncEnv.isStart, 1);
}
gNodeRefId = taosOpenRef(200, (RefFp)syncNodeClose);
if (gNodeRefId < 0) {
sError("failed to init node ref");
syncCleanUp();
return -1;
}
int32_t syncEnvStopTimer() { sDebug("sync rsetId:%d is open", gNodeRefId);
int32_t ret = doSyncEnvStopTimer(gSyncEnv); return 0;
return ret;
} }
// local function ----------------- void syncCleanUp() {
static void syncEnvTick(void *param, void *tmrId) { atomic_store_8(&gSyncEnv.isStart, 0);
SSyncEnv *pSyncEnv = (SSyncEnv *)param; taosTmrCleanUp(gSyncEnv.pTimerManager);
if (atomic_load_64(&pSyncEnv->envTickTimerLogicClockUser) <= atomic_load_64(&pSyncEnv->envTickTimerLogicClock)) { memset(&gSyncEnv, 0, sizeof(SSyncEnv));
++(pSyncEnv->envTickTimerCounter);
sTrace("syncEnvTick do ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64
", envTickTimerCounter:%" PRIu64
", "
"envTickTimerMS:%d, tmrId:%p",
pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter,
pSyncEnv->envTickTimerMS, tmrId);
// do something, tick ... if (gNodeRefId != -1) {
taosTmrReset(syncEnvTick, pSyncEnv->envTickTimerMS, pSyncEnv, pSyncEnv->pTimerManager, &pSyncEnv->pEnvTickTimer); sDebug("sync rsetId:%d is closed", gNodeRefId);
} else { taosCloseRef(gNodeRefId);
sTrace("syncEnvTick pass ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64 gNodeRefId = -1;
", envTickTimerCounter:%" PRIu64
", "
"envTickTimerMS:%d, tmrId:%p",
pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter,
pSyncEnv->envTickTimerMS, tmrId);
} }
} }
static SSyncEnv *doSyncEnvStart() { int64_t syncNodeAdd(SSyncNode *pNode) {
SSyncEnv *pSyncEnv = (SSyncEnv *)taosMemoryMalloc(sizeof(SSyncEnv)); pNode->rid = taosAddRef(gNodeRefId, pNode);
ASSERT(pSyncEnv != NULL); if (pNode->rid < 0) return -1;
memset(pSyncEnv, 0, sizeof(SSyncEnv));
pSyncEnv->envTickTimerCounter = 0;
pSyncEnv->envTickTimerMS = ENV_TICK_TIMER_MS;
pSyncEnv->FpEnvTickTimer = syncEnvTick;
atomic_store_64(&pSyncEnv->envTickTimerLogicClock, 0);
atomic_store_64(&pSyncEnv->envTickTimerLogicClockUser, 0);
// start tmr thread sDebug("vgId:%d, sync rid:%" PRId64 " is added to rsetId:%d", pNode->vgId, pNode->rid, gNodeRefId);
pSyncEnv->pTimerManager = taosTmrInit(1000, 50, 10000, "SYNC-ENV"); return 0;
}
atomic_store_8(&(pSyncEnv->isStart), 1); void syncNodeRemove(SSyncNode *pNode) {
return pSyncEnv; taosRemoveRef(gNodeRefId, pNode->rid);
sDebug("vgId:%d, sync rid:%" PRId64 " is removed from rsetId:%d", pNode->vgId, pNode->rid, gNodeRefId);
} }
static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv) { SSyncNode *syncNodeAcquire(int64_t rid) {
ASSERT(pSyncEnv == gSyncEnv); SSyncNode *pNode = taosAcquireRef(gNodeRefId, rid);
if (pSyncEnv != NULL) { if (pNode == NULL) {
atomic_store_8(&(pSyncEnv->isStart), 0); sTrace("failed to acquire node from refId:%" PRId64, rid);
taosTmrCleanUp(pSyncEnv->pTimerManager);
taosMemoryFree(pSyncEnv);
} }
gSyncEnv = NULL;
return 0; return pNode;
} }
static int32_t doSyncEnvStartTimer(SSyncEnv *pSyncEnv) { void syncNodeRelease(SSyncNode *pNode) { taosReleaseRef(gNodeRefId, pNode->rid); }
int32_t ret = 0;
taosTmrReset(pSyncEnv->FpEnvTickTimer, pSyncEnv->envTickTimerMS, pSyncEnv, pSyncEnv->pTimerManager, #if 0
&pSyncEnv->pEnvTickTimer); void syncEnvStartTimer() {
atomic_store_64(&pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerLogicClockUser); taosTmrReset(gSyncEnv.FpEnvTickTimer, gSyncEnv.envTickTimerMS, &gSyncEnv, gSyncEnv.pTimerManager,
return ret; &gSyncEnv.pEnvTickTimer);
atomic_store_64(&gSyncEnv.envTickTimerLogicClock, gSyncEnv.envTickTimerLogicClockUser);
} }
static int32_t doSyncEnvStopTimer(SSyncEnv *pSyncEnv) { void syncEnvStopTimer() {
int32_t ret = 0; int32_t ret = 0;
atomic_add_fetch_64(&pSyncEnv->envTickTimerLogicClockUser, 1); atomic_add_fetch_64(&gSyncEnv.envTickTimerLogicClockUser, 1);
taosTmrStop(pSyncEnv->pEnvTickTimer); taosTmrStop(gSyncEnv.pEnvTickTimer);
pSyncEnv->pEnvTickTimer = NULL; gSyncEnv.pEnvTickTimer = NULL;
return ret; return ret;
} }
#endif
static void syncEnvTick(void *param, void *tmrId) {
SSyncEnv *pSyncEnv = param;
if (atomic_load_64(&gSyncEnv.envTickTimerLogicClockUser) <= atomic_load_64(&gSyncEnv.envTickTimerLogicClock)) {
gSyncEnv.envTickTimerCounter++;
sTrace("syncEnvTick do ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64
", envTickTimerCounter:%" PRIu64 ", envTickTimerMS:%d, tmrId:%p",
gSyncEnv.envTickTimerLogicClockUser, gSyncEnv.envTickTimerLogicClock, gSyncEnv.envTickTimerCounter,
gSyncEnv.envTickTimerMS, tmrId);
// do something, tick ...
taosTmrReset(syncEnvTick, gSyncEnv.envTickTimerMS, pSyncEnv, gSyncEnv.pTimerManager, &gSyncEnv.pEnvTickTimer);
} else {
sTrace("syncEnvTick pass ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64
", envTickTimerCounter:%" PRIu64 ", envTickTimerMS:%d, tmrId:%p",
gSyncEnv.envTickTimerLogicClockUser, gSyncEnv.envTickTimerLogicClock, gSyncEnv.envTickTimerCounter,
gSyncEnv.envTickTimerMS, tmrId);
}
}
此差异已折叠。
...@@ -98,7 +98,7 @@ int main(int argc, char** argv) { ...@@ -98,7 +98,7 @@ int main(int argc, char** argv) {
init(); init();
int32_t ret = syncIOStart((char*)"127.0.0.1", gPorts[myIndex]); int32_t ret = syncIOStart((char*)"127.0.0.1", gPorts[myIndex]);
assert(ret == 0); assert(ret == 0);
ret = syncEnvStart(); ret = syncInit();
assert(ret == 0); assert(ret == 0);
char walPath[128]; char walPath[128];
......
...@@ -152,7 +152,7 @@ int main(int argc, char **argv) { ...@@ -152,7 +152,7 @@ int main(int argc, char **argv) {
int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]); int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]);
assert(ret == 0); assert(ret == 0);
ret = syncEnvStart(); ret = syncInit();
assert(ret == 0); assert(ret == 0);
taosRemoveDir("./wal_test"); taosRemoveDir("./wal_test");
......
...@@ -81,7 +81,7 @@ int main(int argc, char** argv) { ...@@ -81,7 +81,7 @@ int main(int argc, char** argv) {
int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]);
assert(ret == 0); assert(ret == 0);
ret = syncEnvStart(); ret = syncInit();
assert(ret == 0); assert(ret == 0);
SSyncNode* pSyncNode = syncInitTest(); SSyncNode* pSyncNode = syncInitTest();
......
...@@ -22,7 +22,7 @@ int main() { ...@@ -22,7 +22,7 @@ int main() {
logTest(); logTest();
ret = syncEnvStart(); ret = syncInit();
assert(ret == 0); assert(ret == 0);
for (int i = 0; i < 5; ++i) { for (int i = 0; i < 5; ++i) {
...@@ -37,8 +37,6 @@ int main() { ...@@ -37,8 +37,6 @@ int main() {
taosMsleep(5000); taosMsleep(5000);
} }
ret = syncEnvStop(); syncCleanUp();
assert(ret == 0);
return 0; return 0;
} }
...@@ -82,7 +82,7 @@ int main(int argc, char** argv) { ...@@ -82,7 +82,7 @@ int main(int argc, char** argv) {
int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]);
assert(ret == 0); assert(ret == 0);
ret = syncEnvStart(); ret = syncInit();
assert(ret == 0); assert(ret == 0);
SSyncNode* pSyncNode = syncInitTest(); SSyncNode* pSyncNode = syncInitTest();
......
...@@ -81,7 +81,7 @@ int main(int argc, char** argv) { ...@@ -81,7 +81,7 @@ int main(int argc, char** argv) {
int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]);
assert(ret == 0); assert(ret == 0);
ret = syncEnvStart(); ret = syncInit();
assert(ret == 0); assert(ret == 0);
SSyncNode* pSyncNode = syncInitTest(); SSyncNode* pSyncNode = syncInitTest();
...@@ -91,7 +91,7 @@ int main(int argc, char** argv) { ...@@ -91,7 +91,7 @@ int main(int argc, char** argv) {
initRaftId(pSyncNode); initRaftId(pSyncNode);
syncNodeClose(pSyncNode); syncNodeClose(pSyncNode);
syncEnvStop(); syncCleanUp();
// syncIOStop(); // syncIOStop();
// taosCloseLog(); // taosCloseLog();
......
...@@ -81,7 +81,7 @@ int main(int argc, char** argv) { ...@@ -81,7 +81,7 @@ int main(int argc, char** argv) {
int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]);
assert(ret == 0); assert(ret == 0);
ret = syncEnvStart(); ret = syncInit();
assert(ret == 0); assert(ret == 0);
SSyncNode* pSyncNode = syncInitTest(); SSyncNode* pSyncNode = syncInitTest();
......
...@@ -81,7 +81,7 @@ int main(int argc, char** argv) { ...@@ -81,7 +81,7 @@ int main(int argc, char** argv) {
int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]);
assert(ret == 0); assert(ret == 0);
ret = syncEnvStart(); ret = syncInit();
assert(ret == 0); assert(ret == 0);
SSyncNode* pSyncNode = syncInitTest(); SSyncNode* pSyncNode = syncInitTest();
......
...@@ -81,7 +81,7 @@ int main(int argc, char** argv) { ...@@ -81,7 +81,7 @@ int main(int argc, char** argv) {
int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]);
assert(ret == 0); assert(ret == 0);
ret = syncEnvStart(); ret = syncInit();
assert(ret == 0); assert(ret == 0);
SSyncNode* pSyncNode = syncInitTest(); SSyncNode* pSyncNode = syncInitTest();
......
...@@ -179,7 +179,7 @@ int main(int argc, char **argv) { ...@@ -179,7 +179,7 @@ int main(int argc, char **argv) {
int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]); int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]);
assert(ret == 0); assert(ret == 0);
ret = syncEnvStart(); ret = syncInit();
assert(ret == 0); assert(ret == 0);
// taosRemoveDir(pWalDir); // taosRemoveDir(pWalDir);
......
...@@ -82,7 +82,7 @@ int main(int argc, char** argv) { ...@@ -82,7 +82,7 @@ int main(int argc, char** argv) {
int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]);
assert(ret == 0); assert(ret == 0);
ret = syncEnvStart(); ret = syncInit();
assert(ret == 0); assert(ret == 0);
SSyncNode* pSyncNode = syncInitTest(); SSyncNode* pSyncNode = syncInitTest();
......
...@@ -82,7 +82,7 @@ int main(int argc, char** argv) { ...@@ -82,7 +82,7 @@ int main(int argc, char** argv) {
int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]);
assert(ret == 0); assert(ret == 0);
ret = syncEnvStart(); ret = syncInit();
assert(ret == 0); assert(ret == 0);
SSyncNode* pSyncNode = syncInitTest(); SSyncNode* pSyncNode = syncInitTest();
......
...@@ -154,7 +154,7 @@ int main(int argc, char **argv) { ...@@ -154,7 +154,7 @@ int main(int argc, char **argv) {
int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]); int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]);
assert(ret == 0); assert(ret == 0);
ret = syncEnvStart(); ret = syncInit();
assert(ret == 0); assert(ret == 0);
taosRemoveDir("./wal_test"); taosRemoveDir("./wal_test");
......
...@@ -57,7 +57,7 @@ static void taosIncRsetCount(SRefSet *pSet); ...@@ -57,7 +57,7 @@ static void taosIncRsetCount(SRefSet *pSet);
static void taosDecRsetCount(SRefSet *pSet); static void taosDecRsetCount(SRefSet *pSet);
static int32_t taosDecRefCount(int32_t rsetId, int64_t rid, int32_t remove); static int32_t taosDecRefCount(int32_t rsetId, int64_t rid, int32_t remove);
int32_t taosOpenRef(int32_t max, void (*fp)(void *)) { int32_t taosOpenRef(int32_t max, RefFp fp) {
SRefNode **nodeList; SRefNode **nodeList;
SRefSet *pSet; SRefSet *pSet;
int64_t *lockedBy; int64_t *lockedBy;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册