diff --git a/include/dnode/vnode/tq/tq.h b/include/dnode/vnode/tq/tq.h
index 747d97b7a08992389092f2b9a7b6d69a4e00a208..d6f7b46870e60565565e3e32697962a396c1f4af 100644
--- a/include/dnode/vnode/tq/tq.h
+++ b/include/dnode/vnode/tq/tq.h
@@ -16,9 +16,9 @@
#ifndef _TD_TQ_H_
#define _TD_TQ_H_
+#include "mallocator.h"
#include "os.h"
#include "tutil.h"
-#include "mallocator.h"
#ifdef __cplusplus
extern "C" {
@@ -97,128 +97,125 @@ typedef struct TmqHeartbeatReq {
typedef struct TmqHeartbeatRsp {
} TmqHeartbeatRsp;
-typedef struct TqTopicVhandle {
+typedef struct STqTopicVhandle {
int64_t topicId;
// executor for filter
void* filterExec;
// callback for mnode
// trigger when vnode list associated topic change
void* (*mCallback)(void*, void*);
-} TqTopicVhandle;
-
+} STqTopicVhandle;
#define TQ_BUFFER_SIZE 8
-typedef struct TqBufferItem {
+typedef struct STqBufferItem {
int64_t offset;
// executors are identical but not concurrent
// so there must be a copy in each item
void* executor;
int64_t size;
void* content;
-} TqBufferItem;
+} STqBufferItem;
-typedef struct TqBufferHandle {
+typedef struct STqBufferHandle {
// char* topic; //c style, end with '\0'
// int64_t cgId;
// void* ahandle;
- int64_t nextConsumeOffset;
- int64_t topicId;
- int32_t head;
- int32_t tail;
- TqBufferItem buffer[TQ_BUFFER_SIZE];
-} TqBufferHandle;
-
-typedef struct TqListHandle {
- TqBufferHandle bufHandle;
- struct TqListHandle* next;
-} TqListHandle;
-
-typedef struct TqGroupHandle {
- int64_t cId;
- int64_t cgId;
- void* ahandle;
- int32_t topicNum;
- TqListHandle* head;
-} TqGroupHandle;
-
-typedef struct TqQueryExec {
- void* src;
- TqBufferItem* dest;
- void* executor;
-} TqQueryExec;
-
-typedef struct TqQueryMsg {
- TqQueryExec* exec;
- struct TqQueryMsg* next;
-} TqQueryMsg;
-
-typedef struct TqLogReader {
+ int64_t nextConsumeOffset;
+ int64_t floatingCursor;
+ int64_t topicId;
+ int32_t head;
+ int32_t tail;
+ STqBufferItem buffer[TQ_BUFFER_SIZE];
+} STqBufferHandle;
+
+typedef struct STqListHandle {
+ STqBufferHandle bufHandle;
+ struct STqListHandle* next;
+} STqListHandle;
+
+typedef struct STqGroupHandle {
+ int64_t cId;
+ int64_t cgId;
+ void* ahandle;
+ int32_t topicNum;
+ STqListHandle* head;
+} STqGroupHandle;
+
+typedef struct STqQueryExec {
+ void* src;
+ STqBufferItem* dest;
+ void* executor;
+} STqQueryExec;
+
+typedef struct STqQueryMsg {
+ STqQueryExec* exec;
+ struct STqQueryMsg* next;
+} STqQueryMsg;
+
+typedef struct STqLogReader {
void* logHandle;
int32_t (*logRead)(void* logHandle, void** data, int64_t ver);
int64_t (*logGetFirstVer)(void* logHandle);
int64_t (*logGetSnapshotVer)(void* logHandle);
int64_t (*logGetLastVer)(void* logHandle);
-} TqLogReader;
+} STqLogReader;
typedef struct STqCfg {
// TODO
} STqCfg;
-typedef struct TqMemRef {
- SMemAllocatorFactory *pAlloctorFactory;
- SMemAllocator *pAllocator;
-} TqMemRef;
+typedef struct STqMemRef {
+ SMemAllocatorFactory* pAlloctorFactory;
+ SMemAllocator* pAllocator;
+} STqMemRef;
-typedef struct TqSerializedHead {
+typedef struct STqSerializedHead {
int16_t ver;
int16_t action;
int32_t checksum;
int64_t ssize;
char content[];
-} TqSerializedHead;
+} STqSerializedHead;
-typedef int (*TqSerializeFun)(const void* pObj, TqSerializedHead** ppHead);
-typedef const void* (*TqDeserializeFun)(const TqSerializedHead* pHead, void** ppObj);
-typedef void (*TqDeleteFun)(void*);
+typedef int (*FTqSerialize)(const void* pObj, STqSerializedHead** ppHead);
+typedef const void* (*FTqDeserialize)(const STqSerializedHead* pHead, void** ppObj);
+typedef void (*FTqDelete)(void*);
#define TQ_BUCKET_MASK 0xFF
#define TQ_BUCKET_SIZE 256
#define TQ_PAGE_SIZE 4096
-//key + offset + size
+// key + offset + size
#define TQ_IDX_SIZE 24
-//4096 / 24
+// 4096 / 24
#define TQ_MAX_IDX_ONE_PAGE 170
-//24 * 170
+// 24 * 170
#define TQ_IDX_PAGE_BODY_SIZE 4080
-//4096 - 4080
+// 4096 - 4080
#define TQ_IDX_PAGE_HEAD_SIZE 16
-#define TQ_ACTION_CONST 0
-#define TQ_ACTION_INUSE 1
+#define TQ_ACTION_CONST 0
+#define TQ_ACTION_INUSE 1
#define TQ_ACTION_INUSE_CONT 2
-#define TQ_ACTION_INTXN 3
+#define TQ_ACTION_INTXN 3
-#define TQ_SVER 0
+#define TQ_SVER 0
-//TODO: inplace mode is not implemented
-#define TQ_UPDATE_INPLACE 0
-#define TQ_UPDATE_APPEND 1
+// TODO: inplace mode is not implemented
+#define TQ_UPDATE_INPLACE 0
+#define TQ_UPDATE_APPEND 1
#define TQ_DUP_INTXN_REWRITE 0
-#define TQ_DUP_INTXN_REJECT 2
+#define TQ_DUP_INTXN_REJECT 2
-static inline bool TqUpdateAppend(int32_t tqConfigFlag) {
- return tqConfigFlag & TQ_UPDATE_APPEND;
-}
+static inline bool TqUpdateAppend(int32_t tqConfigFlag) { return tqConfigFlag & TQ_UPDATE_APPEND; }
-static inline bool TqDupIntxnReject(int32_t tqConfigFlag) {
- return tqConfigFlag & TQ_DUP_INTXN_REJECT;
-}
+static inline bool TqDupIntxnReject(int32_t tqConfigFlag) { return tqConfigFlag & TQ_DUP_INTXN_REJECT; }
static const int8_t TQ_CONST_DELETE = TQ_ACTION_CONST;
-#define TQ_DELETE_TOKEN (void*)&TQ_CONST_DELETE
+
+#define TQ_DELETE_TOKEN (void*)&TQ_CONST_DELETE
typedef struct TqMetaHandle {
int64_t key;
@@ -229,41 +226,41 @@ typedef struct TqMetaHandle {
} STqMetaHandle;
typedef struct TqMetaList {
- STqMetaHandle handle;
+ STqMetaHandle handle;
struct TqMetaList* next;
- //struct TqMetaList* inTxnPrev;
- //struct TqMetaList* inTxnNext;
+ // struct TqMetaList* inTxnPrev;
+ // struct TqMetaList* inTxnNext;
struct TqMetaList* unpersistPrev;
struct TqMetaList* unpersistNext;
} STqMetaList;
typedef struct TqMetaStore {
- STqMetaList* bucket[TQ_BUCKET_SIZE];
- //a table head
- STqMetaList* unpersistHead;
- //TODO:temporaral use, to be replaced by unified tfile
- int fileFd;
- //TODO:temporaral use, to be replaced by unified tfile
- int idxFd;
- char* dirPath;
- int32_t tqConfigFlag;
- TqSerializeFun pSerializer;
- TqDeserializeFun pDeserializer;
- TqDeleteFun pDeleter;
+ STqMetaList* bucket[TQ_BUCKET_SIZE];
+ // a table head
+ STqMetaList* unpersistHead;
+ // TODO:temporaral use, to be replaced by unified tfile
+ int fileFd;
+ // TODO:temporaral use, to be replaced by unified tfile
+ int idxFd;
+ char* dirPath;
+ int32_t tqConfigFlag;
+ FTqSerialize pSerializer;
+ FTqDeserialize pDeserializer;
+ FTqDelete pDeleter;
} STqMetaStore;
typedef struct STQ {
// the collection of group handle
// the handle of kvstore
- char* path;
- STqCfg* tqConfig;
- TqLogReader* tqLogReader;
- TqMemRef tqMemRef;
+ char* path;
+ STqCfg* tqConfig;
+ STqLogReader* tqLogReader;
+ STqMemRef tqMemRef;
STqMetaStore* tqMeta;
} STQ;
// open in each vnode
-STQ* tqOpen(const char* path, STqCfg* tqConfig, TqLogReader* tqLogReader, SMemAllocatorFactory *allocFac);
+STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemAllocatorFactory* allocFac);
void tqDestroy(STQ*);
// void* will be replace by a msg type
@@ -272,19 +269,19 @@ int tqCommit(STQ*);
int tqConsume(STQ*, TmqConsumeReq*);
-TqGroupHandle* tqGetGroupHandle(STQ*, int64_t cId);
+STqGroupHandle* tqGetGroupHandle(STQ*, int64_t cId);
-TqGroupHandle* tqOpenTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
-int tqCloseTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
-int tqMoveOffsetToNext(TqGroupHandle*);
-int tqResetOffset(STQ*, int64_t topicId, int64_t cgId, int64_t offset);
-int tqRegisterContext(TqGroupHandle*, void* ahandle);
-int tqLaunchQuery(TqGroupHandle*);
-int tqSendLaunchQuery(TqGroupHandle*);
+STqGroupHandle* tqOpenTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
+int tqCloseTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
+int tqMoveOffsetToNext(STqGroupHandle*);
+int tqResetOffset(STQ*, int64_t topicId, int64_t cgId, int64_t offset);
+int tqRegisterContext(STqGroupHandle*, void* ahandle);
+int tqLaunchQuery(STqGroupHandle*);
+int tqSendLaunchQuery(STqGroupHandle*);
-int tqSerializeGroupHandle(const TqGroupHandle* gHandle, TqSerializedHead** ppHead);
+int tqSerializeGroupHandle(const STqGroupHandle* gHandle, STqSerializedHead** ppHead);
-const void* tqDeserializeGroupHandle(const TqSerializedHead* pHead, TqGroupHandle** gHandle);
+const void* tqDeserializeGroupHandle(const STqSerializedHead* pHead, STqGroupHandle** gHandle);
#ifdef __cplusplus
}
diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h
index bafdc6c082d1ad389037e102895e99918f06d743..89f24cf3a41eeb2b3bb7005bc1094e4068e4035d 100644
--- a/include/libs/wal/wal.h
+++ b/include/libs/wal/wal.h
@@ -16,49 +16,75 @@
#define _TD_WAL_H_
#include "os.h"
+#include "tarray.h"
#include "tdef.h"
#include "tlog.h"
-#include "tarray.h"
#ifdef __cplusplus
extern "C" {
#endif
extern int32_t wDebugFlag;
-#define wFatal(...) { if (wDebugFlag & DEBUG_FATAL) { taosPrintLog("WAL FATAL ", 255, __VA_ARGS__); }}
-#define wError(...) { if (wDebugFlag & DEBUG_ERROR) { taosPrintLog("WAL ERROR ", 255, __VA_ARGS__); }}
-#define wWarn(...) { if (wDebugFlag & DEBUG_WARN) { taosPrintLog("WAL WARN ", 255, __VA_ARGS__); }}
-#define wInfo(...) { if (wDebugFlag & DEBUG_INFO) { taosPrintLog("WAL ", 255, __VA_ARGS__); }}
-#define wDebug(...) { if (wDebugFlag & DEBUG_DEBUG) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }}
-#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }}
-
-#define WAL_HEAD_VER 0
+#define wFatal(...) \
+ { \
+ if (wDebugFlag & DEBUG_FATAL) { \
+ taosPrintLog("WAL FATAL ", 255, __VA_ARGS__); \
+ } \
+ }
+#define wError(...) \
+ { \
+ if (wDebugFlag & DEBUG_ERROR) { \
+ taosPrintLog("WAL ERROR ", 255, __VA_ARGS__); \
+ } \
+ }
+#define wWarn(...) \
+ { \
+ if (wDebugFlag & DEBUG_WARN) { \
+ taosPrintLog("WAL WARN ", 255, __VA_ARGS__); \
+ } \
+ }
+#define wInfo(...) \
+ { \
+ if (wDebugFlag & DEBUG_INFO) { \
+ taosPrintLog("WAL ", 255, __VA_ARGS__); \
+ } \
+ }
+#define wDebug(...) \
+ { \
+ if (wDebugFlag & DEBUG_DEBUG) { \
+ taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); \
+ } \
+ }
+#define wTrace(...) \
+ { \
+ if (wDebugFlag & DEBUG_TRACE) { \
+ taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); \
+ } \
+ }
+
+#define WAL_HEAD_VER 0
#define WAL_NOSUFFIX_LEN 20
-#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN+1)
-#define WAL_LOG_SUFFIX "log"
+#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1)
+#define WAL_LOG_SUFFIX "log"
#define WAL_INDEX_SUFFIX "idx"
-#define WAL_REFRESH_MS 1000
-#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead))
-#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
-#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
+#define WAL_REFRESH_MS 1000
+#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead))
+#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
+#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
-#define WAL_CUR_FAILED 1
+#define WAL_CUR_FAILED 1
#pragma pack(push, 1)
-typedef enum {
- TAOS_WAL_NOLOG = 0,
- TAOS_WAL_WRITE = 1,
- TAOS_WAL_FSYNC = 2
-} EWalType;
+typedef enum { TAOS_WAL_NOLOG = 0, TAOS_WAL_WRITE = 1, TAOS_WAL_FSYNC = 2 } EWalType;
typedef struct SWalReadHead {
- int8_t headVer;
- uint8_t msgType;
- int8_t reserved[2];
- int32_t len;
- int64_t ingestTs; //not implemented
- int64_t version;
- char body[];
+ int8_t headVer;
+ uint8_t msgType;
+ int8_t reserved[2];
+ int32_t len;
+ int64_t ingestTs; // not implemented
+ int64_t version;
+ char body[];
} SWalReadHead;
typedef struct {
@@ -68,12 +94,12 @@ typedef struct {
int32_t rollPeriod; // secs
int64_t retentionSize;
int64_t segSize;
- EWalType level; // wal level
+ EWalType level; // wal level
} SWalCfg;
typedef struct {
- uint32_t cksumHead;
- uint32_t cksumBody;
+ uint32_t cksumHead;
+ uint32_t cksumBody;
SWalReadHead head;
} SWalHead;
@@ -89,37 +115,37 @@ typedef struct SWal {
// cfg
SWalCfg cfg;
int32_t fsyncSeq;
- //meta
+ // meta
SWalVer vers;
int64_t writeLogTfd;
int64_t writeIdxTfd;
int32_t writeCur;
- SArray* fileInfoSet;
- //status
+ SArray *fileInfoSet;
+ // status
int64_t totSize;
int64_t lastRollSeq;
- //ctl
- int64_t refId;
+ // ctl
+ int64_t refId;
pthread_mutex_t mutex;
- //path
+ // path
char path[WAL_PATH_LEN];
- //reusable write head
+ // reusable write head
SWalHead writeHead;
} SWal; // WAL HANDLE
typedef struct SWalReadHandle {
- SWal* pWal;
- int64_t readLogTfd;
- int64_t readIdxTfd;
- int64_t curFileFirstVer;
- int64_t curVersion;
- int64_t capacity;
- int64_t status; //if cursor valid
- SWalHead* pHead;
+ SWal *pWal;
+ int64_t readLogTfd;
+ int64_t readIdxTfd;
+ int64_t curFileFirstVer;
+ int64_t curVersion;
+ int64_t capacity;
+ int64_t status; // if cursor valid
+ SWalHead *pHead;
} SWalReadHandle;
#pragma pack(pop)
-//typedef int32_t (*FWalWrite)(void *ahandle, void *pHead);
+// typedef int32_t (*FWalWrite)(void *ahandle, void *pHead);
// module initialization
int32_t walInit();
@@ -141,15 +167,15 @@ int32_t walRollback(SWal *, int64_t ver);
// notify that previous logs can be pruned safely
int32_t walBeginSnapshot(SWal *, int64_t ver);
int32_t walEndSnapshot(SWal *);
-//int32_t walDataCorrupted(SWal*);
+// int32_t walDataCorrupted(SWal*);
// read
-SWalReadHandle* walOpenReadHandle(SWal *);
-void walCloseReadHandle(SWalReadHandle *);
-int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver);
+SWalReadHandle *walOpenReadHandle(SWal *);
+void walCloseReadHandle(SWalReadHandle *);
+int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver);
int32_t walRead(SWal *, SWalHead **, int64_t ver);
-//int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum);
+// int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum);
// lifecycle check
int64_t walGetFirstVer(SWal *);
diff --git a/source/dnode/vnode/tq/inc/tqMetaStore.h b/source/dnode/vnode/tq/inc/tqMetaStore.h
index 4190125e44d1e7ee4c014b6e531b027afd40a5cc..253852b00fada07060cfd97c8d5c36e7456367bf 100644
--- a/source/dnode/vnode/tq/inc/tqMetaStore.h
+++ b/source/dnode/vnode/tq/inc/tqMetaStore.h
@@ -25,9 +25,9 @@ extern "C" {
STqMetaStore* tqStoreOpen(const char* path,
- TqSerializeFun pSerializer,
- TqDeserializeFun pDeserializer,
- TqDeleteFun pDeleter,
+ FTqSerialize pSerializer,
+ FTqDeserialize pDeserializer,
+ FTqDelete pDeleter,
int32_t tqConfigFlag
);
int32_t tqStoreClose(STqMetaStore*);
diff --git a/source/dnode/vnode/tq/src/tq.c b/source/dnode/vnode/tq/src/tq.c
index 5888141c58a1c84c6d98ddcfd04a49a14b0aea8a..1326666857758618baca7984a4d8934bd3e738cb 100644
--- a/source/dnode/vnode/tq/src/tq.c
+++ b/source/dnode/vnode/tq/src/tq.c
@@ -16,75 +16,70 @@
#include "tqInt.h"
#include "tqMetaStore.h"
-//static
-//read next version data
+// static
+// read next version data
//
-//send to fetch queue
+// send to fetch queue
//
-//handle management message
+// handle management message
//
-int tqGetgHandleSSize(const TqGroupHandle *gHandle);
+int tqGetgHandleSSize(const STqGroupHandle* gHandle);
int tqBufHandleSSize();
int tqBufItemSSize();
-TqGroupHandle* tqFindHandle(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
- TqGroupHandle* gHandle;
+STqGroupHandle* tqFindHandle(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
+ STqGroupHandle* gHandle;
return NULL;
}
-void* tqSerializeListHandle(TqListHandle* listHandle, void* ptr);
-void* tqSerializeBufHandle(TqBufferHandle* bufHandle, void* ptr);
-void* tqSerializeBufItem(TqBufferItem* bufItem, void* ptr);
+void* tqSerializeListHandle(STqListHandle* listHandle, void* ptr);
+void* tqSerializeBufHandle(STqBufferHandle* bufHandle, void* ptr);
+void* tqSerializeBufItem(STqBufferItem* bufItem, void* ptr);
-const void* tqDeserializeBufHandle(const void* pBytes, TqBufferHandle* bufHandle);
-const void* tqDeserializeBufItem(const void* pBytes, TqBufferItem* bufItem);
+const void* tqDeserializeBufHandle(const void* pBytes, STqBufferHandle* bufHandle);
+const void* tqDeserializeBufItem(const void* pBytes, STqBufferItem* bufItem);
-STQ* tqOpen(const char* path, STqCfg* tqConfig, TqLogReader* tqLogReader, SMemAllocatorFactory *allocFac) {
+STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemAllocatorFactory* allocFac) {
STQ* pTq = malloc(sizeof(STQ));
- if(pTq == NULL) {
- //TODO: memory error
+ if (pTq == NULL) {
+ // TODO: memory error
return NULL;
}
pTq->path = strdup(path);
pTq->tqConfig = tqConfig;
pTq->tqLogReader = tqLogReader;
- pTq->tqMemRef.pAlloctorFactory = allocFac;
- pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
- if(pTq->tqMemRef.pAllocator == NULL) {
- //TODO
+ pTq->tqMemRef.pAlloctorFactory = allocFac;
+ pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
+ if (pTq->tqMemRef.pAllocator == NULL) {
+ // TODO
}
- pTq->tqMeta = tqStoreOpen(path,
- (TqSerializeFun)tqSerializeGroupHandle,
- (TqDeserializeFun)tqDeserializeGroupHandle,
- free,
- 0);
- if(pTq->tqMeta == NULL) {
- //TODO: free STQ
+ pTq->tqMeta =
+ tqStoreOpen(path, (FTqSerialize)tqSerializeGroupHandle, (FTqDeserialize)tqDeserializeGroupHandle, free, 0);
+ if (pTq->tqMeta == NULL) {
+ // TODO: free STQ
return NULL;
}
return pTq;
}
-static int tqProtoCheck(TmqMsgHead *pMsg) {
- return pMsg->protoVer == 0;
-}
+static int tqProtoCheck(TmqMsgHead* pMsg) { return pMsg->protoVer == 0; }
-static int tqAckOneTopic(TqBufferHandle *bHandle, TmqOneAck *pAck, TqQueryMsg** ppQuery) {
- //clean old item and move forward
+static int tqAckOneTopic(STqBufferHandle* bHandle, TmqOneAck* pAck, STqQueryMsg** ppQuery) {
+ // clean old item and move forward
int32_t consumeOffset = pAck->consumeOffset;
- int idx = consumeOffset % TQ_BUFFER_SIZE;
+ int idx = consumeOffset % TQ_BUFFER_SIZE;
ASSERT(bHandle->buffer[idx].content && bHandle->buffer[idx].executor);
tfree(bHandle->buffer[idx].content);
- if( 1 /* TODO: need to launch new query */) {
- TqQueryMsg* pNewQuery = malloc(sizeof(TqQueryMsg));
- if(pNewQuery == NULL) {
- //TODO: memory insufficient
+ if (1 /* TODO: need to launch new query */) {
+ STqQueryMsg* pNewQuery = malloc(sizeof(STqQueryMsg));
+ if (pNewQuery == NULL) {
+ // TODO: memory insufficient
return -1;
}
- //TODO: lock executor
+ // TODO: lock executor
pNewQuery->exec->executor = bHandle->buffer[idx].executor;
- //TODO: read from wal and assign to src
+ // TODO: read from wal and assign to src
pNewQuery->exec->src = 0;
pNewQuery->exec->dest = &bHandle->buffer[idx];
pNewQuery->next = *ppQuery;
@@ -93,98 +88,94 @@ static int tqAckOneTopic(TqBufferHandle *bHandle, TmqOneAck *pAck, TqQueryMsg**
return 0;
}
-static int tqAck(TqGroupHandle* gHandle, TmqAcks* pAcks) {
- int32_t ackNum = pAcks->ackNum;
- TmqOneAck *acks = pAcks->acks;
- //double ptr for acks and list
- int i = 0;
- TqListHandle* node = gHandle->head;
- int ackCnt = 0;
- TqQueryMsg *pQuery = NULL;
- while(i < ackNum && node->next) {
- if(acks[i].topicId == node->next->bufHandle.topicId) {
+static int tqAck(STqGroupHandle* gHandle, TmqAcks* pAcks) {
+ int32_t ackNum = pAcks->ackNum;
+ TmqOneAck* acks = pAcks->acks;
+ // double ptr for acks and list
+ int i = 0;
+ STqListHandle* node = gHandle->head;
+ int ackCnt = 0;
+ STqQueryMsg* pQuery = NULL;
+ while (i < ackNum && node->next) {
+ if (acks[i].topicId == node->next->bufHandle.topicId) {
ackCnt++;
tqAckOneTopic(&node->next->bufHandle, &acks[i], &pQuery);
- } else if(acks[i].topicId < node->next->bufHandle.topicId) {
+ } else if (acks[i].topicId < node->next->bufHandle.topicId) {
i++;
} else {
node = node->next;
}
}
- if(pQuery) {
- //post message
+ if (pQuery) {
+ // post message
}
return ackCnt;
}
-static int tqCommitTCGroup(TqGroupHandle* handle) {
- //persist modification into disk
+static int tqCommitTCGroup(STqGroupHandle* handle) {
+ // persist modification into disk
return 0;
}
-int tqCreateTCGroup(STQ *pTq, int64_t topicId, int64_t cgId, int64_t cId, TqGroupHandle** handle) {
- //create in disk
- TqGroupHandle* gHandle = (TqGroupHandle*)malloc(sizeof(TqGroupHandle));
- if(gHandle == NULL) {
- //TODO
+int tqCreateTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId, STqGroupHandle** handle) {
+ // create in disk
+ STqGroupHandle* gHandle = (STqGroupHandle*)malloc(sizeof(STqGroupHandle));
+ if (gHandle == NULL) {
+ // TODO
return -1;
}
- memset(gHandle, 0, sizeof(TqGroupHandle));
+ memset(gHandle, 0, sizeof(STqGroupHandle));
return 0;
}
-TqGroupHandle* tqOpenTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
- TqGroupHandle* gHandle = tqHandleGet(pTq->tqMeta, cId);
- if(gHandle == NULL) {
+STqGroupHandle* tqOpenTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
+ STqGroupHandle* gHandle = tqHandleGet(pTq->tqMeta, cId);
+ if (gHandle == NULL) {
int code = tqCreateTCGroup(pTq, topicId, cgId, cId, &gHandle);
- if(code != 0) {
- //TODO
+ if (code != 0) {
+ // TODO
return NULL;
}
}
- //create
- //open
+ // create
+ // open
return gHandle;
}
-int tqCloseTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
- return 0;
-}
+int tqCloseTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { return 0; }
int tqDropTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
- //delete from disk
+ // delete from disk
return 0;
}
-static int tqFetch(TqGroupHandle* gHandle, void** msg) {
- TqListHandle* head = gHandle->head;
- TqListHandle* node = head;
- int totSize = 0;
- //TODO: make it a macro
- int sizeLimit = 4 * 1024;
+static int tqFetch(STqGroupHandle* gHandle, void** msg) {
+ STqListHandle* head = gHandle->head;
+ STqListHandle* node = head;
+ int totSize = 0;
+ // TODO: make it a macro
+ int sizeLimit = 4 * 1024;
TmqMsgContent* buffer = malloc(sizeLimit);
- if(buffer == NULL) {
- //TODO:memory insufficient
+ if (buffer == NULL) {
+ // TODO:memory insufficient
return -1;
}
- //iterate the list to get msgs of all topics
- //until all topic iterated or msgs over sizeLimit
- while(node->next) {
+ // iterate the list to get msgs of all topics
+ // until all topic iterated or msgs over sizeLimit
+ while (node->next) {
node = node->next;
- TqBufferHandle* bufHandle = &node->bufHandle;
- int idx = bufHandle->nextConsumeOffset % TQ_BUFFER_SIZE;
- if(bufHandle->buffer[idx].content != NULL &&
- bufHandle->buffer[idx].offset == bufHandle->nextConsumeOffset
- ) {
+ STqBufferHandle* bufHandle = &node->bufHandle;
+ int idx = bufHandle->nextConsumeOffset % TQ_BUFFER_SIZE;
+ if (bufHandle->buffer[idx].content != NULL && bufHandle->buffer[idx].offset == bufHandle->nextConsumeOffset) {
totSize += bufHandle->buffer[idx].size;
- if(totSize > sizeLimit) {
- void *ptr = realloc(buffer, totSize);
- if(ptr == NULL) {
+ if (totSize > sizeLimit) {
+ void* ptr = realloc(buffer, totSize);
+ if (ptr == NULL) {
totSize -= bufHandle->buffer[idx].size;
- //TODO:memory insufficient
- //return msgs already copied
+ // TODO:memory insufficient
+ // return msgs already copied
break;
}
}
@@ -194,7 +185,7 @@ static int tqFetch(TqGroupHandle* gHandle, void** msg) {
buffer = POINTER_SHIFT(buffer, sizeof(int64_t));
memcpy(buffer, bufHandle->buffer[idx].content, bufHandle->buffer[idx].size);
buffer = POINTER_SHIFT(buffer, bufHandle->buffer[idx].size);
- if(totSize > sizeLimit) {
+ if (totSize > sizeLimit) {
break;
}
}
@@ -202,104 +193,98 @@ static int tqFetch(TqGroupHandle* gHandle, void** msg) {
return totSize;
}
-TqGroupHandle* tqGetGroupHandle(STQ* pTq, int64_t cId) {
- return NULL;
-}
+STqGroupHandle* tqGetGroupHandle(STQ* pTq, int64_t cId) { return NULL; }
-int tqLaunchQuery(TqGroupHandle* gHandle) {
- return 0;
-}
+int tqLaunchQuery(STqGroupHandle* gHandle) { return 0; }
-int tqSendLaunchQuery(TqGroupHandle* gHandle) {
- return 0;
-}
+int tqSendLaunchQuery(STqGroupHandle* gHandle) { return 0; }
/*int tqMoveOffsetToNext(TqGroupHandle* gHandle) {*/
- /*return 0;*/
+/*return 0;*/
/*}*/
-int tqPushMsg(STQ* pTq , void* p, int64_t version) {
- //add reference
- //judge and launch new query
+int tqPushMsg(STQ* pTq, void* p, int64_t version) {
+ // add reference
+ // judge and launch new query
return 0;
}
int tqCommit(STQ* pTq) {
- //do nothing
+ // do nothing
return 0;
}
int tqConsume(STQ* pTq, TmqConsumeReq* pMsg) {
- if(!tqProtoCheck((TmqMsgHead *)pMsg)) {
- //proto version invalid
+ if (!tqProtoCheck((TmqMsgHead*)pMsg)) {
+ // proto version invalid
return -1;
}
- int64_t clientId = pMsg->head.clientId;
- TqGroupHandle *gHandle = tqGetGroupHandle(pTq, clientId);
- if(gHandle == NULL) {
- //client not connect
+ int64_t clientId = pMsg->head.clientId;
+ STqGroupHandle* gHandle = tqGetGroupHandle(pTq, clientId);
+ if (gHandle == NULL) {
+ // client not connect
return -1;
}
- if(pMsg->acks.ackNum != 0) {
- if(tqAck(gHandle, &pMsg->acks) != 0) {
- //ack not success
+ if (pMsg->acks.ackNum != 0) {
+ if (tqAck(gHandle, &pMsg->acks) != 0) {
+ // ack not success
return -1;
}
}
- TmqConsumeRsp *pRsp = (TmqConsumeRsp*) pMsg;
+ TmqConsumeRsp* pRsp = (TmqConsumeRsp*)pMsg;
- if(tqFetch(gHandle, (void**)&pRsp->msgs) <= 0) {
- //fetch error
+ if (tqFetch(gHandle, (void**)&pRsp->msgs) <= 0) {
+ // fetch error
return -1;
}
- //judge and launch new query
- if(tqLaunchQuery(gHandle)) {
- //launch query error
+ // judge and launch new query
+ if (tqLaunchQuery(gHandle)) {
+ // launch query error
return -1;
}
return 0;
}
-int tqSerializeGroupHandle(const TqGroupHandle *gHandle, TqSerializedHead** ppHead) {
- //calculate size
- int sz = tqGetgHandleSSize(gHandle) + sizeof(TqSerializedHead);
- if(sz > (*ppHead)->ssize) {
+int tqSerializeGroupHandle(const STqGroupHandle* gHandle, STqSerializedHead** ppHead) {
+ // calculate size
+ int sz = tqGetgHandleSSize(gHandle) + sizeof(STqSerializedHead);
+ if (sz > (*ppHead)->ssize) {
void* tmpPtr = realloc(*ppHead, sz);
- if(tmpPtr == NULL) {
+ if (tmpPtr == NULL) {
free(*ppHead);
- //TODO: memory err
+ // TODO: memory err
return -1;
}
*ppHead = tmpPtr;
(*ppHead)->ssize = sz;
}
void* ptr = (*ppHead)->content;
- //do serialization
+ // do serialization
*(int64_t*)ptr = gHandle->cId;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
*(int64_t*)ptr = gHandle->cgId;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
*(int32_t*)ptr = gHandle->topicNum;
ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
- if(gHandle->topicNum > 0) {
+ if (gHandle->topicNum > 0) {
tqSerializeListHandle(gHandle->head, ptr);
}
return 0;
}
-void* tqSerializeListHandle(TqListHandle *listHandle, void* ptr) {
- TqListHandle *node = listHandle;
+void* tqSerializeListHandle(STqListHandle* listHandle, void* ptr) {
+ STqListHandle* node = listHandle;
ASSERT(node != NULL);
- while(node) {
+ while (node) {
ptr = tqSerializeBufHandle(&node->bufHandle, ptr);
node = node->next;
}
return ptr;
}
-void* tqSerializeBufHandle(TqBufferHandle *bufHandle, void* ptr) {
+void* tqSerializeBufHandle(STqBufferHandle* bufHandle, void* ptr) {
*(int64_t*)ptr = bufHandle->nextConsumeOffset;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
*(int64_t*)ptr = bufHandle->topicId;
@@ -308,21 +293,21 @@ void* tqSerializeBufHandle(TqBufferHandle *bufHandle, void* ptr) {
ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
*(int32_t*)ptr = bufHandle->tail;
ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
- for(int i = 0; i < TQ_BUFFER_SIZE; i++) {
+ for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
ptr = tqSerializeBufItem(&bufHandle->buffer[i], ptr);
}
return ptr;
}
-void* tqSerializeBufItem(TqBufferItem *bufItem, void* ptr) {
- //TODO: do we need serialize this?
- //mainly for executor
+void* tqSerializeBufItem(STqBufferItem* bufItem, void* ptr) {
+ // TODO: do we need serialize this?
+ // mainly for executor
return ptr;
}
-const void* tqDeserializeGroupHandle(const TqSerializedHead* pHead, TqGroupHandle **ppGHandle) {
- TqGroupHandle *gHandle = *ppGHandle;
- const void* ptr = pHead->content;
+const void* tqDeserializeGroupHandle(const STqSerializedHead* pHead, STqGroupHandle** ppGHandle) {
+ STqGroupHandle* gHandle = *ppGHandle;
+ const void* ptr = pHead->content;
gHandle->cId = *(int64_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
gHandle->cgId = *(int64_t*)ptr;
@@ -331,20 +316,20 @@ const void* tqDeserializeGroupHandle(const TqSerializedHead* pHead, TqGroupHandl
gHandle->topicNum = *(int32_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
gHandle->head = NULL;
- TqListHandle *node = gHandle->head;
- for(int i = 0; i < gHandle->topicNum; i++) {
- if(gHandle->head == NULL) {
- if((node = malloc(sizeof(TqListHandle))) == NULL) {
- //TODO: error
+ STqListHandle* node = gHandle->head;
+ for (int i = 0; i < gHandle->topicNum; i++) {
+ if (gHandle->head == NULL) {
+ if ((node = malloc(sizeof(STqListHandle))) == NULL) {
+ // TODO: error
return NULL;
}
- node->next= NULL;
- ptr = tqDeserializeBufHandle(ptr, &node->bufHandle);
+ node->next = NULL;
+ ptr = tqDeserializeBufHandle(ptr, &node->bufHandle);
gHandle->head = node;
} else {
- node->next = malloc(sizeof(TqListHandle));
- if(node->next == NULL) {
- //TODO: error
+ node->next = malloc(sizeof(STqListHandle));
+ if (node->next == NULL) {
+ // TODO: error
return NULL;
}
node->next->next = NULL;
@@ -355,7 +340,7 @@ const void* tqDeserializeGroupHandle(const TqSerializedHead* pHead, TqGroupHandl
return ptr;
}
-const void* tqDeserializeBufHandle(const void* pBytes, TqBufferHandle *bufHandle) {
+const void* tqDeserializeBufHandle(const void* pBytes, STqBufferHandle* bufHandle) {
const void* ptr = pBytes;
bufHandle->nextConsumeOffset = *(int64_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
@@ -365,32 +350,30 @@ const void* tqDeserializeBufHandle(const void* pBytes, TqBufferHandle *bufHandle
ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
bufHandle->tail = *(int32_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
- for(int i = 0; i < TQ_BUFFER_SIZE; i++) {
+ for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
ptr = tqDeserializeBufItem(ptr, &bufHandle->buffer[i]);
}
return ptr;
}
-const void* tqDeserializeBufItem(const void* pBytes, TqBufferItem *bufItem) {
- return pBytes;
-}
+const void* tqDeserializeBufItem(const void* pBytes, STqBufferItem* bufItem) { return pBytes; }
-//TODO: make this a macro
-int tqGetgHandleSSize(const TqGroupHandle *gHandle) {
- return sizeof(int64_t) * 2 //cId + cgId
- + sizeof(int32_t) //topicNum
- + gHandle->topicNum * tqBufHandleSSize();
+// TODO: make this a macro
+int tqGetgHandleSSize(const STqGroupHandle* gHandle) {
+ return sizeof(int64_t) * 2 // cId + cgId
+ + sizeof(int32_t) // topicNum
+ + gHandle->topicNum * tqBufHandleSSize();
}
-//TODO: make this a macro
+// TODO: make this a macro
int tqBufHandleSSize() {
- return sizeof(int64_t) * 2 // nextConsumeOffset + topicId
- + sizeof(int32_t) * 2 // head + tail
- + TQ_BUFFER_SIZE * tqBufItemSSize();
+ return sizeof(int64_t) * 2 // nextConsumeOffset + topicId
+ + sizeof(int32_t) * 2 // head + tail
+ + TQ_BUFFER_SIZE * tqBufItemSSize();
}
int tqBufItemSSize() {
- //TODO: do this need serialization?
- //mainly for executor
+ // TODO: do this need serialization?
+ // mainly for executor
return 0;
}
diff --git a/source/dnode/vnode/tq/src/tqMetaStore.c b/source/dnode/vnode/tq/src/tqMetaStore.c
index 3f40c94f244518a76ad42725c8067bb98c54c34d..082f0ad28e30356794d9cc12b69eac2668a6fb9c 100644
--- a/source/dnode/vnode/tq/src/tqMetaStore.c
+++ b/source/dnode/vnode/tq/src/tqMetaStore.c
@@ -13,20 +13,20 @@
* along with this program. If not, see .
*/
#include "tqMetaStore.h"
-//TODO:replace by an abstract file layer
-#include "osDir.h"
+// TODO:replace by an abstract file layer
#include
#include
#include
+#include "osDir.h"
#define TQ_META_NAME "tq.meta"
-#define TQ_IDX_NAME "tq.idx"
+#define TQ_IDX_NAME "tq.idx"
static int32_t tqHandlePutCommitted(STqMetaStore*, int64_t key, void* value);
static void* tqHandleGetUncommitted(STqMetaStore*, int64_t key);
-static inline void tqLinkUnpersist(STqMetaStore *pMeta, STqMetaList* pNode) {
- if(pNode->unpersistNext == NULL) {
+static inline void tqLinkUnpersist(STqMetaStore* pMeta, STqMetaList* pNode) {
+ if (pNode->unpersistNext == NULL) {
pNode->unpersistNext = pMeta->unpersistHead->unpersistNext;
pNode->unpersistPrev = pMeta->unpersistHead;
pMeta->unpersistHead->unpersistNext->unpersistPrev = pNode;
@@ -41,24 +41,24 @@ static inline int tqSeekLastPage(int fd) {
return lseek(fd, curPageOffset, SEEK_SET);
}
-//TODO: the struct is tightly coupled with index entry
-typedef struct TqIdxPageHead {
+// TODO: the struct is tightly coupled with index entry
+typedef struct STqIdxPageHead {
int16_t writeOffset;
int8_t unused[14];
-} TqIdxPageHead;
+} STqIdxPageHead;
-typedef struct TqIdxPageBuf {
- TqIdxPageHead head;
- char buffer[TQ_IDX_PAGE_BODY_SIZE];
-} TqIdxPageBuf;
+typedef struct STqIdxPageBuf {
+ STqIdxPageHead head;
+ char buffer[TQ_IDX_PAGE_BODY_SIZE];
+} STqIdxPageBuf;
-static inline int tqReadLastPage(int fd, TqIdxPageBuf* pBuf) {
+static inline int tqReadLastPage(int fd, STqIdxPageBuf* pBuf) {
int offset = tqSeekLastPage(fd);
int nBytes;
- if((nBytes = read(fd, pBuf, TQ_PAGE_SIZE)) == -1) {
+ if ((nBytes = read(fd, pBuf, TQ_PAGE_SIZE)) == -1) {
return -1;
}
- if(nBytes == 0) {
+ if (nBytes == 0) {
memset(pBuf, 0, TQ_PAGE_SIZE);
pBuf->head.writeOffset = TQ_IDX_PAGE_HEAD_SIZE;
}
@@ -67,28 +67,24 @@ static inline int tqReadLastPage(int fd, TqIdxPageBuf* pBuf) {
return lseek(fd, offset, SEEK_SET);
}
-STqMetaStore* tqStoreOpen(const char* path,
- TqSerializeFun serializer,
- TqDeserializeFun deserializer,
- TqDeleteFun deleter,
- int32_t tqConfigFlag
- ) {
- STqMetaStore* pMeta = malloc(sizeof(STqMetaStore));
- if(pMeta == NULL) {
- //close
+STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserialize deserializer, FTqDelete deleter,
+ int32_t tqConfigFlag) {
+ STqMetaStore* pMeta = malloc(sizeof(STqMetaStore));
+ if (pMeta == NULL) {
+ // close
return NULL;
}
memset(pMeta, 0, sizeof(STqMetaStore));
- //concat data file name and index file name
+ // concat data file name and index file name
size_t pathLen = strlen(path);
- pMeta->dirPath = malloc(pathLen+1);
- if(pMeta->dirPath != NULL) {
- //TODO: memory insufficient
+ pMeta->dirPath = malloc(pathLen + 1);
+ if (pMeta->dirPath != NULL) {
+ // TODO: memory insufficient
}
strcpy(pMeta->dirPath, path);
-
- char name[pathLen+10];
+
+ char name[pathLen + 10];
strcpy(name, path);
if (taosDirExist(name) != 0 && taosMkDir(name) != 0) {
@@ -96,98 +92,96 @@ STqMetaStore* tqStoreOpen(const char* path,
}
strcat(name, "/" TQ_IDX_NAME);
int idxFd = open(name, O_RDWR | O_CREAT, 0755);
- if(idxFd < 0) {
+ if (idxFd < 0) {
ASSERT(false);
- //close file
- //free memory
+ // close file
+ // free memory
return NULL;
}
pMeta->idxFd = idxFd;
pMeta->unpersistHead = malloc(sizeof(STqMetaList));
- if(pMeta->unpersistHead == NULL) {
+ if (pMeta->unpersistHead == NULL) {
ASSERT(false);
- //close file
- //free memory
+ // close file
+ // free memory
return NULL;
}
memset(pMeta->unpersistHead, 0, sizeof(STqMetaList));
- pMeta->unpersistHead->unpersistNext
- = pMeta->unpersistHead->unpersistPrev
- = pMeta->unpersistHead;
+ pMeta->unpersistHead->unpersistNext = pMeta->unpersistHead->unpersistPrev = pMeta->unpersistHead;
strcpy(name, path);
strcat(name, "/" TQ_META_NAME);
int fileFd = open(name, O_RDWR | O_CREAT, 0755);
- if(fileFd < 0){
+ if (fileFd < 0) {
ASSERT(false);
return NULL;
}
pMeta->fileFd = fileFd;
-
+
pMeta->pSerializer = serializer;
pMeta->pDeserializer = deserializer;
pMeta->pDeleter = deleter;
pMeta->tqConfigFlag = tqConfigFlag;
- //read idx file and load into memory
- TqIdxPageBuf idxBuf;
- TqSerializedHead* serializedObj = malloc(TQ_PAGE_SIZE);
- if(serializedObj == NULL) {
- //TODO:memory insufficient
+ // read idx file and load into memory
+ STqIdxPageBuf idxBuf;
+ STqSerializedHead* serializedObj = malloc(TQ_PAGE_SIZE);
+ if (serializedObj == NULL) {
+ // TODO:memory insufficient
}
- int idxRead;
- int allocated = TQ_PAGE_SIZE;
+ int idxRead;
+ int allocated = TQ_PAGE_SIZE;
bool readEnd = false;
- while((idxRead = read(idxFd, &idxBuf, TQ_PAGE_SIZE))) {
- if(idxRead == -1) {
- //TODO: handle error
+ while ((idxRead = read(idxFd, &idxBuf, TQ_PAGE_SIZE))) {
+ if (idxRead == -1) {
+ // TODO: handle error
ASSERT(false);
}
ASSERT(idxBuf.head.writeOffset == idxRead);
- //loop read every entry
- for(int i = 0; i < idxBuf.head.writeOffset - TQ_IDX_PAGE_HEAD_SIZE; i += TQ_IDX_SIZE) {
- STqMetaList *pNode = malloc(sizeof(STqMetaList));
- if(pNode == NULL) {
- //TODO: free memory and return error
+ // loop read every entry
+ for (int i = 0; i < idxBuf.head.writeOffset - TQ_IDX_PAGE_HEAD_SIZE; i += TQ_IDX_SIZE) {
+ STqMetaList* pNode = malloc(sizeof(STqMetaList));
+ if (pNode == NULL) {
+ // TODO: free memory and return error
}
memset(pNode, 0, sizeof(STqMetaList));
memcpy(&pNode->handle, &idxBuf.buffer[i], TQ_IDX_SIZE);
lseek(fileFd, pNode->handle.offset, SEEK_SET);
- if(allocated < pNode->handle.serializedSize) {
- void *ptr = realloc(serializedObj, pNode->handle.serializedSize);
- if(ptr == NULL) {
- //TODO: memory insufficient
+ if (allocated < pNode->handle.serializedSize) {
+ void* ptr = realloc(serializedObj, pNode->handle.serializedSize);
+ if (ptr == NULL) {
+ // TODO: memory insufficient
}
serializedObj = ptr;
allocated = pNode->handle.serializedSize;
}
serializedObj->ssize = pNode->handle.serializedSize;
- if(read(fileFd, serializedObj, pNode->handle.serializedSize) != pNode->handle.serializedSize) {
- //TODO: read error
+ if (read(fileFd, serializedObj, pNode->handle.serializedSize) != pNode->handle.serializedSize) {
+ // TODO: read error
}
- if(serializedObj->action == TQ_ACTION_INUSE) {
- if(serializedObj->ssize != sizeof(TqSerializedHead)) {
+ if (serializedObj->action == TQ_ACTION_INUSE) {
+ if (serializedObj->ssize != sizeof(STqSerializedHead)) {
pMeta->pDeserializer(serializedObj, &pNode->handle.valueInUse);
} else {
pNode->handle.valueInUse = TQ_DELETE_TOKEN;
}
- } else if(serializedObj->action == TQ_ACTION_INTXN) {
- if(serializedObj->ssize != sizeof(TqSerializedHead)) {
+ } else if (serializedObj->action == TQ_ACTION_INTXN) {
+ if (serializedObj->ssize != sizeof(STqSerializedHead)) {
pMeta->pDeserializer(serializedObj, &pNode->handle.valueInTxn);
} else {
pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
}
- } else if(serializedObj->action == TQ_ACTION_INUSE_CONT) {
- if(serializedObj->ssize != sizeof(TqSerializedHead)) {
+ } else if (serializedObj->action == TQ_ACTION_INUSE_CONT) {
+ if (serializedObj->ssize != sizeof(STqSerializedHead)) {
pMeta->pDeserializer(serializedObj, &pNode->handle.valueInUse);
} else {
pNode->handle.valueInUse = TQ_DELETE_TOKEN;
}
- TqSerializedHead* ptr = POINTER_SHIFT(serializedObj, serializedObj->ssize);
- if(ptr->ssize != sizeof(TqSerializedHead)) {
+ STqSerializedHead* ptr = POINTER_SHIFT(serializedObj, serializedObj->ssize);
+ if (ptr->ssize != sizeof(STqSerializedHead)) {
pMeta->pDeserializer(ptr, &pNode->handle.valueInTxn);
} else {
pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
@@ -196,22 +190,21 @@ STqMetaStore* tqStoreOpen(const char* path,
ASSERT(0);
}
- //put into list
- int bucketKey = pNode->handle.key & TQ_BUCKET_MASK;
+ // put into list
+ int bucketKey = pNode->handle.key & TQ_BUCKET_MASK;
STqMetaList* pBucketNode = pMeta->bucket[bucketKey];
- if(pBucketNode == NULL) {
+ if (pBucketNode == NULL) {
pMeta->bucket[bucketKey] = pNode;
- } else if(pBucketNode->handle.key == pNode->handle.key) {
+ } else if (pBucketNode->handle.key == pNode->handle.key) {
pNode->next = pBucketNode->next;
pMeta->bucket[bucketKey] = pNode;
} else {
- while(pBucketNode->next &&
- pBucketNode->next->handle.key != pNode->handle.key) {
- pBucketNode = pBucketNode->next;
+ while (pBucketNode->next && pBucketNode->next->handle.key != pNode->handle.key) {
+ pBucketNode = pBucketNode->next;
}
- if(pBucketNode->next) {
+ if (pBucketNode->next) {
ASSERT(pBucketNode->next->handle.key == pNode->handle.key);
- STqMetaList *pNodeFound = pBucketNode->next;
+ STqMetaList* pNodeFound = pBucketNode->next;
pNode->next = pNodeFound->next;
pBucketNode->next = pNode;
pBucketNode = pNodeFound;
@@ -221,13 +214,11 @@ STqMetaStore* tqStoreOpen(const char* path,
pBucketNode = NULL;
}
}
- if(pBucketNode) {
- if(pBucketNode->handle.valueInUse
- && pBucketNode->handle.valueInUse != TQ_DELETE_TOKEN) {
+ if (pBucketNode) {
+ if (pBucketNode->handle.valueInUse && pBucketNode->handle.valueInUse != TQ_DELETE_TOKEN) {
pMeta->pDeleter(pBucketNode->handle.valueInUse);
}
- if(pBucketNode->handle.valueInTxn
- && pBucketNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
+ if (pBucketNode->handle.valueInTxn && pBucketNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
pMeta->pDeleter(pBucketNode->handle.valueInTxn);
}
free(pBucketNode);
@@ -239,23 +230,21 @@ STqMetaStore* tqStoreOpen(const char* path,
}
int32_t tqStoreClose(STqMetaStore* pMeta) {
- //commit data and idx
+ // commit data and idx
tqStorePersist(pMeta);
- ASSERT(pMeta->unpersistHead && pMeta->unpersistHead->next==NULL);
+ ASSERT(pMeta->unpersistHead && pMeta->unpersistHead->next == NULL);
close(pMeta->fileFd);
close(pMeta->idxFd);
- //free memory
- for(int i = 0; i < TQ_BUCKET_SIZE; i++) {
+ // free memory
+ for (int i = 0; i < TQ_BUCKET_SIZE; i++) {
STqMetaList* pNode = pMeta->bucket[i];
- while(pNode) {
+ while (pNode) {
ASSERT(pNode->unpersistNext == NULL);
ASSERT(pNode->unpersistPrev == NULL);
- if(pNode->handle.valueInTxn
- && pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
+ if (pNode->handle.valueInTxn && pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
pMeta->pDeleter(pNode->handle.valueInTxn);
}
- if(pNode->handle.valueInUse
- && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
+ if (pNode->handle.valueInUse && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
pMeta->pDeleter(pNode->handle.valueInUse);
}
STqMetaList* next = pNode->next;
@@ -272,17 +261,15 @@ int32_t tqStoreClose(STqMetaStore* pMeta) {
int32_t tqStoreDelete(STqMetaStore* pMeta) {
close(pMeta->fileFd);
close(pMeta->idxFd);
- //free memory
- for(int i = 0; i < TQ_BUCKET_SIZE; i++) {
+ // free memory
+ for (int i = 0; i < TQ_BUCKET_SIZE; i++) {
STqMetaList* pNode = pMeta->bucket[i];
pMeta->bucket[i] = NULL;
- while(pNode) {
- if(pNode->handle.valueInTxn
- && pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
+ while (pNode) {
+ if (pNode->handle.valueInTxn && pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
pMeta->pDeleter(pNode->handle.valueInTxn);
}
- if(pNode->handle.valueInUse
- && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
+ if (pNode->handle.valueInUse && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
pMeta->pDeleter(pNode->handle.valueInUse);
}
STqMetaList* next = pNode->next;
@@ -297,26 +284,26 @@ int32_t tqStoreDelete(STqMetaStore* pMeta) {
return 0;
}
-//TODO: wrap in tfile
+// TODO: wrap in tfile
int32_t tqStorePersist(STqMetaStore* pMeta) {
- TqIdxPageBuf idxBuf;
- int64_t* bufPtr = (int64_t*)idxBuf.buffer;
- STqMetaList *pHead = pMeta->unpersistHead;
- STqMetaList *pNode = pHead->unpersistNext;
- TqSerializedHead *pSHead = malloc(sizeof(TqSerializedHead));
- if(pSHead == NULL) {
- //TODO: memory error
+ STqIdxPageBuf idxBuf;
+ int64_t* bufPtr = (int64_t*)idxBuf.buffer;
+ STqMetaList* pHead = pMeta->unpersistHead;
+ STqMetaList* pNode = pHead->unpersistNext;
+ STqSerializedHead* pSHead = malloc(sizeof(STqSerializedHead));
+ if (pSHead == NULL) {
+ // TODO: memory error
return -1;
}
pSHead->ver = TQ_SVER;
pSHead->checksum = 0;
- pSHead->ssize = sizeof(TqSerializedHead);
- int allocatedSize = sizeof(TqSerializedHead);
+ pSHead->ssize = sizeof(STqSerializedHead);
+ int allocatedSize = sizeof(STqSerializedHead);
int offset = lseek(pMeta->fileFd, 0, SEEK_CUR);
tqReadLastPage(pMeta->idxFd, &idxBuf);
- if(idxBuf.head.writeOffset == TQ_PAGE_SIZE) {
+ if (idxBuf.head.writeOffset == TQ_PAGE_SIZE) {
lseek(pMeta->idxFd, 0, SEEK_END);
memset(&idxBuf, 0, TQ_PAGE_SIZE);
idxBuf.head.writeOffset = TQ_IDX_PAGE_HEAD_SIZE;
@@ -324,18 +311,18 @@ int32_t tqStorePersist(STqMetaStore* pMeta) {
bufPtr = POINTER_SHIFT(&idxBuf, idxBuf.head.writeOffset);
}
- while(pHead != pNode) {
+ while (pHead != pNode) {
int nBytes = 0;
- if(pNode->handle.valueInUse) {
- if(pNode->handle.valueInTxn) {
+ if (pNode->handle.valueInUse) {
+ if (pNode->handle.valueInTxn) {
pSHead->action = TQ_ACTION_INUSE_CONT;
} else {
pSHead->action = TQ_ACTION_INUSE;
}
- if(pNode->handle.valueInUse == TQ_DELETE_TOKEN) {
- pSHead->ssize = sizeof(TqSerializedHead);
+ if (pNode->handle.valueInUse == TQ_DELETE_TOKEN) {
+ pSHead->ssize = sizeof(STqSerializedHead);
} else {
pMeta->pSerializer(pNode->handle.valueInUse, &pSHead);
}
@@ -343,10 +330,10 @@ int32_t tqStorePersist(STqMetaStore* pMeta) {
ASSERT(nBytes == pSHead->ssize);
}
- if(pNode->handle.valueInTxn) {
+ if (pNode->handle.valueInTxn) {
pSHead->action = TQ_ACTION_INTXN;
- if(pNode->handle.valueInTxn == TQ_DELETE_TOKEN) {
- pSHead->ssize = sizeof(TqSerializedHead);
+ if (pNode->handle.valueInTxn == TQ_DELETE_TOKEN) {
+ pSHead->ssize = sizeof(STqSerializedHead);
} else {
pMeta->pSerializer(pNode->handle.valueInTxn, &pSHead);
}
@@ -357,42 +344,39 @@ int32_t tqStorePersist(STqMetaStore* pMeta) {
pNode->handle.offset = offset;
offset += nBytes;
- //write idx file
- //TODO: endian check and convert
+ // write idx file
+ // TODO: endian check and convert
*(bufPtr++) = pNode->handle.key;
*(bufPtr++) = pNode->handle.offset;
*(bufPtr++) = (int64_t)nBytes;
idxBuf.head.writeOffset += TQ_IDX_SIZE;
- if(idxBuf.head.writeOffset >= TQ_PAGE_SIZE) {
+ if (idxBuf.head.writeOffset >= TQ_PAGE_SIZE) {
nBytes = write(pMeta->idxFd, &idxBuf, TQ_PAGE_SIZE);
- //TODO: handle error with tfile
+ // TODO: handle error with tfile
ASSERT(nBytes == TQ_PAGE_SIZE);
memset(&idxBuf, 0, TQ_PAGE_SIZE);
idxBuf.head.writeOffset = TQ_IDX_PAGE_HEAD_SIZE;
bufPtr = (int64_t*)&idxBuf.buffer;
}
- //remove from unpersist list
+ // remove from unpersist list
pHead->unpersistNext = pNode->unpersistNext;
pHead->unpersistNext->unpersistPrev = pHead;
pNode->unpersistPrev = pNode->unpersistNext = NULL;
pNode = pHead->unpersistNext;
- //remove from bucket
- if(pNode->handle.valueInUse == TQ_DELETE_TOKEN &&
- pNode->handle.valueInTxn == NULL
- ) {
- int bucketKey = pNode->handle.key & TQ_BUCKET_MASK;
+ // remove from bucket
+ if (pNode->handle.valueInUse == TQ_DELETE_TOKEN && pNode->handle.valueInTxn == NULL) {
+ int bucketKey = pNode->handle.key & TQ_BUCKET_MASK;
STqMetaList* pBucketHead = pMeta->bucket[bucketKey];
- if(pBucketHead == pNode) {
+ if (pBucketHead == pNode) {
pMeta->bucket[bucketKey] = pNode->next;
} else {
STqMetaList* pBucketNode = pBucketHead;
- while(pBucketNode->next != NULL
- && pBucketNode->next != pNode) {
- pBucketNode = pBucketNode->next;
+ while (pBucketNode->next != NULL && pBucketNode->next != pNode) {
+ pBucketNode = pBucketNode->next;
}
- //impossible for pBucket->next == NULL
+ // impossible for pBucket->next == NULL
ASSERT(pBucketNode->next == pNode);
pBucketNode->next = pNode->next;
}
@@ -400,46 +384,45 @@ int32_t tqStorePersist(STqMetaStore* pMeta) {
}
}
- //write left bytes
+ // write left bytes
free(pSHead);
- //TODO: write new version in tfile
- if((char*)bufPtr != idxBuf.buffer) {
+ // TODO: write new version in tfile
+ if ((char*)bufPtr != idxBuf.buffer) {
int nBytes = write(pMeta->idxFd, &idxBuf, idxBuf.head.writeOffset);
- //TODO: handle error in tfile
+ // TODO: handle error in tfile
ASSERT(nBytes == idxBuf.head.writeOffset);
}
- //TODO: using fsync in tfile
+ // TODO: using fsync in tfile
fsync(pMeta->idxFd);
fsync(pMeta->fileFd);
return 0;
}
static int32_t tqHandlePutCommitted(STqMetaStore* pMeta, int64_t key, void* value) {
- int64_t bucketKey = key & TQ_BUCKET_MASK;
+ int64_t bucketKey = key & TQ_BUCKET_MASK;
STqMetaList* pNode = pMeta->bucket[bucketKey];
- while(pNode) {
- if(pNode->handle.key == key) {
- //TODO: think about thread safety
- if(pNode->handle.valueInUse
- && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
+ while (pNode) {
+ if (pNode->handle.key == key) {
+ // TODO: think about thread safety
+ if (pNode->handle.valueInUse && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
pMeta->pDeleter(pNode->handle.valueInUse);
}
- //change pointer ownership
+ // change pointer ownership
pNode->handle.valueInUse = value;
return 0;
} else {
pNode = pNode->next;
}
}
- STqMetaList *pNewNode = malloc(sizeof(STqMetaList));
- if(pNewNode == NULL) {
- //TODO: memory error
+ STqMetaList* pNewNode = malloc(sizeof(STqMetaList));
+ if (pNewNode == NULL) {
+ // TODO: memory error
return -1;
}
memset(pNewNode, 0, sizeof(STqMetaList));
pNewNode->handle.key = key;
pNewNode->handle.valueInUse = value;
- //put into unpersist list
+ // put into unpersist list
pNewNode->unpersistPrev = pMeta->unpersistHead;
pNewNode->unpersistNext = pMeta->unpersistHead->unpersistNext;
pMeta->unpersistHead->unpersistNext->unpersistPrev = pNewNode;
@@ -448,12 +431,11 @@ static int32_t tqHandlePutCommitted(STqMetaStore* pMeta, int64_t key, void* valu
}
void* tqHandleGet(STqMetaStore* pMeta, int64_t key) {
- int64_t bucketKey = key & TQ_BUCKET_MASK;
+ int64_t bucketKey = key & TQ_BUCKET_MASK;
STqMetaList* pNode = pMeta->bucket[bucketKey];
- while(pNode) {
- if(pNode->handle.key == key) {
- if(pNode->handle.valueInUse != NULL
- && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
+ while (pNode) {
+ if (pNode->handle.key == key) {
+ if (pNode->handle.valueInUse != NULL && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
return pNode->handle.valueInUse;
} else {
return NULL;
@@ -466,12 +448,11 @@ void* tqHandleGet(STqMetaStore* pMeta, int64_t key) {
}
void* tqHandleTouchGet(STqMetaStore* pMeta, int64_t key) {
- int64_t bucketKey = key & TQ_BUCKET_MASK;
+ int64_t bucketKey = key & TQ_BUCKET_MASK;
STqMetaList* pNode = pMeta->bucket[bucketKey];
- while(pNode) {
- if(pNode->handle.key == key) {
- if(pNode->handle.valueInUse != NULL
- && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
+ while (pNode) {
+ if (pNode->handle.key == key) {
+ if (pNode->handle.valueInUse != NULL && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
tqLinkUnpersist(pMeta, pNode);
return pNode->handle.valueInUse;
} else {
@@ -485,16 +466,16 @@ void* tqHandleTouchGet(STqMetaStore* pMeta, int64_t key) {
}
static inline int32_t tqHandlePutImpl(STqMetaStore* pMeta, int64_t key, void* value) {
- int64_t bucketKey = key & TQ_BUCKET_MASK;
+ int64_t bucketKey = key & TQ_BUCKET_MASK;
STqMetaList* pNode = pMeta->bucket[bucketKey];
- while(pNode) {
- if(pNode->handle.key == key) {
- //TODO: think about thread safety
- if(pNode->handle.valueInTxn) {
- if(TqDupIntxnReject(pMeta->tqConfigFlag)) {
+ while (pNode) {
+ if (pNode->handle.key == key) {
+ // TODO: think about thread safety
+ if (pNode->handle.valueInTxn) {
+ if (TqDupIntxnReject(pMeta->tqConfigFlag)) {
return -2;
}
- if(pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
+ if (pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
pMeta->pDeleter(pNode->handle.valueInTxn);
}
}
@@ -505,9 +486,9 @@ static inline int32_t tqHandlePutImpl(STqMetaStore* pMeta, int64_t key, void* va
pNode = pNode->next;
}
}
- STqMetaList *pNewNode = malloc(sizeof(STqMetaList));
- if(pNewNode == NULL) {
- //TODO: memory error
+ STqMetaList* pNewNode = malloc(sizeof(STqMetaList));
+ if (pNewNode == NULL) {
+ // TODO: memory error
return -1;
}
memset(pNewNode, 0, sizeof(STqMetaList));
@@ -519,14 +500,12 @@ static inline int32_t tqHandlePutImpl(STqMetaStore* pMeta, int64_t key, void* va
return 0;
}
-int32_t tqHandleMovePut(STqMetaStore* pMeta, int64_t key, void* value) {
- return tqHandlePutImpl(pMeta, key, value);
-}
+int32_t tqHandleMovePut(STqMetaStore* pMeta, int64_t key, void* value) { return tqHandlePutImpl(pMeta, key, value); }
int32_t tqHandleCopyPut(STqMetaStore* pMeta, int64_t key, void* value, size_t vsize) {
- void *vmem = malloc(vsize);
- if(vmem == NULL) {
- //TODO: memory error
+ void* vmem = malloc(vsize);
+ if (vmem == NULL) {
+ // TODO: memory error
return -1;
}
memcpy(vmem, value, vsize);
@@ -534,12 +513,11 @@ int32_t tqHandleCopyPut(STqMetaStore* pMeta, int64_t key, void* value, size_t vs
}
static void* tqHandleGetUncommitted(STqMetaStore* pMeta, int64_t key) {
- int64_t bucketKey = key & TQ_BUCKET_MASK;
+ int64_t bucketKey = key & TQ_BUCKET_MASK;
STqMetaList* pNode = pMeta->bucket[bucketKey];
- while(pNode) {
- if(pNode->handle.key == key) {
- if(pNode->handle.valueInTxn != NULL
- && pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
+ while (pNode) {
+ if (pNode->handle.key == key) {
+ if (pNode->handle.valueInTxn != NULL && pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
return pNode->handle.valueInTxn;
} else {
return NULL;
@@ -552,15 +530,14 @@ static void* tqHandleGetUncommitted(STqMetaStore* pMeta, int64_t key) {
}
int32_t tqHandleCommit(STqMetaStore* pMeta, int64_t key) {
- int64_t bucketKey = key & TQ_BUCKET_MASK;
+ int64_t bucketKey = key & TQ_BUCKET_MASK;
STqMetaList* pNode = pMeta->bucket[bucketKey];
- while(pNode) {
- if(pNode->handle.key == key) {
- if(pNode->handle.valueInTxn == NULL) {
+ while (pNode) {
+ if (pNode->handle.key == key) {
+ if (pNode->handle.valueInTxn == NULL) {
return -1;
}
- if(pNode->handle.valueInUse
- && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
+ if (pNode->handle.valueInUse && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
pMeta->pDeleter(pNode->handle.valueInUse);
}
pNode->handle.valueInUse = pNode->handle.valueInTxn;
@@ -575,12 +552,12 @@ int32_t tqHandleCommit(STqMetaStore* pMeta, int64_t key) {
}
int32_t tqHandleAbort(STqMetaStore* pMeta, int64_t key) {
- int64_t bucketKey = key & TQ_BUCKET_MASK;
+ int64_t bucketKey = key & TQ_BUCKET_MASK;
STqMetaList* pNode = pMeta->bucket[bucketKey];
- while(pNode) {
- if(pNode->handle.key == key) {
- if(pNode->handle.valueInTxn) {
- if(pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
+ while (pNode) {
+ if (pNode->handle.key == key) {
+ if (pNode->handle.valueInTxn) {
+ if (pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
pMeta->pDeleter(pNode->handle.valueInTxn);
}
pNode->handle.valueInTxn = NULL;
@@ -596,13 +573,14 @@ int32_t tqHandleAbort(STqMetaStore* pMeta, int64_t key) {
}
int32_t tqHandleDel(STqMetaStore* pMeta, int64_t key) {
- int64_t bucketKey = key & TQ_BUCKET_MASK;
+ int64_t bucketKey = key & TQ_BUCKET_MASK;
STqMetaList* pNode = pMeta->bucket[bucketKey];
- while(pNode) {
- if(pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
- if(pNode->handle.valueInTxn) {
+ while (pNode) {
+ if (pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
+ if (pNode->handle.valueInTxn) {
pMeta->pDeleter(pNode->handle.valueInTxn);
}
+
pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
tqLinkUnpersist(pMeta, pNode);
return 0;
@@ -610,11 +588,9 @@ int32_t tqHandleDel(STqMetaStore* pMeta, int64_t key) {
pNode = pNode->next;
}
}
- //no such key
+ // no such key
return -1;
}
-//TODO: clean deleted idx and data from persistent file
-int32_t tqStoreCompact(STqMetaStore *pMeta) {
- return 0;
-}
+// TODO: clean deleted idx and data from persistent file
+int32_t tqStoreCompact(STqMetaStore* pMeta) { return 0; }
diff --git a/source/dnode/vnode/tq/test/tqMetaTest.cpp b/source/dnode/vnode/tq/test/tqMetaTest.cpp
index bbd436ab1a98bb49d3370486fbfc52d1da5be3ee..58263efa71ec77db6a5fbfc973d4f7c9edd55714 100644
--- a/source/dnode/vnode/tq/test/tqMetaTest.cpp
+++ b/source/dnode/vnode/tq/test/tqMetaTest.cpp
@@ -9,17 +9,17 @@ struct Foo {
int32_t a;
};
-int FooSerializer(const void* pObj, TqSerializedHead** ppHead) {
+int FooSerializer(const void* pObj, STqSerializedHead** ppHead) {
Foo* foo = (Foo*) pObj;
- if((*ppHead) == NULL || (*ppHead)->ssize < sizeof(TqSerializedHead) + sizeof(int32_t)) {
- *ppHead = (TqSerializedHead*)realloc(*ppHead, sizeof(TqSerializedHead) + sizeof(int32_t));
- (*ppHead)->ssize = sizeof(TqSerializedHead) + sizeof(int32_t);
+ if((*ppHead) == NULL || (*ppHead)->ssize < sizeof(STqSerializedHead) + sizeof(int32_t)) {
+ *ppHead = (STqSerializedHead*)realloc(*ppHead, sizeof(STqSerializedHead) + sizeof(int32_t));
+ (*ppHead)->ssize = sizeof(STqSerializedHead) + sizeof(int32_t);
}
*(int32_t*)(*ppHead)->content = foo->a;
return (*ppHead)->ssize;
}
-const void* FooDeserializer(const TqSerializedHead* pHead, void** ppObj) {
+const void* FooDeserializer(const STqSerializedHead* pHead, void** ppObj) {
if(*ppObj == NULL) {
*ppObj = realloc(*ppObj, sizeof(int32_t));
}
diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c
index e2715d51f1cab93601613bc7cefb7ce46ec29858..e86a2b622124ba71359ea9305770db41b6d96b18 100644
--- a/source/libs/wal/src/walMeta.c
+++ b/source/libs/wal/src/walMeta.c
@@ -14,27 +14,21 @@
*/
#define _DEFAULT_SOURCE
+#include "cJSON.h"
#include "os.h"
#include "taoserror.h"
-#include "tref.h"
#include "tfile.h"
-#include "cJSON.h"
+#include "tref.h"
#include "walInt.h"
#include
#include
-int64_t inline walGetFirstVer(SWal *pWal) {
- return pWal->vers.firstVer;
-}
+int64_t inline walGetFirstVer(SWal* pWal) { return pWal->vers.firstVer; }
-int64_t inline walGetSnaphostVer(SWal *pWal) {
- return pWal->vers.snapshotVer;
-}
+int64_t inline walGetSnaphostVer(SWal* pWal) { return pWal->vers.snapshotVer; }
-int64_t inline walGetLastVer(SWal *pWal) {
- return pWal->vers.lastVer;
-}
+int64_t inline walGetLastVer(SWal* pWal) { return pWal->vers.lastVer; }
static inline int walBuildMetaName(SWal* pWal, int metaVer, char* buf) {
return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer);
@@ -44,31 +38,30 @@ int walCheckAndRepairMeta(SWal* pWal) {
// load log files, get first/snapshot/last version info
const char* logPattern = "^[0-9]+.log$";
const char* idxPattern = "^[0-9]+.idx$";
- regex_t logRegPattern;
- regex_t idxRegPattern;
- SArray* pLogArray = taosArrayInit(8, sizeof(int64_t));
+ regex_t logRegPattern;
+ regex_t idxRegPattern;
+ SArray* pLogArray = taosArrayInit(8, sizeof(int64_t));
regcomp(&logRegPattern, logPattern, REG_EXTENDED);
regcomp(&idxRegPattern, idxPattern, REG_EXTENDED);
-
- DIR *dir = opendir(pWal->path);
- if(dir == NULL) {
+
+ DIR* dir = opendir(pWal->path);
+ if (dir == NULL) {
wError("vgId:%d, path:%s, failed to open since %s", pWal->cfg.vgId, pWal->path, strerror(errno));
return -1;
}
struct dirent* ent;
- while((ent = readdir(dir)) != NULL) {
- char *name = basename(ent->d_name);
- int code = regexec(&logRegPattern, name, 0, NULL, 0);
- if(code == 0) {
+ while ((ent = readdir(dir)) != NULL) {
+ char* name = basename(ent->d_name);
+ int code = regexec(&logRegPattern, name, 0, NULL, 0);
+ if (code == 0) {
int64_t firstVer;
sscanf(name, "%" PRId64 ".log", &firstVer);
taosArrayPush(pLogArray, &firstVer);
}
}
-
// load meta
// if not match, or meta missing
// rebuild meta
@@ -85,15 +78,15 @@ int walRollFileInfo(SWal* pWal) {
int64_t ts = taosGetTimestampSec();
SArray* pArray = pWal->fileInfoSet;
- if(taosArrayGetSize(pArray) != 0) {
- WalFileInfo *pInfo = taosArrayGetLast(pArray);
+ if (taosArrayGetSize(pArray) != 0) {
+ WalFileInfo* pInfo = taosArrayGetLast(pArray);
pInfo->lastVer = pWal->vers.lastVer;
pInfo->closeTs = ts;
}
- //TODO: change to emplace back
- WalFileInfo *pNewInfo = malloc(sizeof(WalFileInfo));
- if(pNewInfo == NULL) {
+ // TODO: change to emplace back
+ WalFileInfo* pNewInfo = malloc(sizeof(WalFileInfo));
+ if (pNewInfo == NULL) {
return -1;
}
pNewInfo->firstVer = pWal->vers.lastVer + 1;
@@ -109,13 +102,13 @@ int walRollFileInfo(SWal* pWal) {
char* walMetaSerialize(SWal* pWal) {
char buf[30];
ASSERT(pWal->fileInfoSet);
- int sz = pWal->fileInfoSet->size;
+ int sz = pWal->fileInfoSet->size;
cJSON* pRoot = cJSON_CreateObject();
cJSON* pMeta = cJSON_CreateObject();
cJSON* pFiles = cJSON_CreateArray();
cJSON* pField;
- if(pRoot == NULL || pMeta == NULL || pFiles == NULL) {
- //TODO
+ if (pRoot == NULL || pMeta == NULL || pFiles == NULL) {
+ // TODO
return NULL;
}
cJSON_AddItemToObject(pRoot, "meta", pMeta);
@@ -130,15 +123,15 @@ char* walMetaSerialize(SWal* pWal) {
cJSON_AddItemToObject(pRoot, "files", pFiles);
WalFileInfo* pData = pWal->fileInfoSet->pData;
- for(int i = 0; i < sz; i++) {
+ for (int i = 0; i < sz; i++) {
WalFileInfo* pInfo = &pData[i];
cJSON_AddItemToArray(pFiles, pField = cJSON_CreateObject());
- if(pField == NULL) {
+ if (pField == NULL) {
cJSON_Delete(pRoot);
return NULL;
}
- //cjson only support int32_t or double
- //string are used to prohibit the loss of precision
+ // cjson only support int32_t or double
+ // string are used to prohibit the loss of precision
sprintf(buf, "%" PRId64, pInfo->firstVer);
cJSON_AddStringToObject(pField, "firstVer", buf);
sprintf(buf, "%" PRId64, pInfo->lastVer);
@@ -171,12 +164,12 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) {
pFiles = cJSON_GetObjectItem(pRoot, "files");
int sz = cJSON_GetArraySize(pFiles);
- //deserialize
+ // deserialize
SArray* pArray = pWal->fileInfoSet;
taosArrayEnsureCap(pArray, sz);
- WalFileInfo *pData = pArray->pData;
- for(int i = 0; i < sz; i++) {
- cJSON* pInfoJson = cJSON_GetArrayItem(pFiles, i);
+ WalFileInfo* pData = pArray->pData;
+ for (int i = 0; i < sz; i++) {
+ cJSON* pInfoJson = cJSON_GetArrayItem(pFiles, i);
WalFileInfo* pInfo = &pData[i];
pField = cJSON_GetObjectItem(pInfoJson, "firstVer");
pInfo->firstVer = atoll(cJSON_GetStringValue(pField));
@@ -196,24 +189,24 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) {
}
static int walFindCurMetaVer(SWal* pWal) {
- const char * pattern = "^meta-ver[0-9]+$";
- regex_t walMetaRegexPattern;
+ const char* pattern = "^meta-ver[0-9]+$";
+ regex_t walMetaRegexPattern;
regcomp(&walMetaRegexPattern, pattern, REG_EXTENDED);
- DIR *dir = opendir(pWal->path);
- if(dir == NULL) {
+ DIR* dir = opendir(pWal->path);
+ if (dir == NULL) {
wError("vgId:%d, path:%s, failed to open since %s", pWal->cfg.vgId, pWal->path, strerror(errno));
return -1;
}
struct dirent* ent;
- //find existing meta-ver[x].json
+ // find existing meta-ver[x].json
int metaVer = -1;
- while((ent = readdir(dir)) != NULL) {
- char *name = basename(ent->d_name);
- int code = regexec(&walMetaRegexPattern, name, 0, NULL, 0);
- if(code == 0) {
+ while ((ent = readdir(dir)) != NULL) {
+ char* name = basename(ent->d_name);
+ int code = regexec(&walMetaRegexPattern, name, 0, NULL, 0);
+ if (code == 0) {
sscanf(name, "meta-ver%d", &metaVer);
break;
}
@@ -224,23 +217,23 @@ static int walFindCurMetaVer(SWal* pWal) {
}
int walSaveMeta(SWal* pWal) {
- int metaVer = walFindCurMetaVer(pWal);
+ int metaVer = walFindCurMetaVer(pWal);
char fnameStr[WAL_FILE_LEN];
- walBuildMetaName(pWal, metaVer+1, fnameStr);
+ walBuildMetaName(pWal, metaVer + 1, fnameStr);
int metaTfd = tfOpenCreateWrite(fnameStr);
- if(metaTfd < 0) {
+ if (metaTfd < 0) {
return -1;
}
char* serialized = walMetaSerialize(pWal);
- int len = strlen(serialized);
- if(len != tfWrite(metaTfd, serialized, len)) {
- //TODO:clean file
+ int len = strlen(serialized);
+ if (len != tfWrite(metaTfd, serialized, len)) {
+ // TODO:clean file
return -1;
}
-
+
tfClose(metaTfd);
- //delete old file
- if(metaVer > -1) {
+ // delete old file
+ if (metaVer > -1) {
walBuildMetaName(pWal, metaVer, fnameStr);
remove(fnameStr);
}
@@ -250,30 +243,30 @@ int walSaveMeta(SWal* pWal) {
int walLoadMeta(SWal* pWal) {
ASSERT(pWal->fileInfoSet->size == 0);
- //find existing meta file
+ // find existing meta file
int metaVer = walFindCurMetaVer(pWal);
- if(metaVer == -1) {
+ if (metaVer == -1) {
return 0;
}
char fnameStr[WAL_FILE_LEN];
walBuildMetaName(pWal, metaVer, fnameStr);
- //read metafile
+ // read metafile
struct stat statbuf;
stat(fnameStr, &statbuf);
- int size = statbuf.st_size;
+ int size = statbuf.st_size;
char* buf = malloc(size + 5);
- if(buf == NULL) {
+ if (buf == NULL) {
return -1;
}
- memset(buf, 0, size+5);
+ memset(buf, 0, size + 5);
int tfd = tfOpenRead(fnameStr);
- if(tfRead(tfd, buf, size) != size) {
+ if (tfRead(tfd, buf, size) != size) {
free(buf);
return -1;
}
- //load into fileInfoSet
+ // load into fileInfoSet
int code = walMetaDeserialize(pWal, buf);
- if(code != 0) {
+ if (code != 0) {
free(buf);
return -1;
}
diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c
index 06df3eb3be9360b407eb749a97c6242ca3074e96..5b6a8c6b2907b9c1e1c63ca0c117ca338e342286 100644
--- a/source/libs/wal/src/walMgmt.c
+++ b/source/libs/wal/src/walMgmt.c
@@ -14,11 +14,11 @@
*/
#define _DEFAULT_SOURCE
+#include "compare.h"
#include "os.h"
#include "taoserror.h"
-#include "tref.h"
#include "tfile.h"
-#include "compare.h"
+#include "tref.h"
#include "walInt.h"
typedef struct {
@@ -34,16 +34,14 @@ static int32_t walCreateThread();
static void walStopThread();
static void walFreeObj(void *pWal);
-int64_t walGetSeq() {
- return (int64_t)atomic_load_32(&tsWal.seq);
-}
+int64_t walGetSeq() { return (int64_t)atomic_load_32(&tsWal.seq); }
int32_t walInit() {
int8_t old = atomic_val_compare_exchange_8(&tsWal.inited, 0, 1);
- if(old == 1) return 0;
+ if (old == 1) return 0;
int code = tfInit();
- if(code != 0) {
+ if (code != 0) {
wError("failed to init tfile since %s", tstrerror(code));
atomic_store_8(&tsWal.inited, 0);
return code;
@@ -63,7 +61,7 @@ int32_t walInit() {
void walCleanUp() {
int8_t old = atomic_val_compare_exchange_8(&tsWal.inited, 1, 0);
- if(old == 0) {
+ if (old == 0) {
return;
}
walStopThread();
@@ -78,52 +76,52 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
return NULL;
}
- //set config
+ // set config
memcpy(&pWal->cfg, pCfg, sizeof(SWalCfg));
pWal->fsyncSeq = pCfg->fsyncPeriod / 1000;
- if(pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1;
+ if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1;
tstrncpy(pWal->path, path, sizeof(pWal->path));
- if(taosMkDir(pWal->path) != 0) {
+ if (taosMkDir(pWal->path) != 0) {
wError("vgId:%d, path:%s, failed to create directory since %s", pWal->cfg.vgId, pWal->path, strerror(errno));
return NULL;
}
- //open meta
+ // open meta
walResetVer(&pWal->vers);
pWal->writeLogTfd = -1;
pWal->writeIdxTfd = -1;
pWal->writeCur = -1;
pWal->fileInfoSet = taosArrayInit(8, sizeof(WalFileInfo));
- if(pWal->fileInfoSet == NULL) {
+ if (pWal->fileInfoSet == NULL) {
wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->cfg.vgId, pWal->path, strerror(errno));
free(pWal);
return NULL;
}
- //init status
+ // init status
pWal->totSize = 0;
pWal->lastRollSeq = -1;
- //init write buffer
+ // init write buffer
memset(&pWal->writeHead, 0, sizeof(SWalHead));
pWal->writeHead.head.headVer = WAL_HEAD_VER;
- if(pthread_mutex_init(&pWal->mutex, NULL) < 0) {
+ if (pthread_mutex_init(&pWal->mutex, NULL) < 0) {
taosArrayDestroy(pWal->fileInfoSet);
free(pWal);
return NULL;
}
pWal->refId = taosAddRef(tsWal.refSetId, pWal);
- if(pWal->refId < 0) {
+ if (pWal->refId < 0) {
pthread_mutex_destroy(&pWal->mutex);
taosArrayDestroy(pWal->fileInfoSet);
free(pWal);
return NULL;
}
- if(walLoadMeta(pWal) < 0 && walCheckAndRepairMeta(pWal) < 0) {
+ if (walLoadMeta(pWal) < 0 && walCheckAndRepairMeta(pWal) < 0) {
taosRemoveRef(tsWal.refSetId, pWal->refId);
pthread_mutex_destroy(&pWal->mutex);
taosArrayDestroy(pWal->fileInfoSet);
@@ -131,11 +129,11 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
return NULL;
}
- if(walCheckAndRepairIdx(pWal) < 0) {
-
+ if (walCheckAndRepairIdx(pWal) < 0) {
}
- wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level, pWal->cfg.fsyncPeriod);
+ wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level,
+ pWal->cfg.fsyncPeriod);
return pWal;
}
@@ -203,10 +201,12 @@ static void walFsyncAll() {
SWal *pWal = taosIterateRef(tsWal.refSetId, 0);
while (pWal) {
if (walNeedFsync(pWal)) {
- wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->cfg.vgId, pWal->cfg.level, pWal->fsyncSeq, atomic_load_32(&tsWal.seq));
+ wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->cfg.vgId, pWal->cfg.level, pWal->fsyncSeq,
+ atomic_load_32(&tsWal.seq));
int32_t code = tfFsync(pWal->writeLogTfd);
if (code != 0) {
- wError("vgId:%d, file:%"PRId64".log, failed to fsync since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(code));
+ wError("vgId:%d, file:%" PRId64 ".log, failed to fsync since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
+ strerror(code));
}
}
pWal = taosIterateRef(tsWal.refSetId, pWal->refId);
diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c
index 42fcb8c375802f468f7e6061971d99bcd884d16d..b6e232fa5cd72a96b731762dcecf945ad8cd583f 100644
--- a/source/libs/wal/src/walRead.c
+++ b/source/libs/wal/src/walRead.c
@@ -13,12 +13,12 @@
* along with this program. If not, see .
*/
-#include "walInt.h"
#include "tfile.h"
+#include "walInt.h"
-SWalReadHandle* walOpenReadHandle(SWal* pWal) {
+SWalReadHandle *walOpenReadHandle(SWal *pWal) {
SWalReadHandle *pRead = malloc(sizeof(SWalReadHandle));
- if(pRead == NULL) {
+ if (pRead == NULL) {
return NULL;
}
pRead->pWal = pWal;
@@ -29,7 +29,7 @@ SWalReadHandle* walOpenReadHandle(SWal* pWal) {
pRead->capacity = 0;
pRead->status = 0;
pRead->pHead = malloc(sizeof(SWalHead));
- if(pRead->pHead == NULL) {
+ if (pRead->pHead == NULL) {
free(pRead);
return NULL;
}
@@ -43,27 +43,25 @@ void walCloseReadHandle(SWalReadHandle *pRead) {
free(pRead);
}
-int32_t walRegisterRead(SWalReadHandle *pRead, int64_t ver) {
- return 0;
-}
+int32_t walRegisterRead(SWalReadHandle *pRead, int64_t ver) { return 0; }
static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, int64_t ver) {
int code = 0;
int64_t idxTfd = pRead->readIdxTfd;
int64_t logTfd = pRead->readLogTfd;
-
- //seek position
+
+ // seek position
int64_t offset = (ver - fileFirstVer) * sizeof(WalIdxEntry);
code = tfLseek(idxTfd, offset, SEEK_SET);
- if(code < 0) {
+ if (code < 0) {
return -1;
}
WalIdxEntry entry;
- if(tfRead(idxTfd, &entry, sizeof(WalIdxEntry)) != sizeof(WalIdxEntry)) {
+ if (tfRead(idxTfd, &entry, sizeof(WalIdxEntry)) != sizeof(WalIdxEntry)) {
return -1;
}
- //TODO:deserialize
+ // TODO:deserialize
ASSERT(entry.ver == ver);
code = tfLseek(logTfd, entry.offset, SEEK_SET);
if (code < 0) {
@@ -80,13 +78,13 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
walBuildLogName(pRead->pWal, fileFirstVer, fnameStr);
int64_t logTfd = tfOpenRead(fnameStr);
- if(logTfd < 0) {
+ if (logTfd < 0) {
return -1;
}
walBuildIdxName(pRead->pWal, fileFirstVer, fnameStr);
int64_t idxTfd = tfOpenRead(fnameStr);
- if(idxTfd < 0) {
+ if (idxTfd < 0) {
return -1;
}
@@ -96,76 +94,75 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
}
static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
- int code;
+ int code;
SWal *pWal = pRead->pWal;
- if(ver == pRead->curVersion) {
+ if (ver == pRead->curVersion) {
return 0;
}
- if(ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
+ if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
return -1;
}
- if(ver < pWal->vers.snapshotVer) {
-
+ if (ver < pWal->vers.snapshotVer) {
}
WalFileInfo tmpInfo;
tmpInfo.firstVer = ver;
- //bsearch in fileSet
- WalFileInfo* pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
+ // bsearch in fileSet
+ WalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
ASSERT(pRet != NULL);
- if(pRead->curFileFirstVer != pRet->firstVer) {
+ if (pRead->curFileFirstVer != pRet->firstVer) {
code = walReadChangeFile(pRead, pRet->firstVer);
- if(code < 0) {
- //TODO: set error flag
+ if (code < 0) {
+ // TODO: set error flag
return -1;
}
}
code = walReadSeekFilePos(pRead, pRet->firstVer, ver);
- if(code < 0) {
+ if (code < 0) {
return -1;
}
pRead->curVersion = ver;
-
+
return 0;
}
int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
int code;
- //TODO: check wal life
- if(pRead->curVersion != ver) {
- code = walReadSeekVer(pRead, ver);
- if(code != 0) {
+ // TODO: check wal life
+ if (pRead->curVersion != ver) {
+ code = walReadSeekVer(pRead, ver);
+ if (code != 0) {
return -1;
}
}
- if(!tfValid(pRead->readLogTfd)) return -1;
+ if (!tfValid(pRead->readLogTfd)) return -1;
code = tfRead(pRead->readLogTfd, pRead->pHead, sizeof(SWalHead));
- if(code != sizeof(SWalHead)) {
+ if (code != sizeof(SWalHead)) {
return -1;
}
code = walValidHeadCksum(pRead->pHead);
- if(code != 0) {
+ if (code != 0) {
return -1;
}
- if(pRead->capacity < pRead->pHead->head.len) {
- void* ptr = realloc(pRead->pHead, sizeof(SWalHead) + pRead->pHead->head.len);
- if(ptr == NULL) {
+ if (pRead->capacity < pRead->pHead->head.len) {
+ void *ptr = realloc(pRead->pHead, sizeof(SWalHead) + pRead->pHead->head.len);
+ if (ptr == NULL) {
return -1;
}
pRead->pHead = ptr;
pRead->capacity = pRead->pHead->head.len;
}
- if(pRead->pHead->head.len != tfRead(pRead->readLogTfd, pRead->pHead->head.body, pRead->pHead->head.len)) {
+ if (pRead->pHead->head.len != tfRead(pRead->readLogTfd, pRead->pHead->head.body, pRead->pHead->head.len)) {
return -1;
}
/*code = walValidBodyCksum(pRead->pHead);*/
ASSERT(pRead->pHead->head.version == ver);
- if(code != 0) {
+ if (code != 0) {
return -1;
}
pRead->curVersion++;
@@ -176,40 +173,40 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
int code;
code = walSeekVer(pWal, ver);
- if(code != 0) {
+ if (code != 0) {
return code;
}
- if(*ppHead == NULL) {
- void* ptr = realloc(*ppHead, sizeof(SWalHead));
- if(ptr == NULL) {
+ if (*ppHead == NULL) {
+ void *ptr = realloc(*ppHead, sizeof(SWalHead));
+ if (ptr == NULL) {
return -1;
}
*ppHead = ptr;
}
- if(tfRead(pWal->writeLogTfd, *ppHead, sizeof(SWalHead)) != sizeof(SWalHead)) {
+ if (tfRead(pWal->writeLogTfd, *ppHead, sizeof(SWalHead)) != sizeof(SWalHead)) {
return -1;
}
- //TODO: endian compatibility processing after read
- if(walValidHeadCksum(*ppHead) != 0) {
+ // TODO: endian compatibility processing after read
+ if (walValidHeadCksum(*ppHead) != 0) {
return -1;
}
- void* ptr = realloc(*ppHead, sizeof(SWalHead) + (*ppHead)->head.len);
- if(ptr == NULL) {
+ void *ptr = realloc(*ppHead, sizeof(SWalHead) + (*ppHead)->head.len);
+ if (ptr == NULL) {
free(*ppHead);
*ppHead = NULL;
return -1;
}
- if(tfRead(pWal->writeLogTfd, (*ppHead)->head.body, (*ppHead)->head.len) != (*ppHead)->head.len) {
+ if (tfRead(pWal->writeLogTfd, (*ppHead)->head.body, (*ppHead)->head.len) != (*ppHead)->head.len) {
return -1;
}
- //TODO: endian compatibility processing after read
- if(walValidBodyCksum(*ppHead) != 0) {
+ // TODO: endian compatibility processing after read
+ if (walValidBodyCksum(*ppHead) != 0) {
return -1;
}
-
+
return 0;
}
/*int32_t walReadWithFp(SWal *pWal, FWalWrite writeFp, int64_t verStart, int32_t readNum) {*/
- /*return 0;*/
+/*return 0;*/
/*}*/
diff --git a/source/libs/wal/src/walSeek.c b/source/libs/wal/src/walSeek.c
index 769b2fd2e4e48126c5bdd0426b93ab5404afacab..82c596d22521fa073a928a4e31b4acf1b002a643 100644
--- a/source/libs/wal/src/walSeek.c
+++ b/source/libs/wal/src/walSeek.c
@@ -16,8 +16,8 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
-#include "tref.h"
#include "tfile.h"
+#include "tref.h"
#include "walInt.h"
static int walSeekFilePos(SWal* pWal, int64_t ver) {
@@ -25,17 +25,17 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) {
int64_t idxTfd = pWal->writeIdxTfd;
int64_t logTfd = pWal->writeLogTfd;
-
- //seek position
+
+ // seek position
int64_t idxOff = walGetVerIdxOffset(pWal, ver);
code = tfLseek(idxTfd, idxOff, SEEK_SET);
- if(code != 0) {
+ if (code != 0) {
return -1;
}
WalIdxEntry entry;
- //TODO:deserialize
+ // TODO:deserialize
code = tfRead(idxTfd, &entry, sizeof(WalIdxEntry));
- if(code != 0) {
+ if (code != 0) {
return -1;
}
ASSERT(entry.ver == ver);
@@ -46,8 +46,8 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) {
return code;
}
-int walChangeFileToLast(SWal *pWal) {
- int64_t idxTfd, logTfd;
+int walChangeFileToLast(SWal* pWal) {
+ int64_t idxTfd, logTfd;
WalFileInfo* pRet = taosArrayGetLast(pWal->fileInfoSet);
ASSERT(pRet != NULL);
int64_t fileFirstVer = pRet->firstVer;
@@ -55,42 +55,42 @@ int walChangeFileToLast(SWal *pWal) {
char fnameStr[WAL_FILE_LEN];
walBuildIdxName(pWal, fileFirstVer, fnameStr);
idxTfd = tfOpenReadWrite(fnameStr);
- if(idxTfd < 0) {
+ if (idxTfd < 0) {
return -1;
}
walBuildLogName(pWal, fileFirstVer, fnameStr);
logTfd = tfOpenReadWrite(fnameStr);
- if(logTfd < 0) {
+ if (logTfd < 0) {
return -1;
}
- //switch file
+ // switch file
pWal->writeIdxTfd = idxTfd;
pWal->writeLogTfd = logTfd;
return 0;
}
-int walChangeFile(SWal *pWal, int64_t ver) {
- int code = 0;
+int walChangeFile(SWal* pWal, int64_t ver) {
+ int code = 0;
int64_t idxTfd, logTfd;
- char fnameStr[WAL_FILE_LEN];
+ char fnameStr[WAL_FILE_LEN];
code = tfClose(pWal->writeLogTfd);
- if(code != 0) {
- //TODO
+ if (code != 0) {
+ // TODO
return -1;
}
code = tfClose(pWal->writeIdxTfd);
- if(code != 0) {
- //TODO
+ if (code != 0) {
+ // TODO
return -1;
}
WalFileInfo tmpInfo;
tmpInfo.firstVer = ver;
- //bsearch in fileSet
+ // bsearch in fileSet
WalFileInfo* pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
ASSERT(pRet != NULL);
int64_t fileFirstVer = pRet->firstVer;
- //closed
- if(taosArrayGetLast(pWal->fileInfoSet) != pRet) {
+ // closed
+ if (taosArrayGetLast(pWal->fileInfoSet) != pRet) {
walBuildIdxName(pWal, fileFirstVer, fnameStr);
idxTfd = tfOpenRead(fnameStr);
walBuildLogName(pWal, fileFirstVer, fnameStr);
@@ -107,27 +107,26 @@ int walChangeFile(SWal *pWal, int64_t ver) {
return fileFirstVer;
}
-int walSeekVer(SWal *pWal, int64_t ver) {
+int walSeekVer(SWal* pWal, int64_t ver) {
int code;
- if(ver == pWal->vers.lastVer) {
+ if (ver == pWal->vers.lastVer) {
return 0;
}
- if(ver > pWal->vers.lastVer|| ver < pWal->vers.firstVer) {
+ if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
return -1;
}
- if(ver < pWal->vers.snapshotVer) {
-
+ if (ver < pWal->vers.snapshotVer) {
}
- if(ver < walGetCurFileFirstVer(pWal) || (ver > walGetCurFileLastVer(pWal))) {
+ if (ver < walGetCurFileFirstVer(pWal) || (ver > walGetCurFileLastVer(pWal))) {
code = walChangeFile(pWal, ver);
- if(code != 0) {
+ if (code != 0) {
return -1;
}
}
code = walSeekFilePos(pWal, ver);
- if(code != 0) {
+ if (code != 0) {
return -1;
}
-
+
return 0;
}
diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c
index 67b70f249d0184b60acadc551569df9adceb426d..c3a7ca5f4d888520cbfe83f8a38a965f2c57aa58 100644
--- a/source/libs/wal/src/walWrite.c
+++ b/source/libs/wal/src/walWrite.c
@@ -24,7 +24,7 @@
int32_t walCommit(SWal *pWal, int64_t ver) {
ASSERT(pWal->vers.commitVer >= pWal->vers.snapshotVer);
ASSERT(pWal->vers.commitVer <= pWal->vers.lastVer);
- if(ver < pWal->vers.commitVer || ver > pWal->vers.lastVer) {
+ if (ver < pWal->vers.commitVer || ver > pWal->vers.lastVer) {
return -1;
}
pWal->vers.commitVer = ver;
@@ -32,57 +32,57 @@ int32_t walCommit(SWal *pWal, int64_t ver) {
}
int32_t walRollback(SWal *pWal, int64_t ver) {
- int code;
+ int code;
char fnameStr[WAL_FILE_LEN];
- if(ver == pWal->vers.lastVer) {
+ if (ver == pWal->vers.lastVer) {
return 0;
}
- if(ver > pWal->vers.lastVer || ver < pWal->vers.commitVer) {
+ if (ver > pWal->vers.lastVer || ver < pWal->vers.commitVer) {
return -1;
}
pthread_mutex_lock(&pWal->mutex);
- //find correct file
- if(ver < walGetLastFileFirstVer(pWal)) {
- //close current files
+ // find correct file
+ if (ver < walGetLastFileFirstVer(pWal)) {
+ // close current files
tfClose(pWal->writeIdxTfd);
tfClose(pWal->writeLogTfd);
- //open old files
+ // open old files
code = walChangeFile(pWal, ver);
- if(code != 0) {
+ if (code != 0) {
return -1;
}
- //delete files
+ // delete files
int fileSetSize = taosArrayGetSize(pWal->fileInfoSet);
- for(int i = pWal->writeCur; i < fileSetSize; i++) {
- walBuildLogName(pWal, ((WalFileInfo*)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr);
+ for (int i = pWal->writeCur; i < fileSetSize; i++) {
+ walBuildLogName(pWal, ((WalFileInfo *)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr);
remove(fnameStr);
- walBuildIdxName(pWal, ((WalFileInfo*)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr);
+ walBuildIdxName(pWal, ((WalFileInfo *)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr);
remove(fnameStr);
}
- //pop from fileInfoSet
+ // pop from fileInfoSet
taosArraySetSize(pWal->fileInfoSet, pWal->writeCur + 1);
}
walBuildIdxName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
int64_t idxTfd = tfOpenReadWrite(fnameStr);
- //TODO:change to deserialize function
- if(idxTfd < 0) {
+ // TODO:change to deserialize function
+ if (idxTfd < 0) {
pthread_mutex_unlock(&pWal->mutex);
return -1;
}
int64_t idxOff = walGetVerIdxOffset(pWal, ver);
code = tfLseek(idxTfd, idxOff, SEEK_SET);
- if(code < 0) {
+ if (code < 0) {
pthread_mutex_unlock(&pWal->mutex);
return -1;
}
- //read idx file and get log file pos
- //TODO:change to deserialize function
+ // read idx file and get log file pos
+ // TODO:change to deserialize function
WalIdxEntry entry;
- if(tfRead(idxTfd, &entry, sizeof(WalIdxEntry)) != sizeof(WalIdxEntry)) {
+ if (tfRead(idxTfd, &entry, sizeof(WalIdxEntry)) != sizeof(WalIdxEntry)) {
pthread_mutex_unlock(&pWal->mutex);
return -1;
}
@@ -90,56 +90,56 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
walBuildLogName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
int64_t logTfd = tfOpenReadWrite(fnameStr);
- if(logTfd < 0) {
- //TODO
+ if (logTfd < 0) {
+ // TODO
pthread_mutex_unlock(&pWal->mutex);
return -1;
}
code = tfLseek(logTfd, entry.offset, SEEK_SET);
- if(code < 0) {
- //TODO
+ if (code < 0) {
+ // TODO
pthread_mutex_unlock(&pWal->mutex);
return -1;
}
- //validate offset
+ // validate offset
SWalHead head;
ASSERT(tfValid(logTfd));
int size = tfRead(logTfd, &head, sizeof(SWalHead));
- if(size != sizeof(SWalHead)) {
+ if (size != sizeof(SWalHead)) {
return -1;
}
code = walValidHeadCksum(&head);
ASSERT(code == 0);
- if(code != 0) {
+ if (code != 0) {
return -1;
}
- if(head.head.version != ver) {
- //TODO
+ if (head.head.version != ver) {
+ // TODO
return -1;
}
- //truncate old files
+ // truncate old files
code = tfFtruncate(logTfd, entry.offset);
- if(code < 0) {
+ if (code < 0) {
return -1;
}
code = tfFtruncate(idxTfd, idxOff);
- if(code < 0) {
+ if (code < 0) {
return -1;
}
pWal->vers.lastVer = ver - 1;
- ((WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->lastVer = ver - 1;
- ((WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->fileSize = entry.offset;
+ ((WalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->lastVer = ver - 1;
+ ((WalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->fileSize = entry.offset;
- //unlock
+ // unlock
pthread_mutex_unlock(&pWal->mutex);
return 0;
}
-int32_t walBeginSnapshot(SWal* pWal, int64_t ver) {
+int32_t walBeginSnapshot(SWal *pWal, int64_t ver) {
pWal->vers.verInSnapshotting = ver;
- //check file rolling
- if(pWal->cfg.retentionPeriod == 0) {
+ // check file rolling
+ if (pWal->cfg.retentionPeriod == 0) {
walRoll(pWal);
}
@@ -148,54 +148,54 @@ int32_t walBeginSnapshot(SWal* pWal, int64_t ver) {
int32_t walEndSnapshot(SWal *pWal) {
int64_t ver = pWal->vers.verInSnapshotting;
- if(ver == -1) return -1;
+ if (ver == -1) return -1;
pWal->vers.snapshotVer = ver;
int ts = taosGetTimestampSec();
- int deleteCnt = 0;
- int64_t newTotSize = pWal->totSize;
+ int deleteCnt = 0;
+ int64_t newTotSize = pWal->totSize;
WalFileInfo tmp;
tmp.firstVer = ver;
- //find files safe to delete
- WalFileInfo* pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE);
- if(ver >= pInfo->lastVer) {
+ // find files safe to delete
+ WalFileInfo *pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE);
+ if (ver >= pInfo->lastVer) {
pInfo++;
}
- //iterate files, until the searched result
- for(WalFileInfo* iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) {
- if(pWal->totSize > pWal->cfg.retentionSize ||
- iter->closeTs + pWal->cfg.retentionPeriod > ts) {
- //delete according to file size or close time
+ // iterate files, until the searched result
+ for (WalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) {
+ if (pWal->totSize > pWal->cfg.retentionSize || iter->closeTs + pWal->cfg.retentionPeriod > ts) {
+ // delete according to file size or close time
deleteCnt++;
newTotSize -= iter->fileSize;
}
}
char fnameStr[WAL_FILE_LEN];
- //remove file
- for(int i = 0; i < deleteCnt; i++) {
- WalFileInfo* pInfo = taosArrayGet(pWal->fileInfoSet, i);
- walBuildLogName(pWal, pInfo->firstVer, fnameStr);
+ // remove file
+ for (int i = 0; i < deleteCnt; i++) {
+ WalFileInfo *pInfo = taosArrayGet(pWal->fileInfoSet, i);
+ walBuildLogName(pWal, pInfo->firstVer, fnameStr);
remove(fnameStr);
- walBuildIdxName(pWal, pInfo->firstVer, fnameStr);
+ walBuildIdxName(pWal, pInfo->firstVer, fnameStr);
remove(fnameStr);
}
- //make new array, remove files
- taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt);
- if(taosArrayGetSize(pWal->fileInfoSet) == 0) {
+ // make new array, remove files
+ taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt);
+ if (taosArrayGetSize(pWal->fileInfoSet) == 0) {
pWal->writeCur = -1;
pWal->vers.firstVer = -1;
} else {
- pWal->vers.firstVer = ((WalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
+ pWal->vers.firstVer = ((WalFileInfo *)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
}
- pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;;
+ pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
+ ;
pWal->totSize = newTotSize;
pWal->vers.verInSnapshotting = -1;
- //save snapshot ver, commit ver
+ // save snapshot ver, commit ver
int code = walSaveMeta(pWal);
- if(code != 0) {
+ if (code != 0) {
return -1;
}
@@ -204,41 +204,41 @@ int32_t walEndSnapshot(SWal *pWal) {
int walRoll(SWal *pWal) {
int code = 0;
- if(pWal->writeIdxTfd != -1) {
+ if (pWal->writeIdxTfd != -1) {
code = tfClose(pWal->writeIdxTfd);
- if(code != 0) {
+ if (code != 0) {
return -1;
}
}
- if(pWal->writeLogTfd != -1) {
+ if (pWal->writeLogTfd != -1) {
code = tfClose(pWal->writeLogTfd);
- if(code != 0) {
+ if (code != 0) {
return -1;
}
}
int64_t idxTfd, logTfd;
- //create new file
+ // create new file
int64_t newFileFirstVersion = pWal->vers.lastVer + 1;
- char fnameStr[WAL_FILE_LEN];
+ char fnameStr[WAL_FILE_LEN];
walBuildIdxName(pWal, newFileFirstVersion, fnameStr);
idxTfd = tfOpenCreateWriteAppend(fnameStr);
- if(idxTfd < 0) {
+ if (idxTfd < 0) {
ASSERT(0);
return -1;
}
walBuildLogName(pWal, newFileFirstVersion, fnameStr);
logTfd = tfOpenCreateWriteAppend(fnameStr);
- if(logTfd < 0) {
+ if (logTfd < 0) {
ASSERT(0);
return -1;
}
code = walRollFileInfo(pWal);
- if(code != 0) {
+ if (code != 0) {
ASSERT(0);
return -1;
}
- //switch file
+ // switch file
pWal->writeIdxTfd = idxTfd;
pWal->writeLogTfd = logTfd;
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
@@ -248,10 +248,10 @@ int walRoll(SWal *pWal) {
}
static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
- WalIdxEntry entry = { .ver = ver, .offset = offset };
- int size = tfWrite(pWal->writeIdxTfd, &entry, sizeof(WalIdxEntry));
- if(size != sizeof(WalIdxEntry)) {
- //TODO truncate
+ WalIdxEntry entry = {.ver = ver, .offset = offset};
+ int size = tfWrite(pWal->writeIdxTfd, &entry, sizeof(WalIdxEntry));
+ if (size != sizeof(WalIdxEntry)) {
+ // TODO truncate
return -1;
}
return 0;
@@ -265,21 +265,21 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
if (pWal->cfg.level == TAOS_WAL_NOLOG) return 0;
if (index == pWal->vers.lastVer + 1) {
- if(taosArrayGetSize(pWal->fileInfoSet) == 0) {
+ if (taosArrayGetSize(pWal->fileInfoSet) == 0) {
pWal->vers.firstVer = index;
code = walRoll(pWal);
ASSERT(code == 0);
} else {
int64_t passed = walGetSeq() - pWal->lastRollSeq;
- if(pWal->cfg.rollPeriod != -1 && pWal->cfg.rollPeriod != 0 && passed > pWal->cfg.rollPeriod) {
+ if (pWal->cfg.rollPeriod != -1 && pWal->cfg.rollPeriod != 0 && passed > pWal->cfg.rollPeriod) {
walRoll(pWal);
- } else if(pWal->cfg.segSize != -1 && pWal->cfg.segSize != 0 && walGetLastFileSize(pWal) > pWal->cfg.segSize) {
+ } else if (pWal->cfg.segSize != -1 && pWal->cfg.segSize != 0 && walGetLastFileSize(pWal) > pWal->cfg.segSize) {
walRoll(pWal);
}
}
} else {
- //reject skip log or rewrite log
- //must truncate explicitly first
+ // reject skip log or rewrite log
+ // must truncate explicitly first
return -1;
}
/*if (!tfValid(pWal->writeLogTfd)) return -1;*/
@@ -294,28 +294,30 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen);
if (tfWrite(pWal->writeLogTfd, &pWal->writeHead, sizeof(SWalHead)) != sizeof(SWalHead)) {
- //ftruncate
+ // ftruncate
code = TAOS_SYSTEM_ERROR(errno);
- wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno));
+ wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
+ strerror(errno));
}
- if (tfWrite(pWal->writeLogTfd, (char*)body, bodyLen) != bodyLen) {
- //ftruncate
+ if (tfWrite(pWal->writeLogTfd, (char *)body, bodyLen) != bodyLen) {
+ // ftruncate
code = TAOS_SYSTEM_ERROR(errno);
- wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno));
+ wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
+ strerror(errno));
}
code = walWriteIndex(pWal, index, offset);
- if(code != 0) {
- //TODO
+ if (code != 0) {
+ // TODO
return -1;
}
- //set status
+ // set status
pWal->vers.lastVer = index;
pWal->totSize += sizeof(SWalHead) + bodyLen;
walGetCurFileInfo(pWal)->lastVer = index;
walGetCurFileInfo(pWal)->fileSize += sizeof(SWalHead) + bodyLen;
-
+
pthread_mutex_unlock(&pWal->mutex);
return code;
@@ -323,33 +325,34 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
void walFsync(SWal *pWal, bool forceFsync) {
if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) {
- wTrace("vgId:%d, fileId:%"PRId64".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal));
+ wTrace("vgId:%d, fileId:%" PRId64 ".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal));
if (tfFsync(pWal->writeLogTfd) < 0) {
- wError("vgId:%d, file:%"PRId64".log, fsync failed since %s", pWal->cfg.vgId, walGetCurFileFirstVer(pWal), strerror(errno));
+ wError("vgId:%d, file:%" PRId64 ".log, fsync failed since %s", pWal->cfg.vgId, walGetCurFileFirstVer(pWal),
+ strerror(errno));
}
}
}
/*static int walValidateOffset(SWal* pWal, int64_t ver) {*/
- /*int code = 0;*/
- /*SWalHead *pHead = NULL;*/
- /*code = (int)walRead(pWal, &pHead, ver);*/
- /*if(pHead->head.version != ver) {*/
- /*return -1;*/
- /*}*/
- /*return 0;*/
+/*int code = 0;*/
+/*SWalHead *pHead = NULL;*/
+/*code = (int)walRead(pWal, &pHead, ver);*/
+/*if(pHead->head.version != ver) {*/
+/*return -1;*/
+/*}*/
+/*return 0;*/
/*}*/
/*static int64_t walGetOffset(SWal* pWal, int64_t ver) {*/
- /*int code = walSeekVer(pWal, ver);*/
- /*if(code != 0) {*/
- /*return -1;*/
- /*}*/
+/*int code = walSeekVer(pWal, ver);*/
+/*if(code != 0) {*/
+/*return -1;*/
+/*}*/
- /*code = walValidateOffset(pWal, ver);*/
- /*if(code != 0) {*/
- /*return -1;*/
- /*}*/
+/*code = walValidateOffset(pWal, ver);*/
+/*if(code != 0) {*/
+/*return -1;*/
+/*}*/
- /*return 0;*/
+/*return 0;*/
/*}*/
diff --git a/source/libs/wal/test/walMetaTest.cpp b/source/libs/wal/test/walMetaTest.cpp
index d06388201eb04be5894d48fb85a8ec16215ce392..cd082a3a4348290ff70d0274891e32cc9ea29202 100644
--- a/source/libs/wal/test/walMetaTest.cpp
+++ b/source/libs/wal/test/walMetaTest.cpp
@@ -6,111 +6,105 @@
#include "walInt.h"
const char* ranStr = "tvapq02tcp";
-const int ranStrLen = strlen(ranStr);
+const int ranStrLen = strlen(ranStr);
class WalCleanEnv : public ::testing::Test {
- protected:
- static void SetUpTestCase() {
- int code = walInit();
- ASSERT(code == 0);
- }
-
- static void TearDownTestCase() {
- walCleanUp();
- }
+ protected:
+ static void SetUpTestCase() {
+ int code = walInit();
+ ASSERT(code == 0);
+ }
- void SetUp() override {
- taosRemoveDir(pathName);
- SWalCfg* pCfg = (SWalCfg*)malloc(sizeof(SWalCfg));
- memset(pCfg, 0, sizeof(SWalCfg));
- pCfg->rollPeriod = -1;
- pCfg->segSize = -1;
- pCfg->retentionPeriod = 0;
- pCfg->retentionSize = 0;
- pCfg->level = TAOS_WAL_FSYNC;
- pWal = walOpen(pathName, pCfg);
- free(pCfg);
- ASSERT(pWal != NULL);
- }
+ static void TearDownTestCase() { walCleanUp(); }
+
+ void SetUp() override {
+ taosRemoveDir(pathName);
+ SWalCfg* pCfg = (SWalCfg*)malloc(sizeof(SWalCfg));
+ memset(pCfg, 0, sizeof(SWalCfg));
+ pCfg->rollPeriod = -1;
+ pCfg->segSize = -1;
+ pCfg->retentionPeriod = 0;
+ pCfg->retentionSize = 0;
+ pCfg->level = TAOS_WAL_FSYNC;
+ pWal = walOpen(pathName, pCfg);
+ free(pCfg);
+ ASSERT(pWal != NULL);
+ }
- void TearDown() override {
- walClose(pWal);
- pWal = NULL;
- }
+ void TearDown() override {
+ walClose(pWal);
+ pWal = NULL;
+ }
- SWal* pWal = NULL;
- const char* pathName = "/tmp/wal_test";
+ SWal* pWal = NULL;
+ const char* pathName = "/tmp/wal_test";
};
class WalCleanDeleteEnv : public ::testing::Test {
- protected:
- static void SetUpTestCase() {
- int code = walInit();
- ASSERT(code == 0);
- }
-
- static void TearDownTestCase() {
- walCleanUp();
- }
+ protected:
+ static void SetUpTestCase() {
+ int code = walInit();
+ ASSERT(code == 0);
+ }
- void SetUp() override {
- taosRemoveDir(pathName);
- SWalCfg* pCfg = (SWalCfg*)malloc(sizeof(SWalCfg));
- memset(pCfg, 0, sizeof(SWalCfg));
- pCfg->retentionPeriod = 0;
- pCfg->retentionSize = 0;
- pCfg->level = TAOS_WAL_FSYNC;
- pWal = walOpen(pathName, pCfg);
- free(pCfg);
- ASSERT(pWal != NULL);
- }
+ static void TearDownTestCase() { walCleanUp(); }
+
+ void SetUp() override {
+ taosRemoveDir(pathName);
+ SWalCfg* pCfg = (SWalCfg*)malloc(sizeof(SWalCfg));
+ memset(pCfg, 0, sizeof(SWalCfg));
+ pCfg->retentionPeriod = 0;
+ pCfg->retentionSize = 0;
+ pCfg->level = TAOS_WAL_FSYNC;
+ pWal = walOpen(pathName, pCfg);
+ free(pCfg);
+ ASSERT(pWal != NULL);
+ }
- void TearDown() override {
- walClose(pWal);
- pWal = NULL;
- }
+ void TearDown() override {
+ walClose(pWal);
+ pWal = NULL;
+ }
- SWal* pWal = NULL;
- const char* pathName = "/tmp/wal_test";
+ SWal* pWal = NULL;
+ const char* pathName = "/tmp/wal_test";
};
class WalKeepEnv : public ::testing::Test {
- protected:
- static void SetUpTestCase() {
- int code = walInit();
- ASSERT(code == 0);
- }
+ protected:
+ static void SetUpTestCase() {
+ int code = walInit();
+ ASSERT(code == 0);
+ }
- static void TearDownTestCase() {
- walCleanUp();
- }
+ static void TearDownTestCase() { walCleanUp(); }
- void walResetEnv() {
- TearDown();
- taosRemoveDir(pathName);
- SetUp();
- }
+ void walResetEnv() {
+ TearDown();
+ taosRemoveDir(pathName);
+ SetUp();
+ }
- void SetUp() override {
- SWalCfg* pCfg = (SWalCfg*)malloc(sizeof(SWalCfg));
- memset(pCfg, 0, sizeof(SWalCfg));
- pCfg->rollPeriod = -1;
- pCfg->segSize = -1;
- pCfg->retentionPeriod = 0;
- pCfg->retentionSize = 0;
- pCfg->level = TAOS_WAL_FSYNC;
- pWal = walOpen(pathName, pCfg);
- free(pCfg);
- ASSERT(pWal != NULL);
- }
+ void SetUp() override {
+ SWalCfg* pCfg = (SWalCfg*)malloc(sizeof(SWalCfg));
+ memset(pCfg, 0, sizeof(SWalCfg));
+ pCfg->rollPeriod = -1;
+ pCfg->segSize = -1;
+ pCfg->retentionPeriod = 0;
+ pCfg->retentionSize = 0;
+ pCfg->level = TAOS_WAL_FSYNC;
+ pWal = walOpen(pathName, pCfg);
+ free(pCfg);
+ ASSERT(pWal != NULL);
+ }
- void TearDown() override {
- walClose(pWal);
- pWal = NULL;
- }
+ void TearDown() override {
+ walClose(pWal);
+ pWal = NULL;
+ }
- SWal* pWal = NULL;
- const char* pathName = "/tmp/wal_test";
+ SWal* pWal = NULL;
+ const char* pathName = "/tmp/wal_test";
};
TEST_F(WalCleanEnv, createNew) {
@@ -139,7 +133,7 @@ TEST_F(WalCleanEnv, serialize) {
ASSERT(code == 0);
code = walRollFileInfo(pWal);
ASSERT(code == 0);
- char*ss = walMetaSerialize(pWal);
+ char* ss = walMetaSerialize(pWal);
printf("%s\n", ss);
free(ss);
code = walSaveMeta(pWal);
@@ -162,11 +156,11 @@ TEST_F(WalKeepEnv, readOldMeta) {
walResetEnv();
int code;
- for(int i = 0; i < 10; i++) {
- code = walWrite(pWal, i, i+1, (void*)ranStr, ranStrLen);
+ for (int i = 0; i < 10; i++) {
+ code = walWrite(pWal, i, i + 1, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, i);
- code = walWrite(pWal, i+2, i, (void*)ranStr, ranStrLen);
+ code = walWrite(pWal, i + 2, i, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, -1);
ASSERT_EQ(pWal->vers.lastVer, i);
}
@@ -182,7 +176,7 @@ TEST_F(WalKeepEnv, readOldMeta) {
int len = strlen(oldss);
ASSERT_EQ(len, strlen(newss));
- for(int i = 0; i < len; i++) {
+ for (int i = 0; i < len; i++) {
EXPECT_EQ(oldss[i], newss[i]);
}
free(oldss);
@@ -191,11 +185,11 @@ TEST_F(WalKeepEnv, readOldMeta) {
TEST_F(WalCleanEnv, write) {
int code;
- for(int i = 0; i < 10; i++) {
- code = walWrite(pWal, i, i+1, (void*)ranStr, ranStrLen);
+ for (int i = 0; i < 10; i++) {
+ code = walWrite(pWal, i, i + 1, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, i);
- code = walWrite(pWal, i+2, i, (void*)ranStr, ranStrLen);
+ code = walWrite(pWal, i + 2, i, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, -1);
ASSERT_EQ(pWal->vers.lastVer, i);
}
@@ -205,8 +199,8 @@ TEST_F(WalCleanEnv, write) {
TEST_F(WalCleanEnv, rollback) {
int code;
- for(int i = 0; i < 10; i++) {
- code = walWrite(pWal, i, i+1, (void*)ranStr, ranStrLen);
+ for (int i = 0; i < 10; i++) {
+ code = walWrite(pWal, i, i + 1, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, i);
}
@@ -223,7 +217,7 @@ TEST_F(WalCleanEnv, rollback) {
TEST_F(WalCleanDeleteEnv, roll) {
int code;
int i;
- for(i = 0; i < 100; i++) {
+ for (i = 0; i < 100; i++) {
code = walWrite(pWal, i, 0, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, i);
@@ -231,16 +225,16 @@ TEST_F(WalCleanDeleteEnv, roll) {
ASSERT_EQ(pWal->vers.commitVer, i);
}
- walBeginSnapshot(pWal, i-1);
- ASSERT_EQ(pWal->vers.verInSnapshotting, i-1);
+ walBeginSnapshot(pWal, i - 1);
+ ASSERT_EQ(pWal->vers.verInSnapshotting, i - 1);
walEndSnapshot(pWal);
- ASSERT_EQ(pWal->vers.snapshotVer, i-1);
+ ASSERT_EQ(pWal->vers.snapshotVer, i - 1);
ASSERT_EQ(pWal->vers.verInSnapshotting, -1);
code = walWrite(pWal, 5, 0, (void*)ranStr, ranStrLen);
ASSERT_NE(code, 0);
- for(; i < 200; i++) {
+ for (; i < 200; i++) {
code = walWrite(pWal, i, 0, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, 0);
code = walCommit(pWal, i);
@@ -255,36 +249,36 @@ TEST_F(WalCleanDeleteEnv, roll) {
TEST_F(WalKeepEnv, readHandleRead) {
walResetEnv();
- int code;
+ int code;
SWalReadHandle* pRead = walOpenReadHandle(pWal);
ASSERT(pRead != NULL);
- int i ;
- for(i = 0; i < 100; i++) {
+ int i;
+ for (i = 0; i < 100; i++) {
char newStr[100];
sprintf(newStr, "%s-%d", ranStr, i);
int len = strlen(newStr);
code = walWrite(pWal, i, 0, newStr, len);
ASSERT_EQ(code, 0);
}
- for(int i = 0; i < 1000; i++) {
+ for (int i = 0; i < 1000; i++) {
int ver = rand() % 100;
code = walReadWithHandle(pRead, ver);
ASSERT_EQ(code, 0);
- //printf("rrbody: \n");
- //for(int i = 0; i < pRead->pHead->head.len; i++) {
- //printf("%d ", pRead->pHead->head.body[i]);
+ // printf("rrbody: \n");
+ // for(int i = 0; i < pRead->pHead->head.len; i++) {
+ // printf("%d ", pRead->pHead->head.body[i]);
//}
- //printf("\n");
+ // printf("\n");
ASSERT_EQ(pRead->pHead->head.version, ver);
- ASSERT_EQ(pRead->curVersion, ver+1);
+ ASSERT_EQ(pRead->curVersion, ver + 1);
char newStr[100];
sprintf(newStr, "%s-%d", ranStr, ver);
int len = strlen(newStr);
ASSERT_EQ(pRead->pHead->head.len, len);
- for(int j = 0; j < len; j++) {
+ for (int j = 0; j < len; j++) {
EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]);
}
}