提交 7815a5e9 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into feature/config

......@@ -51,6 +51,10 @@ typedef enum {
} ECheckItemType;
typedef enum { TD_ROW_DISCARD_UPDATE = 0, TD_ROW_OVERWRITE_UPDATE = 1, TD_ROW_PARTIAL_UPDATE = 2 } TDUpdateConfig;
typedef enum {
TSDB_STATIS_OK = 0, // statis part exist and load successfully
TSDB_STATIS_NONE = 1, // statis part not exist
} ETsdbStatisStatus;
extern char *qtypeStr[];
......
......@@ -34,7 +34,9 @@ typedef enum {
TAOS_SYNC_STATE_FOLLOWER = 0,
TAOS_SYNC_STATE_CANDIDATE = 1,
TAOS_SYNC_STATE_LEADER = 2,
} ESyncState;
} ESyncRole;
typedef ESyncRole ESyncState;
typedef struct SSyncBuffer {
void* data;
......@@ -69,15 +71,15 @@ typedef struct SSyncFSM {
// when value in pBuf finish a raft flow, FpCommitCb is called, code indicates the result
// user can do something according to the code and isWeak. for example, write data into tsdb
void (*FpCommitCb)(struct SSyncFSM* pFsm, const SSyncBuffer* pBuf, SyncIndex index, bool isWeak, int32_t code);
void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pBuf, SyncIndex index, bool isWeak, int32_t code);
// when value in pBuf has been written into local log store, FpPreCommitCb is called, code indicates the result
// user can do something according to the code and isWeak. for example, write data into tsdb
void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SSyncBuffer* pBuf, SyncIndex index, bool isWeak, int32_t code);
void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pBuf, SyncIndex index, bool isWeak, int32_t code);
// when log entry is updated by a new one, FpRollBackCb is called
// user can do something to roll back. for example, delete data from tsdb, or just ignore it
void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SSyncBuffer* pBuf, SyncIndex index, bool isWeak, int32_t code);
void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pBuf, SyncIndex index, bool isWeak, int32_t code);
// user should implement this function, use "data" to take snapshot into "snapshot"
int32_t (*FpTakeSnapshot)(SSnapshot* snapshot);
......@@ -93,10 +95,10 @@ typedef struct SSyncLogStore {
void* data;
// append one log entry
int32_t (*appendEntry)(struct SSyncLogStore* pLogStore, SSyncBuffer* pBuf);
int32_t (*appendEntry)(struct SSyncLogStore* pLogStore, SRpcMsg* pBuf);
// get one log entry, user need to free pBuf->data
int32_t (*getEntry)(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncBuffer* pBuf);
int32_t (*getEntry)(struct SSyncLogStore* pLogStore, SyncIndex index, SRpcMsg* pBuf);
// update log store commit index with "index"
int32_t (*updateCommitIndex)(struct SSyncLogStore* pLogStore, SyncIndex index);
......@@ -135,7 +137,9 @@ typedef struct SSyncInfo {
SSyncCfg syncCfg;
char path[TSDB_FILENAME_LEN];
SSyncFSM* pFsm;
int32_t (*FpSendMsg)(void* handle, const SEpSet* pEpSet, SRpcMsg* pMsg);
void* rpcClient;
int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg);
} SSyncInfo;
......@@ -149,8 +153,8 @@ int64_t syncStart(const SSyncInfo* pSyncInfo);
void syncStop(int64_t rid);
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg);
// int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak);
int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak);
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak);
// int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak);
ESyncState syncGetMyRole(int64_t rid);
void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole);
......
......@@ -27,12 +27,12 @@ extern "C" {
typedef struct SDataStatis {
int16_t colId;
int64_t sum;
int64_t max;
int64_t min;
int16_t maxIndex;
int16_t minIndex;
int16_t numOfNull;
int64_t sum;
int64_t max;
int64_t min;
} SDataStatis;
typedef struct STable {
......@@ -53,6 +53,8 @@ typedef struct STsdb STsdb;
typedef struct STsdbCfg {
int8_t precision;
int8_t update;
int8_t compression;
uint64_t lruCacheSize;
int32_t daysPerFile;
int32_t minRowsPerFileBlock;
......@@ -60,8 +62,6 @@ typedef struct STsdbCfg {
int32_t keep;
int32_t keep1;
int32_t keep2;
int8_t update;
int8_t compression;
} STsdbCfg;
// query condition to build multi-table data block iterator
......
......@@ -18,8 +18,6 @@
#include "tsdbFile.h"
#define TSDB_FS_VERSION 0
// ================== TSDB global config
extern bool tsdbForceKeepFile;
......
......@@ -44,7 +44,37 @@
#define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK)
#define TSDB_FILE_IS_BAD(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_BAD)
typedef enum { TSDB_FILE_HEAD = 0, TSDB_FILE_DATA, TSDB_FILE_LAST, TSDB_FILE_MAX, TSDB_FILE_META } TSDB_FILE_T;
typedef enum {
TSDB_FILE_HEAD = 0, // .head
TSDB_FILE_DATA, // .data
TSDB_FILE_LAST, // .last
TSDB_FILE_SMAD, // .smad(Block-wise SMA)
TSDB_FILE_SMAL, // .smal(Block-wise SMA)
TSDB_FILE_MAX, //
TSDB_FILE_META, // meta
TSDB_FILE_TSMA, // .tsma.${sma_index_name}, Time-range-wise SMA
TSDB_FILE_RSMA, // .rsma.${sma_index_name}, Time-range-wise Rollup SMA
} TSDB_FILE_T;
typedef enum {
TSDB_FS_VER_0 = 0,
TSDB_FS_VER_MAX,
} ETsdbFsVer;
#define TSDB_LATEST_FVER TSDB_FS_VER_0 // latest version for DFile
#define TSDB_LATEST_SFS_VER TSDB_FS_VER_0 // latest version for 'current' file
static FORCE_INLINE uint32_t tsdbGetDFSVersion(TSDB_FILE_T fType) { // latest version for DFile
switch (fType) {
case TSDB_FILE_HEAD: // .head
case TSDB_FILE_DATA: // .data
case TSDB_FILE_LAST: // .last
case TSDB_FILE_SMAD: // .smad(Block-wise SMA)
case TSDB_FILE_SMAL: // .smal(Block-wise SMA)
default:
return TSDB_LATEST_FVER;
}
}
#if 0
// =============== SMFile
......@@ -169,6 +199,7 @@ static FORCE_INLINE int64_t tsdbReadMFile(SMFile* pMFile, void* buf, int64_t nby
// =============== SDFile
typedef struct {
uint32_t magic;
uint32_t fver;
uint32_t len;
uint32_t totalBlocks;
uint32_t totalSubBlocks;
......@@ -188,7 +219,7 @@ void tsdbInitDFile(STsdb *pRepo, SDFile* pDFile, SDiskID did, int fid, uint32_t
void tsdbInitDFileEx(SDFile* pDFile, SDFile* pODFile);
int tsdbEncodeSDFile(void** buf, SDFile* pDFile);
void* tsdbDecodeSDFile(STsdb *pRepo, void* buf, SDFile* pDFile);
int tsdbCreateDFile(STsdb *pRepo, SDFile* pDFile, bool updateHeader);
int tsdbCreateDFile(STsdb *pRepo, SDFile* pDFile, bool updateHeader, TSDB_FILE_T fType);
int tsdbUpdateDFileHeader(SDFile* pDFile);
int tsdbLoadDFileHeader(SDFile* pDFile, SDFInfo* pInfo);
int tsdbParseDFilename(const char* fname, int* vid, int* fid, TSDB_FILE_T* ftype, uint32_t* version);
......@@ -292,12 +323,18 @@ static FORCE_INLINE int tsdbCopyDFile(SDFile* pSrc, SDFile* pDest) {
// =============== SDFileSet
typedef struct {
int fid;
int state;
SDFile files[TSDB_FILE_MAX];
int fid;
int8_t state; // -128~127
uint8_t ver; // 0~255, DFileSet version
uint16_t reserve;
SDFile files[TSDB_FILE_MAX];
} SDFileSet;
#define TSDB_LATEST_FSET_VER 0
#define TSDB_FSET_FID(s) ((s)->fid)
#define TSDB_FSET_STATE(s) ((s)->state)
#define TSDB_FSET_VER(s) ((s)->ver)
#define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t))
#define TSDB_FSET_LEVEL(s) TSDB_FILE_LEVEL(TSDB_DFILE_IN_SET(s, 0))
#define TSDB_FSET_ID(s) TSDB_FILE_ID(TSDB_DFILE_IN_SET(s, 0))
......
......@@ -36,6 +36,7 @@ typedef struct {
TSKEY maxKey;
} SBlockIdx;
#ifdef TD_REFACTOR_3
typedef struct {
int64_t last : 1;
int64_t offset : 63;
......@@ -49,22 +50,35 @@ typedef struct {
TSKEY keyLast;
} SBlock;
#else
typedef enum {
TSDB_SBLK_VER_0 = 0,
TSDB_SBLK_VER_MAX,
} ESBlockVer;
#define SBlockVerLatest TSDB_SBLK_VER_0
typedef struct {
int64_t last : 1;
int64_t offset : 63;
int32_t algorithm : 8;
int32_t numOfRows : 24;
uint8_t reserve0;
uint8_t last : 1;
uint8_t blkVer : 7;
uint8_t numOfSubBlocks;
int16_t numOfCols; // not including timestamp column
uint32_t len : 32; // data block length
uint32_t len; // data block length
uint32_t keyLen : 24; // key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols
uint32_t reserve1 : 8;
uint64_t blkVer : 8;
uint64_t aggrOffset : 56;
uint32_t reserve : 8;
int32_t algorithm : 8;
int32_t numOfRows : 24;
int64_t offset;
uint64_t aggrStat : 1;
uint64_t aggrOffset : 63;
TSKEY keyFirst;
TSKEY keyLast;
} SBlock_3;
} SBlockV0;
#define SBlock SBlockV0 // latest SBlock definition
#endif
typedef struct {
int32_t delimiter; // For recovery usage
......@@ -73,6 +87,7 @@ typedef struct {
SBlock blocks[];
} SBlockInfo;
#ifdef TD_REFACTOR_3
typedef struct {
int16_t colId;
uint16_t bitmap : 1; // 0: has bitmap if has NULL/NORM rows, 1: no bitmap if all rows are NORM
......@@ -89,17 +104,50 @@ typedef struct {
uint8_t offsetH;
char padding[1];
} SBlockCol;
#else
typedef struct {
int16_t colId;
uint8_t bitmap : 1; // 0: has bitmap if has NULL/NORM rows, 1: no bitmap if all rows are NORM
uint8_t reserve : 7;
uint8_t type;
int32_t len;
uint32_t offset;
} SBlockColV0;
#define SBlockCol SBlockColV0 // latest SBlockCol definition
typedef struct {
int16_t colId;
int16_t maxIndex;
int16_t minIndex;
int16_t numOfNull;
int64_t sum;
int64_t max;
int64_t min;
} SAggrBlkColV0;
#define SAggrBlkCol SAggrBlkColV0 // latest SAggrBlkCol definition
#endif
// Code here just for back-ward compatibility
static FORCE_INLINE void tsdbSetBlockColOffset(SBlockCol *pBlockCol, uint32_t offset) {
#ifdef TD_REFACTOR_3
pBlockCol->offset = offset & ((((uint32_t)1) << 24) - 1);
pBlockCol->offsetH = (uint8_t)(offset >> 24);
#else
pBlockCol->offset = offset;
#endif
}
static FORCE_INLINE uint32_t tsdbGetBlockColOffset(SBlockCol *pBlockCol) {
#ifdef TD_REFACTOR_3
uint32_t offset1 = pBlockCol->offset;
uint32_t offset2 = pBlockCol->offsetH;
return (offset1 | (offset2 << 24));
#else
return pBlockCol->offset;
#endif
}
typedef struct {
......@@ -109,31 +157,57 @@ typedef struct {
SBlockCol cols[];
} SBlockData;
typedef void SAggrBlkData; // SBlockCol cols[];
struct SReadH {
STsdb * pRepo;
SDFileSet rSet; // FSET to read
SArray * aBlkIdx; // SBlockIdx array
STable * pTable; // table to read
SBlockIdx * pBlkIdx; // current reading table SBlockIdx
int cidx;
SBlockInfo *pBlkInfo;
SBlockData *pBlkData; // Block info
SDataCols * pDCols[2];
void * pBuf; // buffer
void * pCBuf; // compression buffer
STsdb * pRepo;
SDFileSet rSet; // FSET to read
SArray * aBlkIdx; // SBlockIdx array
STable * pTable; // table to read
SBlockIdx * pBlkIdx; // current reading table SBlockIdx
int cidx;
SBlockInfo * pBlkInfo;
SBlockData * pBlkData; // Block info
SAggrBlkData *pAggrBlkData; // Aggregate Block info
SDataCols * pDCols[2];
void * pBuf; // buffer
void * pCBuf; // compression buffer
void * pExBuf; // extra buffer
};
#define TSDB_READ_REPO(rh) ((rh)->pRepo)
#define TSDB_READ_REPO_ID(rh) REPO_ID(TSDB_READ_REPO(rh))
#define TSDB_READ_FSET(rh) (&((rh)->rSet))
#define TSDB_READ_TABLE(rh) ((rh)->pTable)
#define TSDB_READ_REPO(rh) ((rh)->pRepo)
#define TSDB_READ_REPO_ID(rh) REPO_ID(TSDB_READ_REPO(rh))
#define TSDB_READ_FSET(rh) (&((rh)->rSet))
#define TSDB_READ_TABLE(rh) ((rh)->pTable)
#define TSDB_READ_HEAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_HEAD)
#define TSDB_READ_DATA_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_DATA)
#define TSDB_READ_LAST_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_LAST)
#define TSDB_READ_BUF(rh) ((rh)->pBuf)
#define TSDB_READ_COMP_BUF(rh) ((rh)->pCBuf)
#define TSDB_READ_SMAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMAD)
#define TSDB_READ_SMAL_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMAL)
#define TSDB_READ_BUF(rh) ((rh)->pBuf)
#define TSDB_READ_COMP_BUF(rh) ((rh)->pCBuf)
#define TSDB_READ_EXBUF(rh) ((rh)->pExBuf)
#define TSDB_BLOCK_STATIS_SIZE(ncols, blkVer) \
(sizeof(SBlockData) + sizeof(SBlockColV##blkVer) * (ncols) + sizeof(TSCKSUM))
static FORCE_INLINE size_t tsdbBlockStatisSize(int nCols, uint32_t blkVer) {
switch (blkVer) {
case TSDB_SBLK_VER_0:
default:
return TSDB_BLOCK_STATIS_SIZE(nCols, 0);
}
}
#define TSDB_BLOCK_STATIS_SIZE(ncols) (sizeof(SBlockData) + sizeof(SBlockCol) * (ncols) + sizeof(TSCKSUM))
#define TSDB_BLOCK_AGGR_SIZE(ncols, blkVer) (sizeof(SAggrBlkColV##blkVer) * (ncols) + sizeof(TSCKSUM))
static FORCE_INLINE size_t tsdbBlockAggrSize(int nCols, uint32_t blkVer) {
switch (blkVer) {
case TSDB_SBLK_VER_0:
default:
return TSDB_BLOCK_AGGR_SIZE(nCols, 0);
}
}
int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo);
void tsdbDestroyReadH(SReadH *pReadh);
......@@ -147,7 +221,7 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo
int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock);
int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx);
void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx);
void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols);
void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols, SBlock *pBlock);
static FORCE_INLINE int tsdbMakeRoom(void **ppBuf, size_t size) {
void * pBuf = *ppBuf;
......
......@@ -50,8 +50,11 @@ typedef struct {
#define TSDB_COMMIT_HEAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_HEAD)
#define TSDB_COMMIT_DATA_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_DATA)
#define TSDB_COMMIT_LAST_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_LAST)
#define TSDB_COMMIT_SMAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_SMAD)
#define TSDB_COMMIT_SMAL_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_SMAL)
#define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&((ch)->readh))
#define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&((ch)->readh))
#define TSDB_COMMIT_EXBUF(ch) TSDB_READ_EXBUF(&((ch)->readh))
#define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->config.maxRowsPerFileBlock)
#define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch)))
......@@ -509,7 +512,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
// TSDB_FILE_HEAD
SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith);
tsdbInitDFile(pRepo, pWHeadf, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD);
if (tsdbCreateDFile(pRepo, pWHeadf, true) < 0) {
if (tsdbCreateDFile(pRepo, pWHeadf, true, TSDB_FILE_HEAD) < 0) {
tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWHeadf),
tstrerror(terrno));
......@@ -560,7 +563,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
tsdbInitDFile(pRepo, pWLastf, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_LAST);
pCommith->isLFileSame = false;
if (tsdbCreateDFile(pRepo, pWLastf, true) < 0) {
if (tsdbCreateDFile(pRepo, pWLastf, true, TSDB_FILE_LAST) < 0) {
tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWLastf),
tstrerror(terrno));
......@@ -572,6 +575,74 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
}
}
}
// TSDB_FILE_SMAD
SDFile *pRSmadF = TSDB_READ_SMAD_FILE(&(pCommith->readh));
SDFile *pWSmadF = TSDB_COMMIT_SMAD_FILE(pCommith);
if (access(TSDB_FILE_FULL_NAME(pRSmadF), F_OK) != 0) {
tsdbDebug("vgId:%d create data file %s as not exist", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pRSmadF));
tsdbInitDFile(pRepo, pWSmadF, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_SMAD);
if (tsdbCreateDFile(pRepo, pWSmadF, true, TSDB_FILE_SMAD) < 0) {
tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmadF),
tstrerror(terrno));
tsdbCloseDFileSet(pWSet);
(void)tsdbRemoveDFile(pWHeadf);
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1;
}
}
} else {
tsdbInitDFileEx(pWSmadF, pRSmadF);
if (tsdbOpenDFile(pWSmadF, O_RDWR) < 0) {
tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmadF),
tstrerror(terrno));
tsdbCloseDFileSet(pWSet);
tsdbRemoveDFile(pWHeadf);
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1;
}
}
}
// TSDB_FILE_SMAL
SDFile *pRSmalF = TSDB_READ_SMAL_FILE(&(pCommith->readh));
SDFile *pWSmalF = TSDB_COMMIT_SMAL_FILE(pCommith);
if ((pCommith->isLFileSame) && access(TSDB_FILE_FULL_NAME(pRSmalF), F_OK) == 0) {
tsdbInitDFileEx(pWSmalF, pRSmalF);
if (tsdbOpenDFile(pWSmalF, O_RDWR) < 0) {
tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmalF),
tstrerror(terrno));
tsdbCloseDFileSet(pWSet);
tsdbRemoveDFile(pWHeadf);
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1;
}
}
} else {
tsdbDebug("vgId:%d create data file %s as not exist", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pRSmalF));
tsdbInitDFile(pRepo, pWSmalF, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_SMAL);
if (tsdbCreateDFile(pRepo, pWSmalF, true, TSDB_FILE_SMAL) < 0) {
tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmalF),
tstrerror(terrno));
tsdbCloseDFileSet(pWSet);
(void)tsdbRemoveDFile(pWHeadf);
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1;
}
}
}
}
return 0;
......@@ -1131,41 +1202,57 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) {
}
}
int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast,
bool isSuper, void **ppBuf, void **ppCBuf) {
STsdbCfg *pCfg = REPO_CFG(pRepo);
SBlockData *pBlockData;
int64_t offset = 0;
int rowsToWrite = pDataCols->numOfRows;
int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDFileAggr, SDataCols *pDataCols,
SBlock *pBlock, bool isLast, bool isSuper, void **ppBuf, void **ppCBuf, void **ppExBuf) {
STsdbCfg * pCfg = REPO_CFG(pRepo);
SBlockData * pBlockData = NULL;
SAggrBlkData *pAggrBlkData = NULL;
int64_t offset = 0, offsetAggr = 0;
int rowsToWrite = pDataCols->numOfRows;
ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock);
ASSERT((!isLast) || rowsToWrite < pCfg->minRowsPerFileBlock);
// Make buffer space
if (tsdbMakeRoom(ppBuf, TSDB_BLOCK_STATIS_SIZE(pDataCols->numOfCols)) < 0) {
if (tsdbMakeRoom(ppBuf, tsdbBlockStatisSize(pDataCols->numOfCols, SBlockVerLatest)) < 0) {
return -1;
}
pBlockData = (SBlockData *)(*ppBuf);
if (tsdbMakeRoom(ppExBuf, tsdbBlockAggrSize(pDataCols->numOfCols, SBlockVerLatest)) < 0) {
return -1;
}
pAggrBlkData = (SAggrBlkData *)(*ppExBuf);
// Get # of cols not all NULL(not including key column)
int nColsNotAllNull = 0;
for (int ncol = 1; ncol < pDataCols->numOfCols; ncol++) { // ncol from 1, we skip the timestamp column
SDataCol *pDataCol = pDataCols->cols + ncol;
SBlockCol *pBlockCol = pBlockData->cols + nColsNotAllNull;
for (int ncol = 1; ncol < pDataCols->numOfCols; ++ncol) { // ncol from 1, we skip the timestamp column
SDataCol * pDataCol = pDataCols->cols + ncol;
SBlockCol * pBlockCol = pBlockData->cols + nColsNotAllNull;
SAggrBlkCol *pAggrBlkCol = (SAggrBlkCol *)pAggrBlkData + nColsNotAllNull;
if (isAllRowsNull(pDataCol)) { // all data to commit are NULL, just ignore it
continue;
}
memset(pBlockCol, 0, sizeof(*pBlockCol));
memset(pAggrBlkCol, 0, sizeof(*pAggrBlkCol));
pBlockCol->colId = pDataCol->colId;
pBlockCol->type = pDataCol->type;
pAggrBlkCol->colId = pDataCol->colId;
if (tDataTypes[pDataCol->type].statisFunc) {
#if 0
(*tDataTypes[pDataCol->type].statisFunc)(pDataCol->pData, rowsToWrite, &(pBlockCol->min), &(pBlockCol->max),
&(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex),
&(pBlockCol->numOfNull));
if (pBlockCol->numOfNull == 0) {
#endif
(*tDataTypes[pDataCol->type].statisFunc)(pDataCol->pData, rowsToWrite, &(pAggrBlkCol->min), &(pAggrBlkCol->max),
&(pAggrBlkCol->sum), &(pAggrBlkCol->minIndex), &(pAggrBlkCol->maxIndex),
&(pAggrBlkCol->numOfNull));
if (pAggrBlkCol->numOfNull == 0) {
TD_SET_COL_ROWS_NORM(pBlockCol);
} else {
TD_SET_COL_ROWS_MISC(pBlockCol);
......@@ -1181,13 +1268,14 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
// Compress the data if neccessary
int tcol = 0; // counter of not all NULL and written columns
uint32_t toffset = 0;
int32_t tsize = TSDB_BLOCK_STATIS_SIZE(nColsNotAllNull);
int32_t tsize = (int32_t)tsdbBlockStatisSize(nColsNotAllNull, SBlockVerLatest);
int32_t lsize = tsize;
uint32_t tsizeAggr = (uint32_t)tsdbBlockAggrSize(nColsNotAllNull, SBlockVerLatest);
int32_t keyLen = 0;
int32_t nBitmaps = (int32_t)TD_BITMAP_BYTES(rowsToWrite);
int32_t tBitmaps = 0;
for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) {
for (int ncol = 0; ncol < pDataCols->numOfCols; ++ncol) {
// All not NULL columns finish
if (ncol != 0 && tcol >= nColsNotAllNull) break;
......@@ -1248,7 +1336,7 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
if (ncol != 0) {
tsdbSetBlockColOffset(pBlockCol, toffset);
pBlockCol->len = flen;
tcol++;
++tcol;
} else {
keyLen = flen;
}
......@@ -1269,6 +1357,18 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
return -1;
}
uint32_t aggrStatus = nColsNotAllNull > 0 ? 1 : 0;
if (aggrStatus > 0) {
taosCalcChecksumAppend(0, (uint8_t *)pAggrBlkData, tsizeAggr);
tsdbUpdateDFileMagic(pDFileAggr, POINTER_SHIFT(pAggrBlkData, tsizeAggr - sizeof(TSCKSUM)));
// Write the whole block to file
if (tsdbAppendDFile(pDFileAggr, (void *)pAggrBlkData, tsizeAggr, &offsetAggr) < tsizeAggr) {
return -1;
}
}
// Update pBlock membership vairables
pBlock->last = isLast;
pBlock->offset = offset;
......@@ -1280,6 +1380,9 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
pBlock->numOfCols = nColsNotAllNull;
pBlock->keyFirst = dataColsKeyFirst(pDataCols);
pBlock->keyLast = dataColsKeyLast(pDataCols);
pBlock->aggrStat = aggrStatus;
pBlock->blkVer = SBlockVerLatest;
pBlock->aggrOffset = (uint64_t)offsetAggr;
tsdbDebug("vgId:%d uid:%" PRId64 " a block of data is written to file %s, offset %" PRId64
" numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64,
......@@ -1291,9 +1394,10 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast,
bool isSuper) {
return tsdbWriteBlockImpl(TSDB_COMMIT_REPO(pCommith), TSDB_COMMIT_TABLE(pCommith), pDFile, pDataCols, pBlock, isLast,
isSuper, (void **)(&(TSDB_COMMIT_BUF(pCommith))),
(void **)(&(TSDB_COMMIT_COMP_BUF(pCommith))));
return tsdbWriteBlockImpl(TSDB_COMMIT_REPO(pCommith), TSDB_COMMIT_TABLE(pCommith), pDFile,
isLast ? TSDB_COMMIT_SMAL_FILE(pCommith) : TSDB_COMMIT_SMAD_FILE(pCommith), pDataCols,
pBlock, isLast, isSuper, (void **)(&(TSDB_COMMIT_BUF(pCommith))),
(void **)(&(TSDB_COMMIT_COMP_BUF(pCommith))), (void **)(&(TSDB_COMMIT_EXBUF(pCommith))));
}
static int tsdbWriteBlockInfo(SCommitH *pCommih) {
......
......@@ -38,6 +38,8 @@ typedef struct {
#define TSDB_COMPACT_HEAD_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_HEAD)
#define TSDB_COMPACT_DATA_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_DATA)
#define TSDB_COMPACT_LAST_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_LAST)
#define TSDB_COMPACT_SMAD_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_SMAD)
#define TSDB_COMPACT_SMAL_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_SMAL)
#define TSDB_COMPACT_BUF(pComph) TSDB_READ_BUF(&((pComph)->readh))
#define TSDB_COMPACT_COMP_BUF(pComph) TSDB_READ_COMP_BUF(&((pComph)->readh))
......
......@@ -422,7 +422,7 @@ static int tsdbSaveFSStatus(STsdb *pRepo, SFSStatus *pStatus) {
return -1;
}
fsheader.version = TSDB_FS_VERSION;
fsheader.version = TSDB_LATEST_SFS_VER;
if (taosArrayGetSize(pStatus->df) == 0) {
fsheader.len = 0;
} else {
......@@ -697,7 +697,7 @@ static int tsdbOpenFSFromCurrent(STsdb *pRepo) {
ptr = tsdbDecodeFSHeader(ptr, &fsheader);
ptr = tsdbDecodeFSMeta(ptr, &(pStatus->meta));
if (fsheader.version != TSDB_FS_VERSION) {
if (fsheader.version != TSDB_LATEST_SFS_VER) {
// TODO: handle file version change
}
......
......@@ -19,8 +19,12 @@ static const char *TSDB_FNAME_SUFFIX[] = {
"head", // TSDB_FILE_HEAD
"data", // TSDB_FILE_DATA
"last", // TSDB_FILE_LAST
"smad", // TSDB_FILE_SMAD
"smal", // TSDB_FILE_SMAL
"", // TSDB_FILE_MAX
"meta", // TSDB_FILE_META
"tsma", // TSDB_FILE_TSMA
"rsma", // TSDB_FILE_RSMA
};
static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, char *fname);
......@@ -304,6 +308,7 @@ void tsdbInitDFile(STsdb *pRepo, SDFile *pDFile, SDiskID did, int fid, uint32_t
memset(&(pDFile->info), 0, sizeof(pDFile->info));
pDFile->info.magic = TSDB_FILE_INIT_MAGIC;
pDFile->info.fver = tsdbGetDFSVersion(ftype);
tsdbGetFilename(pRepo->vgId, fid, ver, ftype, fname);
tfsInitFile(pRepo->pTfs, &(pDFile->f), did, fname);
......@@ -341,7 +346,7 @@ static int tsdbEncodeSDFileEx(void **buf, SDFile *pDFile) {
}
static void *tsdbDecodeSDFileEx(void *buf, SDFile *pDFile) {
char *aname;
char *aname = NULL;
buf = tsdbDecodeDFInfo(buf, &(pDFile->info));
buf = taosDecodeString(buf, &aname);
......@@ -352,7 +357,7 @@ static void *tsdbDecodeSDFileEx(void *buf, SDFile *pDFile) {
return buf;
}
int tsdbCreateDFile(STsdb *pRepo, SDFile *pDFile, bool updateHeader) {
int tsdbCreateDFile(STsdb *pRepo, SDFile *pDFile, bool updateHeader, TSDB_FILE_T fType) {
ASSERT(pDFile->info.size == 0 && pDFile->info.magic == TSDB_FILE_INIT_MAGIC);
pDFile->pFile = taosOpenFile(TSDB_FILE_FULL_NAME(pDFile), TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
......@@ -382,6 +387,7 @@ int tsdbCreateDFile(STsdb *pRepo, SDFile *pDFile, bool updateHeader) {
}
pDFile->info.size += TSDB_FILE_HEAD_SIZE;
pDFile->info.fver = tsdbGetDFSVersion(fType);
if (tsdbUpdateDFileHeader(pDFile) < 0) {
tsdbCloseDFile(pDFile);
......@@ -493,6 +499,7 @@ static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) {
int tlen = 0;
tlen += taosEncodeFixedU32(buf, pInfo->magic);
tlen += taosEncodeFixedU32(buf, pInfo->fver);
tlen += taosEncodeFixedU32(buf, pInfo->len);
tlen += taosEncodeFixedU32(buf, pInfo->totalBlocks);
tlen += taosEncodeFixedU32(buf, pInfo->totalSubBlocks);
......@@ -505,6 +512,7 @@ static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) {
static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo) {
buf = taosDecodeFixedU32(buf, &(pInfo->magic));
buf = taosDecodeFixedU32(buf, &(pInfo->fver));
buf = taosDecodeFixedU32(buf, &(pInfo->len));
buf = taosDecodeFixedU32(buf, &(pInfo->totalBlocks));
buf = taosDecodeFixedU32(buf, &(pInfo->totalSubBlocks));
......@@ -562,8 +570,10 @@ static int tsdbRollBackDFile(SDFile *pDFile) {
// ============== Operations on SDFileSet
void tsdbInitDFileSet(STsdb *pRepo, SDFileSet *pSet, SDiskID did, int fid, uint32_t ver) {
pSet->fid = fid;
pSet->state = 0;
TSDB_FSET_FID(pSet) = fid;
TSDB_FSET_VER(pSet) = TSDB_LATEST_FSET_VER;
TSDB_FSET_STATE(pSet) = 0;
pSet->reserve = 0;
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
SDFile *pDFile = TSDB_DFILE_IN_SET(pSet, ftype);
......@@ -572,7 +582,7 @@ void tsdbInitDFileSet(STsdb *pRepo, SDFileSet *pSet, SDiskID did, int fid, uint3
}
void tsdbInitDFileSetEx(SDFileSet *pSet, SDFileSet *pOSet) {
pSet->fid = pOSet->fid;
TSDB_FSET_FID(pSet) = TSDB_FSET_FID(pOSet);
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
tsdbInitDFileEx(TSDB_DFILE_IN_SET(pSet, ftype), TSDB_DFILE_IN_SET(pOSet, ftype));
}
......@@ -581,7 +591,10 @@ void tsdbInitDFileSetEx(SDFileSet *pSet, SDFileSet *pOSet) {
int tsdbEncodeDFileSet(void **buf, SDFileSet *pSet) {
int tlen = 0;
tlen += taosEncodeFixedI32(buf, pSet->fid);
tlen += taosEncodeFixedI32(buf, TSDB_FSET_FID(pSet));
// state not included
tlen += taosEncodeFixedU8(buf, TSDB_FSET_VER(pSet));
tlen += taosEncodeFixedU16(buf, pSet->reserve);
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
tlen += tsdbEncodeSDFile(buf, TSDB_DFILE_IN_SET(pSet, ftype));
}
......@@ -590,11 +603,11 @@ int tsdbEncodeDFileSet(void **buf, SDFileSet *pSet) {
}
void *tsdbDecodeDFileSet(STsdb *pRepo, void *buf, SDFileSet *pSet) {
int32_t fid;
buf = taosDecodeFixedI32(buf, &(TSDB_FSET_FID(pSet)));
TSDB_FSET_STATE(pSet) = 0;
buf = taosDecodeFixedU8(buf, &(TSDB_FSET_VER(pSet)));
buf = taosDecodeFixedU16(buf, &(pSet->reserve));
buf = taosDecodeFixedI32(buf, &(fid));
pSet->state = 0;
pSet->fid = fid;
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
buf = tsdbDecodeSDFile(pRepo, buf, TSDB_DFILE_IN_SET(pSet, ftype));
}
......@@ -604,7 +617,9 @@ void *tsdbDecodeDFileSet(STsdb *pRepo, void *buf, SDFileSet *pSet) {
int tsdbEncodeDFileSetEx(void **buf, SDFileSet *pSet) {
int tlen = 0;
tlen += taosEncodeFixedI32(buf, pSet->fid);
tlen += taosEncodeFixedI32(buf, TSDB_FSET_FID(pSet));
tlen += taosEncodeFixedU8(buf, TSDB_FSET_VER(pSet));
tlen += taosEncodeFixedU16(buf, pSet->reserve);
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
tlen += tsdbEncodeSDFileEx(buf, TSDB_DFILE_IN_SET(pSet, ftype));
}
......@@ -613,10 +628,10 @@ int tsdbEncodeDFileSetEx(void **buf, SDFileSet *pSet) {
}
void *tsdbDecodeDFileSetEx(void *buf, SDFileSet *pSet) {
int32_t fid;
buf = taosDecodeFixedI32(buf, &(TSDB_FSET_FID(pSet)));
buf = taosDecodeFixedU8(buf, &(TSDB_FSET_VER(pSet)));
buf = taosDecodeFixedU16(buf, &(pSet->reserve));
buf = taosDecodeFixedI32(buf, &(fid));
pSet->fid = fid;
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
buf = tsdbDecodeSDFileEx(buf, TSDB_DFILE_IN_SET(pSet, ftype));
}
......@@ -637,7 +652,7 @@ int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to) {
int tsdbCreateDFileSet(STsdb *pRepo, SDFileSet *pSet, bool updateHeader) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
if (tsdbCreateDFile(pRepo, TSDB_DFILE_IN_SET(pSet, ftype), updateHeader) < 0) {
if (tsdbCreateDFile(pRepo, TSDB_DFILE_IN_SET(pSet, ftype), updateHeader, ftype) < 0) {
tsdbCloseDFileSet(pSet);
tsdbRemoveDFileSet(pSet);
return -1;
......
......@@ -821,9 +821,10 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
// file block with sub-blocks has no statistics data
if (pBlock->numOfSubBlocks <= 1) {
tsdbLoadBlockStatis(pReadh, pBlock);
tsdbGetBlockStatis(pReadh, pBlockStatis, (int)numColumns);
loadStatisData = true;
if (tsdbLoadBlockStatis(pReadh, pBlock) == TSDB_STATIS_OK) {
tsdbGetBlockStatis(pReadh, pBlockStatis, (int)numColumns, pBlock);
loadStatisData = true;
}
}
for (int16_t i = 0; i < numColumns && numColumns > pTable->restoreColumnNum; ++i) {
......
......@@ -3276,8 +3276,12 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SDataStati
}
int64_t stime = taosGetTimestampUs();
if (tsdbLoadBlockStatis(&pHandle->rhelper, pBlockInfo->compBlock) < 0) {
int statisStatus = tsdbLoadBlockStatis(&pHandle->rhelper, pBlockInfo->compBlock);
if (statisStatus < TSDB_STATIS_OK) {
return terrno;
} else if (statisStatus > TSDB_STATIS_OK) {
*pBlockStatis = NULL;
return TSDB_CODE_SUCCESS;
}
int16_t* colIds = pHandle->defaultLoadColumn->pData;
......@@ -3288,7 +3292,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SDataStati
pHandle->statis[i].colId = colIds[i];
}
tsdbGetBlockStatis(&pHandle->rhelper, pHandle->statis, (int)numOfCols);
tsdbGetBlockStatis(&pHandle->rhelper, pHandle->statis, (int)numOfCols, pBlockInfo->compBlock);
// always load the first primary timestamp column data
SDataStatis* pPrimaryColStatis = &pHandle->statis[0];
......
......@@ -19,6 +19,7 @@
static void tsdbResetReadTable(SReadH *pReadh);
static void tsdbResetReadFile(SReadH *pReadh);
static int tsdbLoadBlockOffset(SReadH *pReadh, SBlock *pBlock);
static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols);
static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32_t len, int8_t comp, int numOfRows,
int numOfBitmaps, int lenOfBitmaps, int maxPoints, char *buffer,
......@@ -63,10 +64,12 @@ int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo) {
void tsdbDestroyReadH(SReadH *pReadh) {
if (pReadh == NULL) return;
pReadh->pExBuf = taosTZfree(pReadh->pExBuf);
pReadh->pCBuf = taosTZfree(pReadh->pCBuf);
pReadh->pBuf = taosTZfree(pReadh->pBuf);
pReadh->pDCols[0] = tdFreeDataCols(pReadh->pDCols[0]);
pReadh->pDCols[1] = tdFreeDataCols(pReadh->pDCols[1]);
pReadh->pAggrBlkData = taosTZfree(pReadh->pAggrBlkData);
pReadh->pBlkData = taosTZfree(pReadh->pBlkData);
pReadh->pBlkInfo = taosTZfree(pReadh->pBlkInfo);
pReadh->cidx = 0;
......@@ -305,15 +308,58 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo,
int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock) {
ASSERT(pBlock->numOfSubBlocks <= 1);
SDFile *pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh);
if (!pBlock->aggrStat) {
return TSDB_STATIS_NONE;
}
SDFile *pDFileAggr = pBlock->last ? TSDB_READ_SMAL_FILE(pReadh) : TSDB_READ_SMAD_FILE(pReadh);
if (tsdbSeekDFile(pDFileAggr, pBlock->aggrOffset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to load block aggr part while seek file %s to offset %" PRIu64 " since %s",
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), (uint64_t)pBlock->aggrOffset,
tstrerror(terrno));
return -1;
}
size_t sizeAggr = tsdbBlockAggrSize(pBlock->numOfCols, (uint32_t)pBlock->blkVer);
if (tsdbMakeRoom((void **)(&(pReadh->pAggrBlkData)), sizeAggr) < 0) return -1;
int64_t nreadAggr = tsdbReadDFile(pDFileAggr, (void *)(pReadh->pAggrBlkData), sizeAggr);
if (nreadAggr < 0) {
tsdbError("vgId:%d failed to load block aggr part while read file %s since %s, offset:%" PRIu64 " len :%" PRIzu,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), tstrerror(terrno),
(uint64_t)pBlock->aggrOffset, sizeAggr);
return -1;
}
if (nreadAggr < sizeAggr) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tsdbError("vgId:%d block aggr part in file %s is corrupted, offset:%" PRIu64 " expected bytes:%" PRIzu
" read bytes: %" PRId64,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), (uint64_t)pBlock->aggrOffset, sizeAggr,
nreadAggr);
return -1;
}
if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pAggrBlkData), (uint32_t)sizeAggr)) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tsdbError("vgId:%d block aggr part in file %s is corrupted since wrong checksum, offset:%" PRIu64 " len :%" PRIzu,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), (uint64_t)pBlock->aggrOffset, sizeAggr);
return -1;
}
return 0;
}
static int tsdbLoadBlockOffset(SReadH *pReadh, SBlock *pBlock) {
ASSERT(pBlock->numOfSubBlocks <= 1);
SDFile *pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh);
if (tsdbSeekDFile(pDFile, pBlock->offset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to load block statis part while seek file %s to offset %" PRId64 " since %s",
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, tstrerror(terrno));
return -1;
}
size_t size = TSDB_BLOCK_STATIS_SIZE(pBlock->numOfCols);
size_t size = tsdbBlockStatisSize(pBlock->numOfCols, (uint32_t)pBlock->blkVer);
if (tsdbMakeRoom((void **)(&(pReadh->pBlkData)), size) < 0) return -1;
int64_t nread = tsdbReadDFile(pDFile, (void *)(pReadh->pBlkData), size);
......@@ -337,7 +383,6 @@ int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock) {
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, size);
return -1;
}
return 0;
}
......@@ -375,13 +420,14 @@ void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx) {
return buf;
}
void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols) {
void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols, SBlock *pBlock) {
#ifdef TD_REFACTOR_3
SBlockData *pBlockData = pReadh->pBlkData;
for (int i = 0, j = 0; i < numOfCols;) {
if (j >= pBlockData->numOfCols) {
pStatis[i].numOfNull = -1;
i++;
++i;
continue;
}
......@@ -392,15 +438,45 @@ void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols) {
pStatis[i].maxIndex = pBlockData->cols[j].maxIndex;
pStatis[i].minIndex = pBlockData->cols[j].minIndex;
pStatis[i].numOfNull = pBlockData->cols[j].numOfNull;
i++;
j++;
++i;
++j;
} else if (pStatis[i].colId < pBlockData->cols[j].colId) {
pStatis[i].numOfNull = -1;
i++;
++i;
} else {
j++;
++j;
}
}
#else
if (pBlock->aggrStat) {
SAggrBlkData *pAggrBlkData = pReadh->pAggrBlkData;
for (int i = 0, j = 0; i < numOfCols;) {
if (j >= pBlock->numOfCols) {
pStatis[i].numOfNull = -1;
++i;
continue;
}
SAggrBlkCol *pAggrBlkCol = ((SAggrBlkCol *)(pAggrBlkData)) + j;
if (pStatis[i].colId == pAggrBlkCol->colId) {
pStatis[i].sum = pAggrBlkCol->sum;
pStatis[i].max = pAggrBlkCol->max;
pStatis[i].min = pAggrBlkCol->min;
pStatis[i].maxIndex = pAggrBlkCol->maxIndex;
pStatis[i].minIndex = pAggrBlkCol->minIndex;
pStatis[i].numOfNull = pAggrBlkCol->numOfNull;
++i;
++j;
} else if (pStatis[i].colId < pAggrBlkCol->colId) {
pStatis[i].numOfNull = -1;
++i;
} else {
++j;
}
}
}
#endif
}
static void tsdbResetReadTable(SReadH *pReadh) {
......@@ -449,7 +525,7 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
return -1;
}
int32_t tsize = TSDB_BLOCK_STATIS_SIZE(pBlock->numOfCols);
int32_t tsize = (int32_t)tsdbBlockStatisSize(pBlock->numOfCols, (uint32_t)pBlock->blkVer);
if (!taosCheckChecksumWhole((uint8_t *)TSDB_READ_BUF(pReadh), tsize)) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tsdbError("vgId:%d block statis part in file %s is corrupted since wrong checksum, offset:%" PRId64 " len :%d",
......@@ -592,7 +668,7 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *
tdResetDataCols(pDataCols);
// If only load timestamp column, no need to load SBlockData part
if (numOfColIds > 1 && tsdbLoadBlockStatis(pReadh, pBlock) < 0) return -1;
if (numOfColIds > 1 && tsdbLoadBlockOffset(pReadh, pBlock) < 0) return -1;
pDataCols->numOfRows = pBlock->numOfRows;
......@@ -686,7 +762,8 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc
if (tsdbMakeRoom((void **)(&TSDB_READ_BUF(pReadh)), pBlockCol->len) < 0) return -1;
if (tsdbMakeRoom((void **)(&TSDB_READ_COMP_BUF(pReadh)), tsize) < 0) return -1;
int64_t offset = pBlock->offset + TSDB_BLOCK_STATIS_SIZE(pBlock->numOfCols) + tsdbGetBlockColOffset(pBlockCol);
int64_t offset = pBlock->offset + tsdbBlockStatisSize(pBlock->numOfCols, (uint32_t)pBlock->blkVer) +
tsdbGetBlockColOffset(pBlockCol);
if (tsdbSeekDFile(pDFile, offset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to load block column data while seek file %s to offset %" PRId64 " since %s",
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), offset, tstrerror(terrno));
......
......@@ -26,19 +26,26 @@ extern "C" {
#include "syncInt.h"
#include "taosdef.h"
#include "trpc.h"
#include "ttimer.h"
#define TIMER_MAX_MS 0x7FFFFFFF
typedef struct SSyncEnv {
void *pTimer;
void *pTimerManager;
tmr_h pEnvTickTimer;
tmr_h pTimerManager;
char name[128];
} SSyncEnv;
extern SSyncEnv* gSyncEnv;
int32_t syncEnvStart();
int32_t syncEnvStop();
static int32_t doSyncEnvStart(SSyncEnv *pSyncEnv);
tmr_h syncEnvStartTimer(TAOS_TMR_CALLBACK fp, int mseconds, void* param);
static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv);
void syncEnvStopTimer(tmr_h* pTimer);
#ifdef __cplusplus
}
......
......@@ -30,42 +30,37 @@ extern "C" {
#include "trpc.h"
typedef struct SSyncIO {
void * serverRpc;
void * clientRpc;
STaosQueue *pMsgQ;
STaosQset * pQset;
pthread_t tid;
int8_t isStart;
pthread_t consumerTid;
SEpSet epSet;
void * serverRpc;
void * clientRpc;
SEpSet myAddr;
void *syncTimer;
void *syncTimerManager;
int32_t (*start)(struct SSyncIO *ths);
int32_t (*stop)(struct SSyncIO *ths);
int32_t (*ping)(struct SSyncIO *ths);
int32_t (*onMsg)(struct SSyncIO *ths, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t (*destroy)(struct SSyncIO *ths);
void *ioTimerTickQ;
void *ioTimerTickPing;
void *ioTimerManager;
void *pSyncNode;
int32_t (*FpOnPing)(struct SSyncNode *ths, SyncPing *pMsg);
int32_t (*FpOnSyncPing)(SSyncNode *pSyncNode, SyncPing *pMsg);
int32_t (*FpOnSyncPingReply)(SSyncNode *pSyncNode, SyncPingReply *pMsg);
int32_t (*FpOnSyncRequestVote)(SSyncNode *pSyncNode, SyncRequestVote *pMsg);
int32_t (*FpOnSyncRequestVoteReply)(SSyncNode *pSyncNode, SyncRequestVoteReply *pMsg);
int32_t (*FpOnSyncAppendEntries)(SSyncNode *pSyncNode, SyncAppendEntries *pMsg);
int32_t (*FpOnSyncAppendEntriesReply)(SSyncNode *pSyncNode, SyncAppendEntriesReply *pMsg);
int8_t isStart;
} SSyncIO;
extern SSyncIO *gSyncIO;
int32_t syncIOStart();
int32_t syncIOStop();
int32_t syncIOSendMsg(void *handle, const SEpSet *pEpSet, SRpcMsg *pMsg);
SSyncIO *syncIOCreate();
static int32_t doSyncIOStart(SSyncIO *io);
static int32_t doSyncIOStop(SSyncIO *io);
static int32_t doSyncIOPing(SSyncIO *io);
static int32_t doSyncIOOnMsg(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static int32_t doSyncIODestroy(SSyncIO *io);
int32_t syncIOStart(char *host, uint16_t port);
int32_t syncIOStop();
int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg);
int32_t syncIOTickQ();
int32_t syncIOTickPing();
#ifdef __cplusplus
}
......
......@@ -26,6 +26,7 @@ extern "C" {
#include "sync.h"
#include "taosdef.h"
#include "tlog.h"
#include "ttimer.h"
extern int32_t sDebugFlag;
......@@ -47,23 +48,23 @@ extern int32_t sDebugFlag;
taosPrintLog("SYN WARN ", sDebugFlag, __VA_ARGS__); \
} \
}
#define sInfo(...) \
{ \
if (sDebugFlag & DEBUG_INFO) { \
taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); \
} \
#define sInfo(...) \
{ \
if (sDebugFlag & DEBUG_INFO) { \
taosPrintLog("SYN INFO ", sDebugFlag, __VA_ARGS__); \
} \
}
#define sDebug(...) \
{ \
if (sDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); \
} \
#define sDebug(...) \
{ \
if (sDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("SYN DEBUG ", sDebugFlag, __VA_ARGS__); \
} \
}
#define sTrace(...) \
{ \
if (sDebugFlag & DEBUG_TRACE) { \
taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); \
} \
#define sTrace(...) \
{ \
if (sDebugFlag & DEBUG_TRACE) { \
taosPrintLog("SYN TRACE ", sDebugFlag, __VA_ARGS__); \
} \
}
struct SRaft;
......@@ -87,35 +88,64 @@ typedef struct SyncAppendEntries SyncAppendEntries;
struct SyncAppendEntriesReply;
typedef struct SyncAppendEntriesReply SyncAppendEntriesReply;
typedef struct SSyncNode {
int8_t replica;
int8_t quorum;
struct SSyncEnv;
typedef struct SSyncEnv SSyncEnv;
typedef struct SRaftId {
SyncNodeId addr; // typedef uint64_t SyncNodeId;
SyncGroupId vgId; // typedef int32_t SyncGroupId;
} SRaftId;
typedef struct SSyncNode {
SyncGroupId vgId;
SSyncCfg syncCfg;
char path[TSDB_FILENAME_LEN];
SSyncFSM* pFsm;
// passed from outside
void* rpcClient;
int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg);
SRaft* pRaft;
int32_t refCount;
int64_t rid;
int32_t (*FpPing)(struct SSyncNode* ths, const SyncPing* pMsg);
SNodeInfo me;
SNodeInfo peers[TSDB_MAX_REPLICA];
int32_t peersNum;
int32_t (*FpOnPing)(struct SSyncNode* ths, SyncPing* pMsg);
ESyncRole role;
SRaftId raftId;
int32_t (*FpOnPingReply)(struct SSyncNode* ths, SyncPingReply* pMsg);
tmr_h pPingTimer;
int32_t pingTimerMS;
uint8_t pingTimerStart;
TAOS_TMR_CALLBACK FpPingTimer; // Timer Fp
uint64_t pingTimerCounter;
int32_t (*FpRequestVote)(struct SSyncNode* ths, const SyncRequestVote* pMsg);
tmr_h pElectTimer;
int32_t electTimerMS;
uint8_t electTimerStart;
TAOS_TMR_CALLBACK FpElectTimer; // Timer Fp
uint64_t electTimerCounter;
int32_t (*FpOnRequestVote)(struct SSyncNode* ths, SyncRequestVote* pMsg);
tmr_h pHeartbeatTimer;
int32_t heartbeatTimerMS;
uint8_t heartbeatTimerStart;
TAOS_TMR_CALLBACK FpHeartbeatTimer; // Timer Fp
uint64_t heartbeatTimerCounter;
int32_t (*FpOnRequestVoteReply)(struct SSyncNode* ths, SyncRequestVoteReply* pMsg);
// callback
int32_t (*FpOnPing)(SSyncNode* ths, SyncPing* pMsg);
int32_t (*FpAppendEntries)(struct SSyncNode* ths, const SyncAppendEntries* pMsg);
int32_t (*FpOnPingReply)(SSyncNode* ths, SyncPingReply* pMsg);
int32_t (*FpOnAppendEntries)(struct SSyncNode* ths, SyncAppendEntries* pMsg);
int32_t (*FpOnRequestVote)(SSyncNode* ths, SyncRequestVote* pMsg);
int32_t (*FpOnAppendEntriesReply)(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg);
int32_t (*FpOnRequestVoteReply)(SSyncNode* ths, SyncRequestVoteReply* pMsg);
int32_t (*FpSendMsg)(void* handle, const SEpSet* pEpSet, SRpcMsg* pMsg);
int32_t (*FpOnAppendEntries)(SSyncNode* ths, SyncAppendEntries* pMsg);
int32_t (*FpOnAppendEntriesReply)(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
} SSyncNode;
......@@ -123,23 +153,15 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo);
void syncNodeClose(SSyncNode* pSyncNode);
static int32_t doSyncNodePing(struct SSyncNode* ths, const SyncPing* pMsg);
static int32_t onSyncNodePing(struct SSyncNode* ths, SyncPing* pMsg);
static int32_t onSyncNodePingReply(struct SSyncNode* ths, SyncPingReply* pMsg);
static int32_t doSyncNodeRequestVote(struct SSyncNode* ths, const SyncRequestVote* pMsg);
static int32_t onSyncNodeRequestVote(struct SSyncNode* ths, SyncRequestVote* pMsg);
void syncNodePingAll(SSyncNode* pSyncNode);
static int32_t onSyncNodeRequestVoteReply(struct SSyncNode* ths, SyncRequestVoteReply* pMsg);
void syncNodePingPeers(SSyncNode* pSyncNode);
static int32_t doSyncNodeAppendEntries(struct SSyncNode* ths, const SyncAppendEntries* pMsg);
void syncNodePingSelf(SSyncNode* pSyncNode);
static int32_t onSyncNodeAppendEntries(struct SSyncNode* ths, SyncAppendEntries* pMsg);
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode);
static int32_t onSyncNodeAppendEntriesReply(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg);
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode);
#ifdef __cplusplus
}
......
......@@ -23,13 +23,15 @@ extern "C" {
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "cJSON.h"
#include "sync.h"
#include "syncRaftEntry.h"
#include "taosdef.h"
// encode as uint64
typedef enum ESyncMessageType {
SYNC_PING = 0,
SYNC_PING_REPLY,
SYNC_PING = 101,
SYNC_PING_REPLY = 103,
SYNC_CLIENT_REQUEST,
SYNC_CLIENT_REQUEST_REPLY,
SYNC_REQUEST_VOTE,
......@@ -38,29 +40,86 @@ typedef enum ESyncMessageType {
SYNC_APPEND_ENTRIES_REPLY,
} ESyncMessageType;
/*
typedef struct SRaftId {
SyncNodeId addr; // typedef uint64_t SyncNodeId;
SyncGroupId vgId; // typedef int32_t SyncGroupId;
} SRaftId;
*/
typedef struct SyncPing {
ESyncMessageType msgType;
const SSyncBuffer *pData;
} SyncPing, RaftPing;
uint32_t bytes;
uint32_t msgType;
SRaftId srcId;
SRaftId destId;
uint32_t dataLen;
char data[];
} SyncPing;
#define SYNC_PING_FIX_LEN (sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(uint32_t))
SyncPing* syncPingBuild(uint32_t dataLen);
void syncPingDestroy(SyncPing* pMsg);
void syncPingSerialize(const SyncPing* pMsg, char* buf, uint32_t bufLen);
void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg);
void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg);
void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg);
cJSON* syncPing2Json(const SyncPing* pMsg);
SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str);
SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId);
typedef struct SyncPingReply {
ESyncMessageType msgType;
const SSyncBuffer *pData;
} SyncPingReply, RaftPingReply;
uint32_t bytes;
uint32_t msgType;
SRaftId srcId;
SRaftId destId;
uint32_t dataLen;
char data[];
} SyncPingReply;
#define SYNC_PING_REPLY_FIX_LEN \
(sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(uint32_t))
SyncPingReply* syncPingReplyBuild(uint32_t dataLen);
void syncPingReplyDestroy(SyncPingReply* pMsg);
void syncPingReplySerialize(const SyncPingReply* pMsg, char* buf, uint32_t bufLen);
void syncPingReplyDeserialize(const char* buf, uint32_t len, SyncPingReply* pMsg);
void syncPingReply2RpcMsg(const SyncPingReply* pMsg, SRpcMsg* pRpcMsg);
void syncPingReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPingReply* pMsg);
cJSON* syncPingReply2Json(const SyncPingReply* pMsg);
SyncPingReply* syncPingReplyBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str);
SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId);
typedef struct SyncClientRequest {
ESyncMessageType msgType;
const SSyncBuffer *pData;
int64_t seqNum;
bool isWeak;
} SyncClientRequest, RaftClientRequest;
ESyncMessageType msgType;
char* data;
uint32_t dataLen;
int64_t seqNum;
bool isWeak;
} SyncClientRequest;
typedef struct SyncClientRequestReply {
ESyncMessageType msgType;
int32_t errCode;
const SSyncBuffer *pErrMsg;
const SSyncBuffer *pLeaderHint;
} SyncClientRequestReply, RaftClientRequestReply;
ESyncMessageType msgType;
int32_t errCode;
SSyncBuffer* pErrMsg;
SSyncBuffer* pLeaderHint;
} SyncClientRequestReply;
typedef struct SyncRequestVote {
ESyncMessageType msgType;
......@@ -69,7 +128,7 @@ typedef struct SyncRequestVote {
SyncGroupId vgId;
SyncIndex lastLogIndex;
SyncTerm lastLogTerm;
} SyncRequestVote, RaftRequestVote;
} SyncRequestVote;
typedef struct SyncRequestVoteReply {
ESyncMessageType msgType;
......@@ -77,7 +136,7 @@ typedef struct SyncRequestVoteReply {
SyncNodeId nodeId;
SyncGroupId vgId;
bool voteGranted;
} SyncRequestVoteReply, RaftRequestVoteReply;
} SyncRequestVoteReply;
typedef struct SyncAppendEntries {
ESyncMessageType msgType;
......@@ -86,9 +145,9 @@ typedef struct SyncAppendEntries {
SyncIndex prevLogIndex;
SyncTerm prevLogTerm;
int32_t entryCount;
SSyncRaftEntry * logEntries;
SSyncRaftEntry* logEntries;
SyncIndex commitIndex;
} SyncAppendEntries, RaftAppendEntries;
} SyncAppendEntries;
typedef struct SyncAppendEntriesReply {
ESyncMessageType msgType;
......@@ -96,7 +155,7 @@ typedef struct SyncAppendEntriesReply {
SyncNodeId nodeId;
bool success;
SyncIndex matchIndex;
} SyncAppendEntriesReply, RaftAppendEntriesReply;
} SyncAppendEntriesReply;
#ifdef __cplusplus
}
......
......@@ -27,6 +27,8 @@ extern "C" {
#include "syncMessage.h"
#include "taosdef.h"
#if 0
typedef struct SRaftId {
SyncNodeId addr;
SyncGroupId vgId;
......@@ -82,6 +84,8 @@ int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak);
static int raftSendMsg(SRaftId destRaftId, const void* pMsg, const SRaft* pRaft);
#endif
#ifdef __cplusplus
}
#endif
......
......@@ -34,8 +34,9 @@ extern "C" {
typedef struct SRaftStore {
SyncTerm currentTerm;
SRaftId voteFor;
//FileFd fd;
char path[RAFT_STORE_PATH_LEN];
// FileFd fd;
TdFilePtr pFile;
char path[RAFT_STORE_PATH_LEN];
} SRaftStore;
SRaftStore *raftStoreOpen(const char *path);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_LIBS_SYNC_UTIL_H
#define _TD_LIBS_SYNC_UTIL_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "syncInt.h"
#include "syncMessage.h"
#include "taosdef.h"
// ---- encode / decode
uint64_t syncUtilAddr2U64(const char* host, uint16_t port);
void syncUtilU642Addr(uint64_t u64, char* host, size_t len, uint16_t* port);
void syncUtilnodeInfo2EpSet(const SNodeInfo* pNodeInfo, SEpSet* pEpSet);
void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet);
void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId);
// ---- SSyncBuffer ----
#if 0
void syncUtilbufBuild(SSyncBuffer* syncBuf, size_t len);
void syncUtilbufDestroy(SSyncBuffer* syncBuf);
void syncUtilbufCopy(const SSyncBuffer* src, SSyncBuffer* dest);
void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest);
#endif
#ifdef __cplusplus
}
#endif
#endif /*_TD_LIBS_SYNC_UTIL_H*/
......@@ -18,6 +18,14 @@
SSyncEnv *gSyncEnv = NULL;
// local function -----------------
static void syncEnvTick(void *param, void *tmrId);
static int32_t doSyncEnvStart(SSyncEnv *pSyncEnv);
static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv);
static tmr_h doSyncEnvStartTimer(SSyncEnv *pSyncEnv, TAOS_TMR_CALLBACK fp, int mseconds, void *param);
static void doSyncEnvStopTimer(SSyncEnv *pSyncEnv, tmr_h *pTimer);
// --------------------------------
int32_t syncEnvStart() {
int32_t ret;
gSyncEnv = (SSyncEnv *)malloc(sizeof(SSyncEnv));
......@@ -31,6 +39,40 @@ int32_t syncEnvStop() {
return ret;
}
static int32_t doSyncEnvStart(SSyncEnv *pSyncEnv) { return 0; }
tmr_h syncEnvStartTimer(TAOS_TMR_CALLBACK fp, int mseconds, void *param) {
return doSyncEnvStartTimer(gSyncEnv, fp, mseconds, param);
}
void syncEnvStopTimer(tmr_h *pTimer) { doSyncEnvStopTimer(gSyncEnv, pTimer); }
// local function -----------------
static void syncEnvTick(void *param, void *tmrId) {
SSyncEnv *pSyncEnv = (SSyncEnv *)param;
sTrace("syncEnvTick ... name:%s ", pSyncEnv->name);
pSyncEnv->pEnvTickTimer = taosTmrStart(syncEnvTick, 1000, pSyncEnv, pSyncEnv->pTimerManager);
}
static int32_t doSyncEnvStart(SSyncEnv *pSyncEnv) {
snprintf(pSyncEnv->name, sizeof(pSyncEnv->name), "SyncEnv_%p", pSyncEnv);
// start tmr thread
pSyncEnv->pTimerManager = taosTmrInit(1000, 50, 10000, "SYNC-ENV");
// pSyncEnv->pEnvTickTimer = taosTmrStart(syncEnvTick, 1000, pSyncEnv, pSyncEnv->pTimerManager);
sTrace("SyncEnv start ok, name:%s", pSyncEnv->name);
return 0;
}
static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv) {
taosTmrCleanUp(pSyncEnv->pTimerManager);
return 0;
}
static tmr_h doSyncEnvStartTimer(SSyncEnv *pSyncEnv, TAOS_TMR_CALLBACK fp, int mseconds, void *param) {
return taosTmrStart(fp, mseconds, pSyncEnv, pSyncEnv->pTimerManager);
}
static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv) { return 0; }
static void doSyncEnvStopTimer(SSyncEnv *pSyncEnv, tmr_h *pTimer) {}
......@@ -22,118 +22,72 @@
SSyncIO *gSyncIO = NULL;
int32_t syncIOSendMsg(void *handle, const SEpSet *pEpSet, SRpcMsg *pMsg) { return 0; }
int32_t syncIOStart() { return 0; }
int32_t syncIOStop() { return 0; }
static void syncTick(void *param, void *tmrId) {
SSyncIO *io = (SSyncIO *)param;
sDebug("syncTick ... ");
SRpcMsg rpcMsg;
rpcMsg.pCont = rpcMallocCont(10);
snprintf(rpcMsg.pCont, 10, "TICK");
rpcMsg.contLen = 10;
rpcMsg.handle = NULL;
rpcMsg.msgType = 2;
SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg));
memcpy(pTemp, &rpcMsg, sizeof(SRpcMsg));
taosWriteQitem(io->pMsgQ, pTemp);
io->syncTimer = taosTmrStart(syncTick, 1000, io, io->syncTimerManager);
// local function ------------
static int32_t syncIOStartInternal(SSyncIO *io);
static int32_t syncIOStopInternal(SSyncIO *io);
static SSyncIO *syncIOCreate(char *host, uint16_t port);
static int32_t syncIODestroy(SSyncIO *io);
static void *syncIOConsumerFunc(void *param);
static int syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey);
static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static int32_t syncIOTickQInternal(SSyncIO *io);
static void syncIOTickQFunc(void *param, void *tmrId);
static int32_t syncIOTickPingInternal(SSyncIO *io);
static void syncIOTickPingFunc(void *param, void *tmrId);
// ----------------------------
// public function ------------
int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) {
sTrace(
"<--- syncIOSendMsg ---> clientRpc:%p, numOfEps:%d, inUse:%d, destAddr:%s-%u, pMsg->ahandle:%p, pMsg->handle:%p, "
"pMsg->msgType:%d, pMsg->contLen:%d",
clientRpc, pEpSet->numOfEps, pEpSet->inUse, pEpSet->eps[0].fqdn, pEpSet->eps[0].port, pMsg->ahandle, pMsg->handle,
pMsg->msgType, pMsg->contLen);
pMsg->handle = NULL;
rpcSendRequest(clientRpc, pEpSet, pMsg, NULL);
return 0;
}
void *syncConsumer(void *param) {
SSyncIO *io = param;
STaosQall *qall;
SRpcMsg * pRpcMsg, rpcMsg;
int type;
qall = taosAllocateQall();
int32_t syncIOStart(char *host, uint16_t port) {
gSyncIO = syncIOCreate(host, port);
assert(gSyncIO != NULL);
while (1) {
int numOfMsgs = taosReadAllQitemsFromQset(io->pQset, qall, NULL, NULL);
sDebug("%d sync-io msgs are received", numOfMsgs);
if (numOfMsgs <= 0) break;
for (int i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pRpcMsg);
sDebug("sync-io recv type:%d msg:%s", pRpcMsg->msgType, (char *)(pRpcMsg->pCont));
}
taosResetQitems(qall);
for (int i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pRpcMsg);
rpcFreeCont(pRpcMsg->pCont);
if (pRpcMsg->handle != NULL) {
int msgSize = 128;
memset(&rpcMsg, 0, sizeof(rpcMsg));
rpcMsg.pCont = rpcMallocCont(msgSize);
rpcMsg.contLen = msgSize;
rpcMsg.handle = pRpcMsg->handle;
rpcMsg.code = 0;
rpcSendResponse(&rpcMsg);
}
int32_t ret = syncIOStartInternal(gSyncIO);
assert(ret == 0);
taosFreeQitem(pRpcMsg);
}
}
taosFreeQall(qall);
return NULL;
sTrace("syncIOStart ok, gSyncIO:%p gSyncIO->clientRpc:%p", gSyncIO, gSyncIO->clientRpc);
return 0;
}
static int retrieveAuthInfo(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey) {
// app shall retrieve the auth info based on meterID from DB or a data file
// demo code here only for simple demo
int ret = 0;
return ret;
}
int32_t syncIOStop() {
int32_t ret = syncIOStopInternal(gSyncIO);
assert(ret == 0);
static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
sDebug("processResponse ... ");
rpcFreeCont(pMsg->pCont);
ret = syncIODestroy(gSyncIO);
assert(ret == 0);
return ret;
}
static void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SSyncIO *io = pParent;
SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg));
memcpy(pTemp, pMsg, sizeof(SRpcMsg));
sDebug("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp);
taosWriteQitem(io->pMsgQ, pTemp);
int32_t syncIOTickQ() {
int32_t ret = syncIOTickQInternal(gSyncIO);
assert(ret == 0);
return ret;
}
SSyncIO *syncIOCreate() {
SSyncIO *io = (SSyncIO *)malloc(sizeof(SSyncIO));
memset(io, 0, sizeof(*io));
io->pMsgQ = taosOpenQueue();
io->pQset = taosOpenQset();
taosAddIntoQset(io->pQset, io->pMsgQ, NULL);
io->start = doSyncIOStart;
io->stop = doSyncIOStop;
io->ping = doSyncIOPing;
io->onMsg = doSyncIOOnMsg;
io->destroy = doSyncIODestroy;
return io;
int32_t syncIOTickPing() {
int32_t ret = syncIOTickPingInternal(gSyncIO);
assert(ret == 0);
return ret;
}
static int32_t doSyncIOStart(SSyncIO *io) {
// local function ------------
static int32_t syncIOStartInternal(SSyncIO *io) {
taosBlockSIGPIPE();
rpcInit();
tsRpcForceTcp = 1;
// cient rpc init
......@@ -143,7 +97,7 @@ static int32_t doSyncIOStart(SSyncIO *io) {
rpcInit.localPort = 0;
rpcInit.label = "SYNC-IO-CLIENT";
rpcInit.numOfThreads = 1;
rpcInit.cfp = processResponse;
rpcInit.cfp = syncIOProcessReply;
rpcInit.sessions = 100;
rpcInit.idleTime = 100;
rpcInit.user = "sync-io";
......@@ -163,13 +117,13 @@ static int32_t doSyncIOStart(SSyncIO *io) {
{
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = 38000;
rpcInit.localPort = io->myAddr.eps[0].port;
rpcInit.label = "SYNC-IO-SERVER";
rpcInit.numOfThreads = 1;
rpcInit.cfp = processRequestMsg;
rpcInit.cfp = syncIOProcessRequest;
rpcInit.sessions = 1000;
rpcInit.idleTime = 2 * 1500;
rpcInit.afp = retrieveAuthInfo;
rpcInit.afp = syncIOAuth;
rpcInit.parent = io;
rpcInit.connType = TAOS_CONN_SERVER;
......@@ -180,12 +134,9 @@ static int32_t doSyncIOStart(SSyncIO *io) {
}
}
io->epSet.inUse = 0;
addEpIntoEpSet(&io->epSet, "127.0.0.1", 38000);
// start consumer thread
{
if (pthread_create(&io->tid, NULL, syncConsumer, io) != 0) {
if (pthread_create(&io->consumerTid, NULL, syncIOConsumerFunc, io) != 0) {
sError("failed to create sync consumer thread since %s", strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
......@@ -193,35 +144,32 @@ static int32_t doSyncIOStart(SSyncIO *io) {
}
// start tmr thread
io->syncTimerManager = taosTmrInit(1000, 50, 10000, "SYNC");
io->syncTimer = taosTmrStart(syncTick, 1000, io, io->syncTimerManager);
io->ioTimerManager = taosTmrInit(1000, 50, 10000, "SYNC");
return 0;
}
static int32_t doSyncIOStop(SSyncIO *io) {
static int32_t syncIOStopInternal(SSyncIO *io) {
atomic_store_8(&io->isStart, 0);
pthread_join(io->tid, NULL);
pthread_join(io->consumerTid, NULL);
return 0;
}
static int32_t doSyncIOPing(SSyncIO *io) {
SRpcMsg rpcMsg, rspMsg;
static SSyncIO *syncIOCreate(char *host, uint16_t port) {
SSyncIO *io = (SSyncIO *)malloc(sizeof(SSyncIO));
memset(io, 0, sizeof(*io));
rpcMsg.pCont = rpcMallocCont(10);
snprintf(rpcMsg.pCont, 10, "ping");
rpcMsg.contLen = 10;
rpcMsg.handle = NULL;
rpcMsg.msgType = 1;
io->pMsgQ = taosOpenQueue();
io->pQset = taosOpenQset();
taosAddIntoQset(io->pQset, io->pMsgQ, NULL);
rpcSendRequest(io->clientRpc, &io->epSet, &rpcMsg, NULL);
io->myAddr.inUse = 0;
addEpIntoEpSet(&io->myAddr, host, port);
return 0;
return io;
}
static int32_t doSyncIOOnMsg(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { return 0; }
static int32_t doSyncIODestroy(SSyncIO *io) {
static int32_t syncIODestroy(SSyncIO *io) {
int8_t start = atomic_load_8(&io->isStart);
assert(start == 0);
......@@ -235,15 +183,149 @@ static int32_t doSyncIODestroy(SSyncIO *io) {
io->clientRpc = NULL;
}
if (io->pMsgQ != NULL) {
free(io->pMsgQ);
io->pMsgQ = NULL;
}
taosCloseQueue(io->pMsgQ);
taosCloseQset(io->pQset);
if (io->pQset != NULL) {
free(io->pQset);
io->pQset = NULL;
return 0;
}
static void *syncIOConsumerFunc(void *param) {
SSyncIO *io = param;
STaosQall *qall;
SRpcMsg * pRpcMsg, rpcMsg;
int type;
qall = taosAllocateQall();
while (1) {
int numOfMsgs = taosReadAllQitemsFromQset(io->pQset, qall, NULL, NULL);
sTrace("syncIOConsumerFunc %d msgs are received", numOfMsgs);
if (numOfMsgs <= 0) break;
for (int i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pRpcMsg);
sTrace("syncIOConsumerFunc get item from queue: msgType:%d contLen:%d msg:%s", pRpcMsg->msgType, pRpcMsg->contLen,
(char *)(pRpcMsg->pCont));
if (pRpcMsg->msgType == SYNC_PING) {
if (io->FpOnSyncPing != NULL) {
SyncPing *pSyncMsg;
SRpcMsg tmpRpcMsg;
memcpy(&tmpRpcMsg, pRpcMsg, sizeof(SRpcMsg));
pSyncMsg = syncPingBuild(tmpRpcMsg.contLen);
syncPingFromRpcMsg(pRpcMsg, pSyncMsg);
// memcpy(pSyncMsg, tmpRpcMsg.pCont, tmpRpcMsg.contLen);
io->FpOnSyncPing(io->pSyncNode, pSyncMsg);
}
} else if (pRpcMsg->msgType == SYNC_PING_REPLY) {
SyncPingReply *pSyncMsg = syncPingReplyBuild(pRpcMsg->contLen);
syncPingReplyFromRpcMsg(pRpcMsg, pSyncMsg);
if (io->FpOnSyncPingReply != NULL) {
io->FpOnSyncPingReply(io->pSyncNode, pSyncMsg);
}
} else {
;
}
}
taosResetQitems(qall);
for (int i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pRpcMsg);
rpcFreeCont(pRpcMsg->pCont);
if (pRpcMsg->handle != NULL) {
int msgSize = 128;
memset(&rpcMsg, 0, sizeof(rpcMsg));
rpcMsg.pCont = rpcMallocCont(msgSize);
rpcMsg.contLen = msgSize;
snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", "give a reply");
rpcMsg.handle = pRpcMsg->handle;
rpcMsg.code = 0;
sTrace("syncIOConsumerFunc rpcSendResponse ... msgType:%d contLen:%d", pRpcMsg->msgType, rpcMsg.contLen);
rpcSendResponse(&rpcMsg);
}
taosFreeQitem(pRpcMsg);
}
}
taosFreeQall(qall);
return NULL;
}
static int syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey) {
// app shall retrieve the auth info based on meterID from DB or a data file
// demo code here only for simple demo
int ret = 0;
return ret;
}
static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
sTrace("<-- syncIOProcessRequest --> type:%d, contLen:%d, cont:%s", pMsg->msgType, pMsg->contLen,
(char *)pMsg->pCont);
SSyncIO *io = pParent;
SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg));
memcpy(pTemp, pMsg, sizeof(SRpcMsg));
taosWriteQitem(io->pMsgQ, pTemp);
}
static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
sTrace("syncIOProcessReply: type:%d, contLen:%d msg:%s", pMsg->msgType, pMsg->contLen, (char *)pMsg->pCont);
rpcFreeCont(pMsg->pCont);
}
static int32_t syncIOTickQInternal(SSyncIO *io) {
io->ioTimerTickQ = taosTmrStart(syncIOTickQFunc, 1000, io, io->ioTimerManager);
return 0;
}
static void syncIOTickQFunc(void *param, void *tmrId) {
SSyncIO *io = (SSyncIO *)param;
sTrace("<-- syncIOTickQFunc -->");
SRpcMsg rpcMsg;
rpcMsg.contLen = 64;
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", "syncIOTickQ");
rpcMsg.handle = NULL;
rpcMsg.msgType = 55;
SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg));
memcpy(pTemp, &rpcMsg, sizeof(SRpcMsg));
taosWriteQitem(io->pMsgQ, pTemp);
taosTmrReset(syncIOTickQFunc, 1000, io, io->ioTimerManager, &io->ioTimerTickQ);
}
static int32_t syncIOTickPingInternal(SSyncIO *io) {
io->ioTimerTickPing = taosTmrStart(syncIOTickPingFunc, 1000, io, io->ioTimerManager);
return 0;
}
static void syncIOTickPingFunc(void *param, void *tmrId) {
SSyncIO *io = (SSyncIO *)param;
sTrace("<-- syncIOTickPingFunc -->");
SRpcMsg rpcMsg;
rpcMsg.contLen = 64;
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", "syncIOTickPing");
rpcMsg.handle = NULL;
rpcMsg.msgType = 77;
rpcSendRequest(io->clientRpc, &io->myAddr, &rpcMsg, NULL);
taosTmrReset(syncIOTickPingFunc, 1000, io, io->ioTimerManager, &io->ioTimerTickPing);
}
\ No newline at end of file
......@@ -15,12 +15,36 @@
#include <stdint.h>
#include "sync.h"
#include "syncEnv.h"
#include "syncInt.h"
#include "syncRaft.h"
#include "syncUtil.h"
int32_t syncInit() { return 0; }
static int32_t tsNodeRefId = -1;
void syncCleanUp() {}
// ------ local funciton ---------
static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg);
static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg);
static void syncNodePingTimerCb(void* param, void* tmrId);
static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg);
static int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg);
static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg);
static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg);
static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg);
static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg);
static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
// ---------------------------------
int32_t syncInit() {
sTrace("syncInit ok");
return 0;
}
void syncCleanUp() { sTrace("syncCleanUp ok"); }
int64_t syncStart(const SSyncInfo* pSyncInfo) {
SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo);
......@@ -32,7 +56,9 @@ void syncStop(int64_t rid) {}
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { return 0; }
int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak) { return 0; }
// int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak) { return 0; }
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak) { return 0; }
ESyncState syncGetMyRole(int64_t rid) { return TAOS_SYNC_STATE_LEADER; }
......@@ -41,69 +67,238 @@ void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {}
SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
SSyncNode* pSyncNode = (SSyncNode*)malloc(sizeof(SSyncNode));
assert(pSyncNode != NULL);
memset(pSyncNode, 0, sizeof(SSyncNode));
pSyncNode->vgId = pSyncInfo->vgId;
pSyncNode->syncCfg = pSyncInfo->syncCfg;
memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
pSyncNode->pFsm = pSyncInfo->pFsm;
pSyncNode->rpcClient = pSyncInfo->rpcClient;
pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg;
pSyncNode->FpPing = doSyncNodePing;
pSyncNode->FpOnPing = onSyncNodePing;
pSyncNode->FpOnPingReply = onSyncNodePingReply;
pSyncNode->FpRequestVote = doSyncNodeRequestVote;
pSyncNode->FpOnRequestVote = onSyncNodeRequestVote;
pSyncNode->FpOnRequestVoteReply = onSyncNodeRequestVoteReply;
pSyncNode->FpAppendEntries = doSyncNodeAppendEntries;
pSyncNode->FpOnAppendEntries = onSyncNodeAppendEntries;
pSyncNode->FpOnAppendEntriesReply = onSyncNodeAppendEntriesReply;
pSyncNode->me = pSyncInfo->syncCfg.nodeInfo[pSyncInfo->syncCfg.myIndex];
pSyncNode->peersNum = pSyncInfo->syncCfg.replicaNum - 1;
int j = 0;
for (int i = 0; i < pSyncInfo->syncCfg.replicaNum; ++i) {
if (i != pSyncInfo->syncCfg.myIndex) {
pSyncNode->peers[j] = pSyncInfo->syncCfg.nodeInfo[i];
j++;
}
}
pSyncNode->role = TAOS_SYNC_STATE_FOLLOWER;
syncUtilnodeInfo2raftId(&pSyncNode->me, pSyncNode->vgId, &pSyncNode->raftId);
pSyncNode->pPingTimer = NULL;
pSyncNode->pingTimerMS = 1000;
atomic_store_8(&pSyncNode->pingTimerStart, 0);
pSyncNode->FpPingTimer = syncNodePingTimerCb;
pSyncNode->pingTimerCounter = 0;
pSyncNode->FpOnPing = syncNodeOnPingCb;
pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb;
pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb;
pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplyCb;
pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesCb;
pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplyCb;
return pSyncNode;
}
void syncNodeClose(SSyncNode* pSyncNode) {
assert(pSyncNode != NULL);
raftClose(pSyncNode->pRaft);
free(pSyncNode);
}
static int32_t doSyncNodePing(struct SSyncNode* ths, const SyncPing* pMsg) {
int32_t ret = ths->pRaft->FpPing(ths->pRaft, pMsg);
void syncNodePingAll(SSyncNode* pSyncNode) {
sTrace("syncNodePingAll pSyncNode:%p ", pSyncNode);
int32_t ret = 0;
for (int i = 0; i < pSyncNode->syncCfg.replicaNum; ++i) {
SRaftId destId;
syncUtilnodeInfo2raftId(&pSyncNode->syncCfg.nodeInfo[i], pSyncNode->vgId, &destId);
SyncPing* pMsg = syncPingBuild3(&pSyncNode->raftId, &destId);
ret = syncNodePing(pSyncNode, &destId, pMsg);
assert(ret == 0);
syncPingDestroy(pMsg);
}
}
void syncNodePingPeers(SSyncNode* pSyncNode) {
int32_t ret = 0;
for (int i = 0; i < pSyncNode->peersNum; ++i) {
SRaftId destId;
syncUtilnodeInfo2raftId(&pSyncNode->peers[i], pSyncNode->vgId, &destId);
SyncPing* pMsg = syncPingBuild3(&pSyncNode->raftId, &destId);
ret = syncNodePing(pSyncNode, &destId, pMsg);
assert(ret == 0);
syncPingDestroy(pMsg);
}
}
void syncNodePingSelf(SSyncNode* pSyncNode) {
int32_t ret;
SyncPing* pMsg = syncPingBuild3(&pSyncNode->raftId, &pSyncNode->raftId);
ret = syncNodePing(pSyncNode, &pMsg->destId, pMsg);
assert(ret == 0);
syncPingDestroy(pMsg);
}
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
if (pSyncNode->pPingTimer == NULL) {
pSyncNode->pPingTimer =
taosTmrStart(pSyncNode->FpPingTimer, pSyncNode->pingTimerCounter, pSyncNode, gSyncEnv->pTimerManager);
} else {
taosTmrReset(pSyncNode->FpPingTimer, pSyncNode->pingTimerCounter, pSyncNode, gSyncEnv->pTimerManager,
&pSyncNode->pPingTimer);
}
atomic_store_8(&pSyncNode->pingTimerStart, 1);
return 0;
}
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
atomic_store_8(&pSyncNode->pingTimerStart, 0);
pSyncNode->pingTimerCounter = TIMER_MAX_MS;
return 0;
}
// ------ local funciton ---------
static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
sTrace("syncNodePing pSyncNode:%p ", pSyncNode);
int32_t ret = 0;
SRpcMsg rpcMsg;
syncPing2RpcMsg(pMsg, &rpcMsg);
/*
SRpcMsg rpcMsg;
rpcMsg.contLen = 64;
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
snprintf((char*)rpcMsg.pCont, rpcMsg.contLen, "%s", "xxxxxxxxxxxxxx");
rpcMsg.handle = NULL;
rpcMsg.msgType = 1;
*/
syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg);
{
cJSON* pJson = syncPing2Json(pMsg);
char* serialized = cJSON_Print(pJson);
sTrace("syncNodePing pMsg:%s ", serialized);
free(serialized);
cJSON_Delete(pJson);
}
{
SyncPing* pMsg2 = rpcMsg.pCont;
cJSON* pJson = syncPing2Json(pMsg2);
char* serialized = cJSON_Print(pJson);
sTrace("syncNodePing rpcMsg.pCont:%s ", serialized);
free(serialized);
cJSON_Delete(pJson);
}
return ret;
}
static int32_t onSyncNodePing(struct SSyncNode* ths, SyncPing* pMsg) {
int32_t ret = ths->pRaft->FpOnPing(ths->pRaft, pMsg);
static int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg) {
int32_t ret = 0;
return ret;
}
static int32_t onSyncNodePingReply(struct SSyncNode* ths, SyncPingReply* pMsg) {
int32_t ret = ths->pRaft->FpOnPingReply(ths->pRaft, pMsg);
static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg) {
int32_t ret = 0;
return ret;
}
static int32_t doSyncNodeRequestVote(struct SSyncNode* ths, const SyncRequestVote* pMsg) {
int32_t ret = ths->pRaft->FpRequestVote(ths->pRaft, pMsg);
static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
sTrace("syncNodeSendMsgById pSyncNode:%p ", pSyncNode);
SEpSet epSet;
syncUtilraftId2EpSet(destRaftId, &epSet);
pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg);
return 0;
}
static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
SEpSet epSet;
syncUtilnodeInfo2EpSet(nodeInfo, &epSet);
pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg);
return 0;
}
static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) {
int32_t ret = 0;
sTrace("<-- syncNodeOnPingCb -->");
{
cJSON* pJson = syncPing2Json(pMsg);
char* serialized = cJSON_Print(pJson);
sTrace("syncNodeOnPingCb syncNodePing pMsg:%s ", serialized);
free(serialized);
cJSON_Delete(pJson);
}
SyncPingReply* pMsgReply = syncPingReplyBuild3(&ths->raftId, &pMsg->srcId);
SRpcMsg rpcMsg;
syncPingReply2RpcMsg(pMsgReply, &rpcMsg);
syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
return ret;
}
static int32_t onSyncNodeRequestVote(struct SSyncNode* ths, SyncRequestVote* pMsg) {
int32_t ret = ths->pRaft->FpOnRequestVote(ths->pRaft, pMsg);
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
int32_t ret = 0;
sTrace("<-- syncNodeOnPingReplyCb -->");
{
cJSON* pJson = syncPingReply2Json(pMsg);
char* serialized = cJSON_Print(pJson);
sTrace("syncNodeOnPingReplyCb syncNodePing pMsg:%s ", serialized);
free(serialized);
cJSON_Delete(pJson);
}
return ret;
}
static int32_t onSyncNodeRequestVoteReply(struct SSyncNode* ths, SyncRequestVoteReply* pMsg) {
int32_t ret = ths->pRaft->FpOnRequestVoteReply(ths->pRaft, pMsg);
static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
int32_t ret = 0;
return ret;
}
static int32_t doSyncNodeAppendEntries(struct SSyncNode* ths, const SyncAppendEntries* pMsg) {
int32_t ret = ths->pRaft->FpAppendEntries(ths->pRaft, pMsg);
static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) {
int32_t ret = 0;
return ret;
}
static int32_t onSyncNodeAppendEntries(struct SSyncNode* ths, SyncAppendEntries* pMsg) {
int32_t ret = ths->pRaft->FpOnAppendEntries(ths->pRaft, pMsg);
static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
int32_t ret = 0;
return ret;
}
static int32_t onSyncNodeAppendEntriesReply(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
int32_t ret = ths->pRaft->FpOnAppendEntriesReply(ths->pRaft, pMsg);
static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
int32_t ret = 0;
return ret;
}
static void syncNodePingTimerCb(void* param, void* tmrId) {
SSyncNode* pSyncNode = (SSyncNode*)param;
if (atomic_load_8(&pSyncNode->pingTimerStart)) {
++(pSyncNode->pingTimerCounter);
// pSyncNode->pingTimerMS += 100;
sTrace(
"syncNodePingTimerCb: pSyncNode->pingTimerCounter:%lu, pSyncNode->pingTimerMS:%d, pSyncNode->pPingTimer:%p, "
"tmrId:%p ",
pSyncNode->pingTimerCounter, pSyncNode->pingTimerMS, pSyncNode->pPingTimer, tmrId);
syncNodePingAll(pSyncNode);
taosTmrReset(syncNodePingTimerCb, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager,
&pSyncNode->pPingTimer);
} else {
sTrace("syncNodePingTimerCb: pingTimerStart:%u ", pSyncNode->pingTimerStart);
}
}
\ No newline at end of file
......@@ -15,5 +15,265 @@
#include "syncMessage.h"
#include "syncRaft.h"
#include "syncUtil.h"
#include "tcoding.h"
void onMessage(SRaft *pRaft, void *pMsg) {}
\ No newline at end of file
void onMessage(SRaft* pRaft, void* pMsg) {}
// ---- message process SyncPing----
SyncPing* syncPingBuild(uint32_t dataLen) {
uint32_t bytes = SYNC_PING_FIX_LEN + dataLen;
SyncPing* pMsg = malloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->msgType = SYNC_PING;
pMsg->dataLen = dataLen;
}
void syncPingDestroy(SyncPing* pMsg) {
if (pMsg != NULL) {
free(pMsg);
}
}
void syncPingSerialize(const SyncPing* pMsg, char* buf, uint32_t bufLen) {
assert(pMsg->bytes <= bufLen);
memcpy(buf, pMsg, pMsg->bytes);
}
void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg) {
memcpy(pMsg, buf, len);
assert(len == pMsg->bytes);
assert(pMsg->bytes == SYNC_PING_FIX_LEN + pMsg->dataLen);
}
void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg) {
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
pRpcMsg->msgType = pMsg->msgType;
pRpcMsg->contLen = pMsg->bytes;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
syncPingSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
}
void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg) {
syncPingDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
}
cJSON* syncPing2Json(const SyncPing* pMsg) {
cJSON* pRoot = cJSON_CreateObject();
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
cJSON* pSrcId = cJSON_CreateObject();
cJSON_AddNumberToObject(pSrcId, "addr", pMsg->srcId.addr);
{
uint64_t u64 = pMsg->srcId.addr;
cJSON* pTmp = pSrcId;
char host[128];
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
cJSON* pDestId = cJSON_CreateObject();
cJSON_AddNumberToObject(pDestId, "addr", pMsg->destId.addr);
{
uint64_t u64 = pMsg->destId.addr;
cJSON* pTmp = pDestId;
char host[128];
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
cJSON_AddItemToObject(pRoot, "destId", pDestId);
cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
cJSON_AddStringToObject(pRoot, "data", pMsg->data);
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SyncPing", pRoot);
return pJson;
}
SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str) {
uint32_t dataLen = strlen(str) + 1;
SyncPing* pMsg = syncPingBuild(dataLen);
pMsg->srcId = *srcId;
pMsg->destId = *destId;
snprintf(pMsg->data, pMsg->dataLen, "%s", str);
return pMsg;
}
SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId) {
SyncPing* pMsg = syncPingBuild2(srcId, destId, "ping");
return pMsg;
}
// ---- message process SyncPingReply----
SyncPingReply* syncPingReplyBuild(uint32_t dataLen) {
uint32_t bytes = SYNC_PING_REPLY_FIX_LEN + dataLen;
SyncPingReply* pMsg = malloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->msgType = SYNC_PING_REPLY;
pMsg->dataLen = dataLen;
}
void syncPingReplyDestroy(SyncPingReply* pMsg) {
if (pMsg != NULL) {
free(pMsg);
}
}
void syncPingReplySerialize(const SyncPingReply* pMsg, char* buf, uint32_t bufLen) {
assert(pMsg->bytes <= bufLen);
memcpy(buf, pMsg, pMsg->bytes);
}
void syncPingReplyDeserialize(const char* buf, uint32_t len, SyncPingReply* pMsg) {
memcpy(pMsg, buf, len);
assert(len == pMsg->bytes);
assert(pMsg->bytes == SYNC_PING_FIX_LEN + pMsg->dataLen);
}
void syncPingReply2RpcMsg(const SyncPingReply* pMsg, SRpcMsg* pRpcMsg) {
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
pRpcMsg->msgType = pMsg->msgType;
pRpcMsg->contLen = pMsg->bytes;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
syncPingReplySerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
}
void syncPingReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPingReply* pMsg) {
syncPingReplyDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
}
cJSON* syncPingReply2Json(const SyncPingReply* pMsg) {
cJSON* pRoot = cJSON_CreateObject();
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
cJSON* pSrcId = cJSON_CreateObject();
cJSON_AddNumberToObject(pSrcId, "addr", pMsg->srcId.addr);
{
uint64_t u64 = pMsg->srcId.addr;
cJSON* pTmp = pSrcId;
char host[128];
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
cJSON* pDestId = cJSON_CreateObject();
cJSON_AddNumberToObject(pDestId, "addr", pMsg->destId.addr);
{
uint64_t u64 = pMsg->destId.addr;
cJSON* pTmp = pDestId;
char host[128];
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
cJSON_AddItemToObject(pRoot, "destId", pDestId);
cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
cJSON_AddStringToObject(pRoot, "data", pMsg->data);
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SyncPingReply", pRoot);
return pJson;
}
SyncPingReply* syncPingReplyBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str) {
uint32_t dataLen = strlen(str) + 1;
SyncPingReply* pMsg = syncPingReplyBuild(dataLen);
pMsg->srcId = *srcId;
pMsg->destId = *destId;
snprintf(pMsg->data, pMsg->dataLen, "%s", str);
return pMsg;
}
SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId) {
SyncPingReply* pMsg = syncPingReplyBuild2(srcId, destId, "pang");
return pMsg;
}
#if 0
void syncPingSerialize(const SyncPing* pMsg, char** ppBuf, uint32_t* bufLen) {
*bufLen = sizeof(SyncPing) + pMsg->dataLen;
*ppBuf = (char*)malloc(*bufLen);
void* pStart = *ppBuf;
uint32_t allBytes = *bufLen;
int len = 0;
len = taosEncodeFixedU32(&pStart, pMsg->msgType);
allBytes -= len;
assert(len > 0);
pStart += len;
len = taosEncodeFixedU64(&pStart, pMsg->srcId.addr);
allBytes -= len;
assert(len > 0);
pStart += len;
len = taosEncodeFixedI32(&pStart, pMsg->srcId.vgId);
allBytes -= len;
assert(len > 0);
pStart += len;
len = taosEncodeFixedU64(&pStart, pMsg->destId.addr);
allBytes -= len;
assert(len > 0);
pStart += len;
len = taosEncodeFixedI32(&pStart, pMsg->destId.vgId);
allBytes -= len;
assert(len > 0);
pStart += len;
len = taosEncodeFixedU32(&pStart, pMsg->dataLen);
allBytes -= len;
assert(len > 0);
pStart += len;
memcpy(pStart, pMsg->data, pMsg->dataLen);
allBytes -= pMsg->dataLen;
assert(allBytes == 0);
}
void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg) {
void* pStart = (void*)buf;
uint64_t u64;
int32_t i32;
uint32_t u32;
pStart = taosDecodeFixedU64(pStart, &u64);
pMsg->msgType = u64;
pStart = taosDecodeFixedU64(pStart, &u64);
pMsg->srcId.addr = u64;
pStart = taosDecodeFixedI32(pStart, &i32);
pMsg->srcId.vgId = i32;
pStart = taosDecodeFixedU64(pStart, &u64);
pMsg->destId.addr = u64;
pStart = taosDecodeFixedI32(pStart, &i32);
pMsg->destId.vgId = i32;
pStart = taosDecodeFixedU32(pStart, &u32);
pMsg->dataLen = u32;
}
#endif
\ No newline at end of file
......@@ -16,6 +16,8 @@
#include "syncRaft.h"
#include "sync.h"
#if 0
SRaft* raftOpen(SRaftId raftId, SSyncFSM* pFsm) {
SRaft* pRaft = (SRaft*)malloc(sizeof(SRaft));
assert(pRaft != NULL);
......@@ -64,3 +66,5 @@ static int32_t onRaftAppendEntriesReply(struct SRaft* ths, RaftAppendEntriesRepl
int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak) { return 0; }
static int raftSendMsg(SRaftId destRaftId, const void* pMsg, const SRaft* pRaft) { return 0; }
#endif
\ No newline at end of file
......@@ -18,23 +18,125 @@
// to complie success: FileIO interface is modified
SRaftStore *raftStoreOpen(const char *path) { return NULL;}
SRaftStore *raftStoreOpen(const char *path) {
int32_t ret;
SRaftStore *pRaftStore = malloc(sizeof(SRaftStore));
if (pRaftStore == NULL) {
sError("raftStoreOpen malloc error");
return NULL;
}
memset(pRaftStore, 0, sizeof(*pRaftStore));
snprintf(pRaftStore->path, sizeof(pRaftStore->path), "%s", path);
char storeBuf[RAFT_STORE_BLOCK_SIZE];
memset(storeBuf, 0, sizeof(storeBuf));
if (!raftStoreFileExist(pRaftStore->path)) {
ret = raftStoreInit(pRaftStore);
assert(ret == 0);
}
pRaftStore->pFile = taosOpenFile(path, TD_FILE_READ | TD_FILE_WRITE);
assert(pRaftStore->pFile != NULL);
int len = taosReadFile(pRaftStore->pFile, storeBuf, RAFT_STORE_BLOCK_SIZE);
assert(len == RAFT_STORE_BLOCK_SIZE);
ret = raftStoreDeserialize(pRaftStore, storeBuf, len);
assert(ret == 0);
return pRaftStore;
}
static int32_t raftStoreInit(SRaftStore *pRaftStore) {
assert(pRaftStore != NULL);
pRaftStore->pFile = taosOpenFile(pRaftStore->path, TD_FILE_CTEATE | TD_FILE_WRITE);
assert(pRaftStore->pFile != NULL);
pRaftStore->currentTerm = 0;
pRaftStore->voteFor.addr = 0;
pRaftStore->voteFor.vgId = 0;
int32_t ret = raftStorePersist(pRaftStore);
assert(ret == 0);
taosCloseFile(&pRaftStore->pFile);
return 0;
}
int32_t raftStoreClose(SRaftStore *pRaftStore) {
assert(pRaftStore != NULL);
taosCloseFile(&pRaftStore->pFile);
free(pRaftStore);
pRaftStore = NULL;
return 0;
}
int32_t raftStorePersist(SRaftStore *pRaftStore) {
assert(pRaftStore != NULL);
int32_t ret;
char storeBuf[RAFT_STORE_BLOCK_SIZE];
ret = raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf));
assert(ret == 0);
static int32_t raftStoreInit(SRaftStore *pRaftStore) { return 0;}
taosLSeekFile(pRaftStore->pFile, 0, SEEK_SET);
int32_t raftStoreClose(SRaftStore *pRaftStore) { return 0;}
ret = taosWriteFile(pRaftStore->pFile, storeBuf, sizeof(storeBuf));
assert(ret == RAFT_STORE_BLOCK_SIZE);
int32_t raftStorePersist(SRaftStore *pRaftStore) { return 0;}
taosFsyncFile(pRaftStore->pFile);
return 0;
}
static bool raftStoreFileExist(char *path) { return 0;}
static bool raftStoreFileExist(char *path) { return taosStatFile(path, NULL, NULL) >= 0; }
int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) { return 0;}
int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) {
assert(pRaftStore != NULL);
int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) { return 0;}
cJSON *pRoot = cJSON_CreateObject();
cJSON_AddNumberToObject(pRoot, "current_term", pRaftStore->currentTerm);
cJSON_AddNumberToObject(pRoot, "vote_for_addr", pRaftStore->voteFor.addr);
cJSON_AddNumberToObject(pRoot, "vote_for_vgid", pRaftStore->voteFor.vgId);
void raftStorePrint(SRaftStore *pRaftStore) {}
char *serialized = cJSON_Print(pRoot);
int len2 = strlen(serialized);
assert(len2 < len);
memset(buf, 0, len);
snprintf(buf, len, "%s", serialized);
free(serialized);
cJSON_Delete(pRoot);
return 0;
}
int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) {
assert(pRaftStore != NULL);
assert(len > 0 && len <= RAFT_STORE_BLOCK_SIZE);
cJSON *pRoot = cJSON_Parse(buf);
cJSON *pCurrentTerm = cJSON_GetObjectItem(pRoot, "current_term");
pRaftStore->currentTerm = pCurrentTerm->valueint;
cJSON *pVoteForAddr = cJSON_GetObjectItem(pRoot, "vote_for_addr");
pRaftStore->voteFor.addr = pVoteForAddr->valueint;
cJSON *pVoteForVgid = cJSON_GetObjectItem(pRoot, "vote_for_vgid");
pRaftStore->voteFor.vgId = pVoteForVgid->valueint;
cJSON_Delete(pRoot);
return 0;
}
void raftStorePrint(SRaftStore *pRaftStore) {
char storeBuf[RAFT_STORE_BLOCK_SIZE];
raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf));
printf("%s\n", storeBuf);
}
#if 0
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "syncUtil.h"
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
// ---- encode / decode
uint64_t syncUtilAddr2U64(const char* host, uint16_t port) {
uint64_t u64;
uint32_t hostU32 = (uint32_t)inet_addr(host);
// assert(hostU32 != (uint32_t)-1);
u64 = (((uint64_t)hostU32) << 32) | (((uint32_t)port) << 16);
return u64;
}
void syncUtilU642Addr(uint64_t u64, char* host, size_t len, uint16_t* port) {
uint32_t hostU32 = (uint32_t)((u64 >> 32) & 0x00000000FFFFFFFF);
struct in_addr addr;
addr.s_addr = hostU32;
snprintf(host, len, "%s", inet_ntoa(addr));
*port = (uint16_t)((u64 & 0x00000000FFFF0000) >> 16);
}
void syncUtilnodeInfo2EpSet(const SNodeInfo* pNodeInfo, SEpSet* pEpSet) {
pEpSet->inUse = 0;
pEpSet->numOfEps = 0;
addEpIntoEpSet(pEpSet, pNodeInfo->nodeFqdn, pNodeInfo->nodePort);
}
void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet) {
char host[TSDB_FQDN_LEN];
uint16_t port;
syncUtilU642Addr(raftId->addr, host, sizeof(host), &port);
/*
pEpSet->numOfEps = 1;
pEpSet->inUse = 0;
pEpSet->eps[0].port = port;
snprintf(pEpSet->eps[0].fqdn, sizeof(pEpSet->eps[0].fqdn), "%s", host);
*/
pEpSet->inUse = 0;
pEpSet->numOfEps = 0;
addEpIntoEpSet(pEpSet, host, port);
}
void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId) {
uint32_t ipv4 = taosGetIpv4FromFqdn(pNodeInfo->nodeFqdn);
assert(ipv4 != 0xFFFFFFFF);
char ipbuf[128];
tinet_ntoa(ipbuf, ipv4);
raftId->addr = syncUtilAddr2U64(ipbuf, pNodeInfo->nodePort);
raftId->vgId = vgId;
}
// ---- SSyncBuffer -----
#if 0
void syncUtilbufBuild(SSyncBuffer* syncBuf, size_t len) {
syncBuf->len = len;
syncBuf->data = malloc(syncBuf->len);
}
void syncUtilbufDestroy(SSyncBuffer* syncBuf) { free(syncBuf->data); }
void syncUtilbufCopy(const SSyncBuffer* src, SSyncBuffer* dest) {
dest->len = src->len;
dest->data = src->data;
}
void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest) {
dest->len = src->len;
dest->data = malloc(dest->len);
memcpy(dest->data, src->data, dest->len);
}
#endif
\ No newline at end of file
add_executable(syncTest "")
add_executable(syncEnvTest "")
add_executable(syncPingTest "")
add_executable(syncEncodeTest "")
add_executable(syncIOTickQTest "")
add_executable(syncIOTickPingTest "")
add_executable(syncIOSendMsgTest "")
add_executable(syncIOSendMsgClientTest "")
add_executable(syncIOSendMsgServerTest "")
add_executable(syncRaftStoreTest "")
target_sources(syncTest
......@@ -15,6 +22,34 @@ target_sources(syncPingTest
PRIVATE
"syncPingTest.cpp"
)
target_sources(syncEncodeTest
PRIVATE
"syncEncodeTest.cpp"
)
target_sources(syncIOTickQTest
PRIVATE
"syncIOTickQTest.cpp"
)
target_sources(syncIOTickPingTest
PRIVATE
"syncIOTickPingTest.cpp"
)
target_sources(syncIOSendMsgTest
PRIVATE
"syncIOSendMsgTest.cpp"
)
target_sources(syncIOSendMsgClientTest
PRIVATE
"syncIOSendMsgClientTest.cpp"
)
target_sources(syncIOSendMsgServerTest
PRIVATE
"syncIOSendMsgServerTest.cpp"
)
target_sources(syncRaftStoreTest
PRIVATE
"syncRaftStoreTest.cpp"
)
target_include_directories(syncTest
......@@ -32,6 +67,41 @@ target_include_directories(syncPingTest
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncEncodeTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncIOTickQTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncIOTickPingTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncIOSendMsgTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncIOSendMsgClientTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncIOSendMsgServerTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncRaftStoreTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries(syncTest
......@@ -46,6 +116,34 @@ target_link_libraries(syncPingTest
sync
gtest_main
)
target_link_libraries(syncEncodeTest
sync
gtest_main
)
target_link_libraries(syncIOTickQTest
sync
gtest_main
)
target_link_libraries(syncIOTickPingTest
sync
gtest_main
)
target_link_libraries(syncIOSendMsgTest
sync
gtest_main
)
target_link_libraries(syncIOSendMsgClientTest
sync
gtest_main
)
target_link_libraries(syncIOSendMsgServerTest
sync
gtest_main
)
target_link_libraries(syncRaftStoreTest
sync
gtest_main
)
enable_testing()
......
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
#define PING_MSG_LEN 20
void test1() {
sTrace("test1: ---- syncPingSerialize, syncPingDeserialize");
char msg[PING_MSG_LEN];
snprintf(msg, sizeof(msg), "%s", "test ping");
SyncPing* pMsg = syncPingBuild(PING_MSG_LEN);
pMsg->srcId.addr = 1;
pMsg->srcId.vgId = 2;
pMsg->destId.addr = 3;
pMsg->destId.vgId = 4;
memcpy(pMsg->data, msg, PING_MSG_LEN);
{
cJSON* pJson = syncPing2Json(pMsg);
char* serialized = cJSON_Print(pJson);
printf("SyncPing: \n%s\n\n", serialized);
free(serialized);
cJSON_Delete(pJson);
}
uint32_t bufLen = pMsg->bytes;
char* buf = (char*)malloc(bufLen);
syncPingSerialize(pMsg, buf, bufLen);
SyncPing* pMsg2 = (SyncPing*)malloc(pMsg->bytes);
syncPingDeserialize(buf, bufLen, pMsg2);
{
cJSON* pJson = syncPing2Json(pMsg2);
char* serialized = cJSON_Print(pJson);
printf("SyncPing2: \n%s\n\n", serialized);
free(serialized);
cJSON_Delete(pJson);
}
syncPingDestroy(pMsg);
syncPingDestroy(pMsg2);
free(buf);
}
void test2() {
sTrace("test2: ---- syncPing2RpcMsg, syncPingFromRpcMsg");
char msg[PING_MSG_LEN];
snprintf(msg, sizeof(msg), "%s", "hello raft");
SyncPing* pMsg = syncPingBuild(PING_MSG_LEN);
pMsg->srcId.addr = 100;
pMsg->srcId.vgId = 200;
pMsg->destId.addr = 300;
pMsg->destId.vgId = 400;
memcpy(pMsg->data, msg, PING_MSG_LEN);
{
cJSON* pJson = syncPing2Json(pMsg);
char* serialized = cJSON_Print(pJson);
printf("SyncPing: \n%s\n\n", serialized);
free(serialized);
cJSON_Delete(pJson);
}
SRpcMsg rpcMsg;
syncPing2RpcMsg(pMsg, &rpcMsg);
SyncPing* pMsg2 = (SyncPing*)malloc(pMsg->bytes);
syncPingFromRpcMsg(&rpcMsg, pMsg2);
rpcFreeCont(rpcMsg.pCont);
{
cJSON* pJson = syncPing2Json(pMsg2);
char* serialized = cJSON_Print(pJson);
printf("SyncPing2: \n%s\n\n", serialized);
free(serialized);
cJSON_Delete(pJson);
}
syncPingDestroy(pMsg);
syncPingDestroy(pMsg2);
}
void test3() {
sTrace("test3: ---- syncPingReplySerialize, syncPingReplyDeserialize");
char msg[PING_MSG_LEN];
snprintf(msg, sizeof(msg), "%s", "test ping");
SyncPingReply* pMsg = syncPingReplyBuild(PING_MSG_LEN);
pMsg->srcId.addr = 19;
pMsg->srcId.vgId = 29;
pMsg->destId.addr = 39;
pMsg->destId.vgId = 49;
memcpy(pMsg->data, msg, PING_MSG_LEN);
{
cJSON* pJson = syncPingReply2Json(pMsg);
char* serialized = cJSON_Print(pJson);
printf("SyncPingReply: \n%s\n\n", serialized);
free(serialized);
cJSON_Delete(pJson);
}
uint32_t bufLen = pMsg->bytes;
char* buf = (char*)malloc(bufLen);
syncPingReplySerialize(pMsg, buf, bufLen);
SyncPingReply* pMsg2 = (SyncPingReply*)malloc(pMsg->bytes);
syncPingReplyDeserialize(buf, bufLen, pMsg2);
{
cJSON* pJson = syncPingReply2Json(pMsg2);
char* serialized = cJSON_Print(pJson);
printf("SyncPingReply2: \n%s\n\n", serialized);
free(serialized);
cJSON_Delete(pJson);
}
syncPingReplyDestroy(pMsg);
syncPingReplyDestroy(pMsg2);
free(buf);
}
void test4() {
sTrace("test4: ---- syncPingReply2RpcMsg, syncPingReplyFromRpcMsg");
char msg[PING_MSG_LEN];
snprintf(msg, sizeof(msg), "%s", "hello raft");
SyncPingReply* pMsg = syncPingReplyBuild(PING_MSG_LEN);
pMsg->srcId.addr = 66;
pMsg->srcId.vgId = 77;
pMsg->destId.addr = 88;
pMsg->destId.vgId = 99;
memcpy(pMsg->data, msg, PING_MSG_LEN);
{
cJSON* pJson = syncPingReply2Json(pMsg);
char* serialized = cJSON_Print(pJson);
printf("SyncPingReply: \n%s\n\n", serialized);
free(serialized);
cJSON_Delete(pJson);
}
SRpcMsg rpcMsg;
syncPingReply2RpcMsg(pMsg, &rpcMsg);
SyncPingReply* pMsg2 = (SyncPingReply*)malloc(pMsg->bytes);
syncPingReplyFromRpcMsg(&rpcMsg, pMsg2);
rpcFreeCont(rpcMsg.pCont);
{
cJSON* pJson = syncPingReply2Json(pMsg2);
char* serialized = cJSON_Print(pJson);
printf("SyncPingReply2: \n%s\n\n", serialized);
free(serialized);
cJSON_Delete(pJson);
}
syncPingReplyDestroy(pMsg);
syncPingReplyDestroy(pMsg2);
}
int main() {
// taosInitLog((char*)"syncPingTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
test1();
test2();
test3();
test4();
return 0;
}
......@@ -34,19 +34,20 @@ void doSync() {
}
int main() {
//taosInitLog((char*)"syncEnvTest.log", 100000, 10);
// taosInitLog((char*)"syncEnvTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
int32_t ret;
logTest();
int32_t ret = syncIOStart();
assert(ret == 0);
// ret = syncIOStart();
// assert(ret == 0);
ret = syncEnvStart();
assert(ret == 0);
doSync();
// doSync();
while (1) {
taosMsleep(1000);
......
#include <stdio.h>
#include "gtest/gtest.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
int main() {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
logTest();
int32_t ret;
ret = syncIOStart((char *)"127.0.0.1", 7010);
assert(ret == 0);
for (int i = 0; i < 10; ++i) {
SEpSet epSet;
epSet.inUse = 0;
epSet.numOfEps = 0;
addEpIntoEpSet(&epSet, "127.0.0.1", 7030);
SRpcMsg rpcMsg;
rpcMsg.contLen = 64;
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
snprintf((char *)rpcMsg.pCont, rpcMsg.contLen, "%s", "syncIOSendMsgTest");
rpcMsg.handle = NULL;
rpcMsg.msgType = 77;
syncIOSendMsg(gSyncIO->clientRpc, &epSet, &rpcMsg);
sleep(1);
}
while (1) {
sleep(1);
}
return 0;
}
#include <stdio.h>
#include "gtest/gtest.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
int main() {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
logTest();
int32_t ret;
ret = syncIOStart((char *)"127.0.0.1", 7030);
assert(ret == 0);
while (1) {
sleep(1);
}
return 0;
}
#include <stdio.h>
#include "gtest/gtest.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
int main() {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
logTest();
int32_t ret;
ret = syncIOStart((char *)"127.0.0.1", 7010);
assert(ret == 0);
for (int i = 0; i < 10; ++i) {
SEpSet epSet;
epSet.inUse = 0;
epSet.numOfEps = 0;
addEpIntoEpSet(&epSet, "127.0.0.1", 7010);
SRpcMsg rpcMsg;
rpcMsg.contLen = 64;
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
snprintf((char *)rpcMsg.pCont, rpcMsg.contLen, "%s", "syncIOSendMsgTest");
rpcMsg.handle = NULL;
rpcMsg.msgType = 77;
syncIOSendMsg(gSyncIO->clientRpc, &epSet, &rpcMsg);
sleep(1);
}
while (1) {
sleep(1);
}
return 0;
}
#include <stdio.h>
#include "gtest/gtest.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
int main() {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
logTest();
int32_t ret;
ret = syncIOStart((char*)"127.0.0.1", 7010);
assert(ret == 0);
ret = syncIOTickPing();
assert(ret == 0);
while (1) {
sleep(1);
}
return 0;
}
#include <stdio.h>
#include "gtest/gtest.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
int main() {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
logTest();
int32_t ret;
ret = syncIOStart((char*)"127.0.0.1", 7010);
assert(ret == 0);
ret = syncIOTickQ();
assert(ret == 0);
while (1) {
sleep(1);
}
return 0;
}
......@@ -13,49 +13,71 @@ void logTest() {
sFatal("--- sync log test: fatal");
}
void doSync() {
SSyncNode* doSync() {
SSyncFSM* pFsm;
SSyncInfo syncInfo;
syncInfo.vgId = 1;
syncInfo.rpcClient = gSyncIO->clientRpc;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_sync_ping");
SSyncCfg* pCfg = &syncInfo.syncCfg;
pCfg->myIndex = 0;
pCfg->replicaNum = 3;
pCfg->replicaNum = 2;
pCfg->nodeInfo[0].nodePort = 7010;
taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
snprintf(pCfg->nodeInfo[0].nodeFqdn, sizeof(pCfg->nodeInfo[0].nodeFqdn), "%s", "127.0.0.1");
// taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
pCfg->nodeInfo[1].nodePort = 7110;
taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn);
snprintf(pCfg->nodeInfo[1].nodeFqdn, sizeof(pCfg->nodeInfo[1].nodeFqdn), "%s", "127.0.0.1");
// taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn);
pCfg->nodeInfo[2].nodePort = 7210;
taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn);
snprintf(pCfg->nodeInfo[2].nodeFqdn, sizeof(pCfg->nodeInfo[2].nodeFqdn), "%s", "127.0.0.1");
// taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn);
SSyncNode* pSyncNode = syncNodeOpen(&syncInfo);
assert(pSyncNode != NULL);
gSyncIO->FpOnPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->pSyncNode = pSyncNode;
return pSyncNode;
}
void timerPingAll(void* param, void* tmrId) {
SSyncNode* pSyncNode = (SSyncNode*)param;
syncNodePingAll(pSyncNode);
}
int main() {
//taosInitLog((char*)"syncPingTest.log", 100000, 10);
// taosInitLog((char*)"syncPingTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
logTest();
int32_t ret = syncIOStart();
int32_t ret = syncIOStart((char*)"127.0.0.1", 7010);
assert(ret == 0);
ret = syncEnvStart();
assert(ret == 0);
doSync();
SSyncNode* pSyncNode = doSync();
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
ret = syncNodeStartPingTimer(pSyncNode);
assert(ret == 0);
/*
taosMsleep(10000);
ret = syncNodeStopPingTimer(pSyncNode);
assert(ret == 0);
*/
while (1) {
taosMsleep(1000);
......
#include "syncRaftStore.h"
#include <stdio.h>
#include "gtest/gtest.h"
#include "syncIO.h"
#include "syncInt.h"
void *pingFunc(void *param) {
SSyncIO *io = (SSyncIO *)param;
while (1) {
sDebug("io->ping");
// io->ping(io);
sleep(1);
}
return NULL;
}
int main() {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
sTrace("sync log test: trace");
sDebug("sync log test: debug");
sInfo("sync log test: info");
sWarn("sync log test: warn");
sError("sync log test: error");
sFatal("sync log test: fatal");
SRaftStore *pRaftStore = raftStoreOpen("./raft_store.json");
assert(pRaftStore != NULL);
raftStorePrint(pRaftStore);
pRaftStore->currentTerm = 100;
pRaftStore->voteFor.addr = 200;
pRaftStore->voteFor.vgId = 300;
raftStorePrint(pRaftStore);
raftStorePersist(pRaftStore);
return 0;
}
#include <stdio.h>
#include "gtest/gtest.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
#include "gtest/gtest.h"
void *pingFunc(void *param) {
SSyncIO *io = (SSyncIO *)param;
while (1) {
sDebug("io->ping");
io->ping(io);
// io->ping(io);
sleep(1);
}
return NULL;
}
int main() {
//taosInitLog((char *)"syncTest.log", 100000, 10);
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
......
......@@ -15,8 +15,6 @@
#define ALLOW_FORBID_FUNC
#include "os.h"
#define MAX_FPRINTFLINE_BUFFER_SIZE (1000)
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
#include <io.h>
......@@ -46,10 +44,15 @@ extern int openU(const char *, int, ...); /* MsvcLibX UTF-8 version of open */
typedef int32_t FileFd;
#define FILE_WITH_LOCK 1
typedef struct TdFile {
int refId;
FileFd fd;
FILE *fp;
#if FILE_WITH_LOCK
pthread_rwlock_t rwlock;
#endif
int refId;
FileFd fd;
FILE *fp;
} * TdFilePtr, TdFile;
void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, char *dstPath) {
......@@ -238,6 +241,9 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) {
if (fp != NULL) fclose(fp);
return NULL;
}
#if FILE_WITH_LOCK
pthread_rwlock_init(&(pFile->rwlock),NULL);
#endif
pFile->fd = fd;
pFile->fp = fp;
pFile->refId = 0;
......@@ -249,6 +255,12 @@ int64_t taosCloseFile(TdFilePtr *ppFile) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
return 0;
#else
if (ppFile == NULL || *ppFile == NULL) {
return 0;
}
#if FILE_WITH_LOCK
pthread_rwlock_wrlock(&((*ppFile)->rwlock));
#endif
if (ppFile == NULL || *ppFile == NULL || (*ppFile)->fd == -1) {
return 0;
}
......@@ -263,6 +275,10 @@ int64_t taosCloseFile(TdFilePtr *ppFile) {
(*ppFile)->fd = -1;
}
(*ppFile)->refId = 0;
#if FILE_WITH_LOCK
pthread_rwlock_unlock(&((*ppFile)->rwlock));
pthread_rwlock_destroy(&((*ppFile)->rwlock));
#endif
free(*ppFile);
*ppFile = NULL;
return 0;
......@@ -273,6 +289,9 @@ int64_t taosReadFile(TdFilePtr pFile, void *buf, int64_t count) {
if (pFile == NULL) {
return 0;
}
#if FILE_WITH_LOCK
pthread_rwlock_rdlock(&(pFile->rwlock));
#endif
assert(pFile->fd >= 0);
int64_t leftbytes = count;
int64_t readbytes;
......@@ -284,9 +303,15 @@ int64_t taosReadFile(TdFilePtr pFile, void *buf, int64_t count) {
if (errno == EINTR) {
continue;
} else {
#if FILE_WITH_LOCK
pthread_rwlock_unlock(&(pFile->rwlock));
#endif
return -1;
}
} else if (readbytes == 0) {
#if FILE_WITH_LOCK
pthread_rwlock_unlock(&(pFile->rwlock));
#endif
return (int64_t)(count - leftbytes);
}
......@@ -294,6 +319,9 @@ int64_t taosReadFile(TdFilePtr pFile, void *buf, int64_t count) {
tbuf += readbytes;
}
#if FILE_WITH_LOCK
pthread_rwlock_unlock(&(pFile->rwlock));
#endif
return count;
}
......@@ -301,14 +329,24 @@ int64_t taosPReadFile(TdFilePtr pFile, void *buf, int64_t count, int64_t offset)
if (pFile == NULL) {
return 0;
}
#if FILE_WITH_LOCK
pthread_rwlock_rdlock(&(pFile->rwlock));
#endif
assert(pFile->fd >= 0);
return pread(pFile->fd, buf, count, offset);
int64_t ret = pread(pFile->fd, buf, count, offset);
#if FILE_WITH_LOCK
pthread_rwlock_unlock(&(pFile->rwlock));
#endif
return ret;
}
int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) {
if (pFile == NULL) {
return 0;
}
#if FILE_WITH_LOCK
pthread_rwlock_wrlock(&(pFile->rwlock));
#endif
assert(pFile->fd >= 0);
int64_t nleft = count;
......@@ -321,12 +359,18 @@ int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) {
if (errno == EINTR) {
continue;
}
#if FILE_WITH_LOCK
pthread_rwlock_unlock(&(pFile->rwlock));
#endif
return -1;
}
nleft -= nwritten;
tbuf += nwritten;
}
#if FILE_WITH_LOCK
pthread_rwlock_unlock(&(pFile->rwlock));
#endif
return count;
}
......@@ -334,8 +378,15 @@ int64_t taosLSeekFile(TdFilePtr pFile, int64_t offset, int32_t whence) {
if (pFile == NULL) {
return 0;
}
#if FILE_WITH_LOCK
pthread_rwlock_rdlock(&(pFile->rwlock));
#endif
assert(pFile->fd >= 0);
return (int64_t)lseek(pFile->fd, (long)offset, whence);
int64_t ret = lseek(pFile->fd, (long)offset, whence);
#if FILE_WITH_LOCK
pthread_rwlock_unlock(&(pFile->rwlock));
#endif
return ret;
}
int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int32_t *mtime) {
......@@ -637,7 +688,6 @@ void taosFprintfFile(TdFilePtr pFile, const char *format, ...) {
}
assert(pFile->fp != NULL);
char buffer[MAX_FPRINTFLINE_BUFFER_SIZE] = {0};
va_list ap;
va_start(ap, format);
vfprintf(pFile->fp, format, ap);
......
......@@ -345,6 +345,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO, "Invalid information t
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_AVAIL_DISK, "No available disk")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_MESSED_MSG, "TSDB messed message")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVLD_TAG_VAL, "TSDB invalid tag value")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_CACHE_LAST_ROW, "TSDB no cache last row data")
// query
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, "Invalid handle")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册