未验证 提交 29eb9432 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #8761 from taosdata/feature/tq

add walhandle for integration
...@@ -109,11 +109,10 @@ typedef struct TqTopicVhandle { ...@@ -109,11 +109,10 @@ typedef struct TqTopicVhandle {
#define TQ_BUFFER_SIZE 8 #define TQ_BUFFER_SIZE 8
// TODO: define a serializer and deserializer
typedef struct TqBufferItem { typedef struct TqBufferItem {
int64_t offset; int64_t offset;
// executors are identical but not concurrent // executors are identical but not concurrent
// so it must be a copy in each item // so there must be a copy in each item
void* executor; void* executor;
int64_t size; int64_t size;
void* content; void* content;
...@@ -156,23 +155,111 @@ typedef struct TqQueryMsg { ...@@ -156,23 +155,111 @@ typedef struct TqQueryMsg {
typedef struct TqLogReader { typedef struct TqLogReader {
void* logHandle; void* logHandle;
int32_t (*walRead)(void* logHandle, void** data, int64_t ver); int32_t (*logRead)(void* logHandle, void** data, int64_t ver);
int64_t (*walGetFirstVer)(void* logHandle); int64_t (*logGetFirstVer)(void* logHandle);
int64_t (*walGetSnapshotVer)(void* logHandle); int64_t (*logGetSnapshotVer)(void* logHandle);
int64_t (*walGetLastVer)(void* logHandle); int64_t (*logGetLastVer)(void* logHandle);
} TqLogReader; } TqLogReader;
typedef struct TqConfig { typedef struct TqConfig {
// TODO // TODO
} TqConfig; } TqConfig;
typedef struct TqMemRef {
SMemAllocatorFactory *pAlloctorFactory;
SMemAllocator *pAllocator;
} TqMemRef;
typedef struct TqSerializedHead {
int16_t ver;
int16_t action;
int32_t checksum;
int64_t ssize;
char content[];
} TqSerializedHead;
typedef int (*TqSerializeFun)(const void* pObj, TqSerializedHead** ppHead);
typedef const void* (*TqDeserializeFun)(const TqSerializedHead* pHead, void** ppObj);
typedef void (*TqDeleteFun)(void*);
#define TQ_BUCKET_MASK 0xFF
#define TQ_BUCKET_SIZE 256
#define TQ_PAGE_SIZE 4096
//key + offset + size
#define TQ_IDX_SIZE 24
//4096 / 24
#define TQ_MAX_IDX_ONE_PAGE 170
//24 * 170
#define TQ_IDX_PAGE_BODY_SIZE 4080
//4096 - 4080
#define TQ_IDX_PAGE_HEAD_SIZE 16
#define TQ_ACTION_CONST 0
#define TQ_ACTION_INUSE 1
#define TQ_ACTION_INUSE_CONT 2
#define TQ_ACTION_INTXN 3
#define TQ_SVER 0
//TODO: inplace mode is not implemented
#define TQ_UPDATE_INPLACE 0
#define TQ_UPDATE_APPEND 1
#define TQ_DUP_INTXN_REWRITE 0
#define TQ_DUP_INTXN_REJECT 2
static inline bool TqUpdateAppend(int32_t tqConfigFlag) {
return tqConfigFlag & TQ_UPDATE_APPEND;
}
static inline bool TqDupIntxnReject(int32_t tqConfigFlag) {
return tqConfigFlag & TQ_DUP_INTXN_REJECT;
}
static const int8_t TQ_CONST_DELETE = TQ_ACTION_CONST;
#define TQ_DELETE_TOKEN (void*)&TQ_CONST_DELETE
typedef struct TqMetaHandle {
int64_t key;
int64_t offset;
int64_t serializedSize;
void* valueInUse;
void* valueInTxn;
} TqMetaHandle;
typedef struct TqMetaList {
TqMetaHandle handle;
struct TqMetaList* next;
//struct TqMetaList* inTxnPrev;
//struct TqMetaList* inTxnNext;
struct TqMetaList* unpersistPrev;
struct TqMetaList* unpersistNext;
} TqMetaList;
typedef struct TqMetaStore {
TqMetaList* bucket[TQ_BUCKET_SIZE];
//a table head
TqMetaList* unpersistHead;
//TODO:temporaral use, to be replaced by unified tfile
int fileFd;
//TODO:temporaral use, to be replaced by unified tfile
int idxFd;
char* dirPath;
int32_t tqConfigFlag;
TqSerializeFun pSerializer;
TqDeserializeFun pDeserializer;
TqDeleteFun pDeleter;
} TqMetaStore;
typedef struct STQ { typedef struct STQ {
// the collection of group handle // the collection of group handle
// the handle of kvstore // the handle of kvstore
const char* path; char* path;
TqConfig* tqConfig; TqConfig* tqConfig;
TqLogReader* tqLogReader; TqLogReader* tqLogReader;
SMemAllocatorFactory* allocFac; TqMemRef tqMemRef;
TqMetaStore* tqMeta;
} STQ; } STQ;
// open in each vnode // open in each vnode
...@@ -187,7 +274,7 @@ int tqConsume(STQ*, TmqConsumeReq*); ...@@ -187,7 +274,7 @@ int tqConsume(STQ*, TmqConsumeReq*);
TqGroupHandle* tqGetGroupHandle(STQ*, int64_t cId); TqGroupHandle* tqGetGroupHandle(STQ*, int64_t cId);
int tqOpenTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); TqGroupHandle* tqOpenTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
int tqCloseTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); int tqCloseTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
int tqMoveOffsetToNext(TqGroupHandle*); int tqMoveOffsetToNext(TqGroupHandle*);
int tqResetOffset(STQ*, int64_t topicId, int64_t cgId, int64_t offset); int tqResetOffset(STQ*, int64_t topicId, int64_t cgId, int64_t offset);
...@@ -195,18 +282,9 @@ int tqRegisterContext(TqGroupHandle*, void* ahandle); ...@@ -195,18 +282,9 @@ int tqRegisterContext(TqGroupHandle*, void* ahandle);
int tqLaunchQuery(TqGroupHandle*); int tqLaunchQuery(TqGroupHandle*);
int tqSendLaunchQuery(TqGroupHandle*); int tqSendLaunchQuery(TqGroupHandle*);
int tqSerializeGroupHandle(TqGroupHandle* gHandle, void** ppBytes); int tqSerializeGroupHandle(const TqGroupHandle* gHandle, TqSerializedHead** ppHead);
void* tqSerializeListHandle(TqListHandle* listHandle, void* ptr);
void* tqSerializeBufHandle(TqBufferHandle* bufHandle, void* ptr);
void* tqSerializeBufItem(TqBufferItem* bufItem, void* ptr);
const void* tqDeserializeGroupHandle(const void* pBytes, TqGroupHandle* ghandle);
const void* tqDeserializeBufHandle(const void* pBytes, TqBufferHandle* bufHandle);
const void* tqDeserializeBufItem(const void* pBytes, TqBufferItem* bufItem);
int tqGetGHandleSSize(const TqGroupHandle* gHandle); const void* tqDeserializeGroupHandle(const TqSerializedHead* pHead, TqGroupHandle** gHandle);
int tqBufHandleSSize();
int tqBufItemSSize();
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -44,8 +44,10 @@ typedef struct { ...@@ -44,8 +44,10 @@ typedef struct {
EWalType walLevel; // wal level EWalType walLevel; // wal level
} SWalCfg; } SWalCfg;
struct SWal; typedef struct SWal {
typedef struct SWal SWal; // WAL HANDLE int8_t unused;
} SWal; // WAL HANDLE
typedef int32_t (*FWalWrite)(void *ahandle, void *pHead, int32_t qtype, void *pMsg); typedef int32_t (*FWalWrite)(void *ahandle, void *pHead, int32_t qtype, void *pMsg);
// module initialization // module initialization
......
...@@ -17,97 +17,22 @@ ...@@ -17,97 +17,22 @@
#define _TQ_META_STORE_H_ #define _TQ_META_STORE_H_
#include "os.h" #include "os.h"
#include "tq.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#define TQ_BUCKET_MASK 0xFF
#define TQ_BUCKET_SIZE 256
#define TQ_PAGE_SIZE 4096
//key + offset + size
#define TQ_IDX_SIZE 24
//4096 / 24
#define TQ_MAX_IDX_ONE_PAGE 170
//24 * 170
#define TQ_IDX_PAGE_BODY_SIZE 4080
//4096 - 4080
#define TQ_IDX_PAGE_HEAD_SIZE 16
#define TQ_ACTION_CONST 0
#define TQ_ACTION_INUSE 1
#define TQ_ACTION_INUSE_CONT 2
#define TQ_ACTION_INTXN 3
#define TQ_SVER 0
//TODO: inplace mode is not implemented
#define TQ_UPDATE_INPLACE 0
#define TQ_UPDATE_APPEND 1
#define TQ_DUP_INTXN_REWRITE 0
#define TQ_DUP_INTXN_REJECT 2
static inline bool TqUpdateAppend(int32_t tqConfigFlag) {
return tqConfigFlag & TQ_UPDATE_APPEND;
}
static inline bool TqDupIntxnReject(int32_t tqConfigFlag) {
return tqConfigFlag & TQ_DUP_INTXN_REJECT;
}
static const int8_t TQ_CONST_DELETE = TQ_ACTION_CONST;
#define TQ_DELETE_TOKEN (void*)&TQ_CONST_DELETE
typedef struct TqSerializedHead {
int16_t ver;
int16_t action;
int32_t checksum;
int64_t ssize;
char content[];
} TqSerializedHead;
typedef struct TqMetaHandle {
int64_t key;
int64_t offset;
int64_t serializedSize;
void* valueInUse;
void* valueInTxn;
} TqMetaHandle;
typedef struct TqMetaList {
TqMetaHandle handle;
struct TqMetaList* next;
//struct TqMetaList* inTxnPrev;
//struct TqMetaList* inTxnNext;
struct TqMetaList* unpersistPrev;
struct TqMetaList* unpersistNext;
} TqMetaList;
typedef struct TqMetaStore {
TqMetaList* bucket[TQ_BUCKET_SIZE];
//a table head
TqMetaList* unpersistHead;
int fileFd; //TODO:temporaral use, to be replaced by unified tfile
int idxFd; //TODO:temporaral use, to be replaced by unified tfile
char* dirPath;
int32_t tqConfigFlag;
int (*serializer)(const void* pObj, TqSerializedHead** ppHead);
const void* (*deserializer)(const TqSerializedHead* pHead, void** ppObj);
void (*deleter)(void*);
} TqMetaStore;
TqMetaStore* tqStoreOpen(const char* path, TqMetaStore* tqStoreOpen(const char* path,
int serializer(const void* pObj, TqSerializedHead** ppHead), TqSerializeFun pSerializer,
const void* deserializer(const TqSerializedHead* pHead, void** ppObj), TqDeserializeFun pDeserializer,
void deleter(void* pObj), TqDeleteFun pDeleter,
int32_t tqConfigFlag int32_t tqConfigFlag
); );
int32_t tqStoreClose(TqMetaStore*); int32_t tqStoreClose(TqMetaStore*);
//int32_t tqStoreDelete(TqMetaStore*); //int32_t tqStoreDelete(TqMetaStore*);
//int32_t TqStoreCommitAll(TqMetaStore*); //int32_t tqStoreCommitAll(TqMetaStore*);
int32_t tqStorePersist(TqMetaStore*); int32_t tqStorePersist(TqMetaStore*);
//clean deleted idx and data from persistent file //clean deleted idx and data from persistent file
int32_t tqStoreCompact(TqMetaStore*); int32_t tqStoreCompact(TqMetaStore*);
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
*/ */
#include "tqInt.h" #include "tqInt.h"
#include "tqMetaStore.h"
//static //static
//read next version data //read next version data
...@@ -24,6 +25,46 @@ ...@@ -24,6 +25,46 @@
// //
int tqGetgHandleSSize(const TqGroupHandle *gHandle); int tqGetgHandleSSize(const TqGroupHandle *gHandle);
int tqBufHandleSSize();
int tqBufItemSSize();
TqGroupHandle* tqFindHandle(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
TqGroupHandle* gHandle;
return NULL;
}
void* tqSerializeListHandle(TqListHandle* listHandle, void* ptr);
void* tqSerializeBufHandle(TqBufferHandle* bufHandle, void* ptr);
void* tqSerializeBufItem(TqBufferItem* bufItem, void* ptr);
const void* tqDeserializeBufHandle(const void* pBytes, TqBufferHandle* bufHandle);
const void* tqDeserializeBufItem(const void* pBytes, TqBufferItem* bufItem);
STQ* tqOpen(const char* path, TqConfig* tqConfig, TqLogReader* tqLogReader, SMemAllocatorFactory *allocFac) {
STQ* pTq = malloc(sizeof(STQ));
if(pTq == NULL) {
//TODO: memory error
return NULL;
}
strcpy(pTq->path, path);
pTq->tqConfig = tqConfig;
pTq->tqLogReader = tqLogReader;
pTq->tqMemRef.pAlloctorFactory = allocFac;
pTq->tqMemRef.pAllocator = allocFac->create();
if(pTq->tqMemRef.pAllocator == NULL) {
//TODO
}
pTq->tqMeta = tqStoreOpen(path,
(TqSerializeFun)tqSerializeGroupHandle,
(TqDeserializeFun)tqDeserializeGroupHandle,
free,
0);
if(pTq->tqMeta == NULL) {
//TODO: free STQ
return NULL;
}
return pTq;
}
static int tqProtoCheck(TmqMsgHead *pMsg) { static int tqProtoCheck(TmqMsgHead *pMsg) {
return pMsg->protoVer == 0; return pMsg->protoVer == 0;
...@@ -83,14 +124,29 @@ static int tqCommitTCGroup(TqGroupHandle* handle) { ...@@ -83,14 +124,29 @@ static int tqCommitTCGroup(TqGroupHandle* handle) {
int tqCreateTCGroup(STQ *pTq, int64_t topicId, int64_t cgId, int64_t cId, TqGroupHandle** handle) { int tqCreateTCGroup(STQ *pTq, int64_t topicId, int64_t cgId, int64_t cId, TqGroupHandle** handle) {
//create in disk //create in disk
TqGroupHandle* gHandle = (TqGroupHandle*)malloc(sizeof(TqGroupHandle));
if(gHandle == NULL) {
//TODO
return -1;
}
memset(gHandle, 0, sizeof(TqGroupHandle));
return 0; return 0;
} }
int tqOpenTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { TqGroupHandle* tqOpenTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
//look up in disk TqGroupHandle* gHandle = tqHandleGet(pTq->tqMeta, cId);
if(gHandle == NULL) {
int code = tqCreateTCGroup(pTq, topicId, cgId, cId, &gHandle);
if(code != 0) {
//TODO
return NULL;
}
}
//create //create
//open //open
return 0; return gHandle;
} }
int tqCloseTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { int tqCloseTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
...@@ -207,16 +263,20 @@ int tqConsume(STQ* pTq, TmqConsumeReq* pMsg) { ...@@ -207,16 +263,20 @@ int tqConsume(STQ* pTq, TmqConsumeReq* pMsg) {
return 0; return 0;
} }
int tqSerializeGroupHandle(TqGroupHandle *gHandle, void** ppBytes) { int tqSerializeGroupHandle(const TqGroupHandle *gHandle, TqSerializedHead** ppHead) {
//calculate size //calculate size
int sz = tqGetgHandleSSize(gHandle); int sz = tqGetgHandleSSize(gHandle) + sizeof(TqSerializedHead);
void* ptr = realloc(*ppBytes, sz); if(sz > (*ppHead)->ssize) {
if(ptr == NULL) { void* tmpPtr = realloc(*ppHead, sz);
free(ppBytes); if(tmpPtr == NULL) {
free(*ppHead);
//TODO: memory err //TODO: memory err
return -1; return -1;
} }
*ppBytes = ptr; *ppHead = tmpPtr;
(*ppHead)->ssize = sz;
}
void* ptr = (*ppHead)->content;
//do serialization //do serialization
*(int64_t*)ptr = gHandle->cId; *(int64_t*)ptr = gHandle->cId;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
...@@ -261,8 +321,9 @@ void* tqSerializeBufItem(TqBufferItem *bufItem, void* ptr) { ...@@ -261,8 +321,9 @@ void* tqSerializeBufItem(TqBufferItem *bufItem, void* ptr) {
return ptr; return ptr;
} }
const void* tqDeserializeGroupHandle(const void* pBytes, TqGroupHandle *gHandle) { const void* tqDeserializeGroupHandle(const TqSerializedHead* pHead, TqGroupHandle **ppGHandle) {
const void* ptr = pBytes; TqGroupHandle *gHandle = *ppGHandle;
const void* ptr = pHead->content;
gHandle->cId = *(int64_t*)ptr; gHandle->cId = *(int64_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
gHandle->cgId = *(int64_t*)ptr; gHandle->cgId = *(int64_t*)ptr;
...@@ -317,15 +378,15 @@ const void* tqDeserializeBufItem(const void* pBytes, TqBufferItem *bufItem) { ...@@ -317,15 +378,15 @@ const void* tqDeserializeBufItem(const void* pBytes, TqBufferItem *bufItem) {
//TODO: make this a macro //TODO: make this a macro
int tqGetgHandleSSize(const TqGroupHandle *gHandle) { int tqGetgHandleSSize(const TqGroupHandle *gHandle) {
return sizeof(int64_t) * 2 return sizeof(int64_t) * 2 //cId + cgId
+ sizeof(int32_t) + sizeof(int32_t) //topicNum
+ gHandle->topicNum * tqBufHandleSSize(); + gHandle->topicNum * tqBufHandleSSize();
} }
//TODO: make this a macro //TODO: make this a macro
int tqBufHandleSSize() { int tqBufHandleSSize() {
return sizeof(int64_t) * 2 return sizeof(int64_t) * 2 // nextConsumeOffset + topicId
+ sizeof(int32_t) * 2 + sizeof(int32_t) * 2 // head + tail
+ TQ_BUFFER_SIZE * tqBufItemSSize(); + TQ_BUFFER_SIZE * tqBufItemSSize();
} }
......
...@@ -69,9 +69,9 @@ static inline int tqReadLastPage(int fd, TqIdxPageBuf* pBuf) { ...@@ -69,9 +69,9 @@ static inline int tqReadLastPage(int fd, TqIdxPageBuf* pBuf) {
} }
TqMetaStore* tqStoreOpen(const char* path, TqMetaStore* tqStoreOpen(const char* path,
int serializer(const void* pObj, TqSerializedHead** ppHead), TqSerializeFun serializer,
const void* deserializer(const TqSerializedHead* pHead, void** ppObj), TqDeserializeFun deserializer,
void deleter(void* pObj), TqDeleteFun deleter,
int32_t tqConfigFlag int32_t tqConfigFlag
) { ) {
TqMetaStore* pMeta = malloc(sizeof(TqMetaStore)); TqMetaStore* pMeta = malloc(sizeof(TqMetaStore));
...@@ -127,9 +127,9 @@ TqMetaStore* tqStoreOpen(const char* path, ...@@ -127,9 +127,9 @@ TqMetaStore* tqStoreOpen(const char* path,
pMeta->fileFd = fileFd; pMeta->fileFd = fileFd;
pMeta->serializer = serializer; pMeta->pSerializer = serializer;
pMeta->deserializer = deserializer; pMeta->pDeserializer = deserializer;
pMeta->deleter = deleter; pMeta->pDeleter = deleter;
pMeta->tqConfigFlag = tqConfigFlag; pMeta->tqConfigFlag = tqConfigFlag;
//read idx file and load into memory //read idx file and load into memory
...@@ -171,25 +171,25 @@ TqMetaStore* tqStoreOpen(const char* path, ...@@ -171,25 +171,25 @@ TqMetaStore* tqStoreOpen(const char* path,
} }
if(serializedObj->action == TQ_ACTION_INUSE) { if(serializedObj->action == TQ_ACTION_INUSE) {
if(serializedObj->ssize != sizeof(TqSerializedHead)) { if(serializedObj->ssize != sizeof(TqSerializedHead)) {
pMeta->deserializer(serializedObj, &pNode->handle.valueInUse); pMeta->pDeserializer(serializedObj, &pNode->handle.valueInUse);
} else { } else {
pNode->handle.valueInUse = TQ_DELETE_TOKEN; pNode->handle.valueInUse = TQ_DELETE_TOKEN;
} }
} else if(serializedObj->action == TQ_ACTION_INTXN) { } else if(serializedObj->action == TQ_ACTION_INTXN) {
if(serializedObj->ssize != sizeof(TqSerializedHead)) { if(serializedObj->ssize != sizeof(TqSerializedHead)) {
pMeta->deserializer(serializedObj, &pNode->handle.valueInTxn); pMeta->pDeserializer(serializedObj, &pNode->handle.valueInTxn);
} else { } else {
pNode->handle.valueInTxn = TQ_DELETE_TOKEN; pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
} }
} else if(serializedObj->action == TQ_ACTION_INUSE_CONT) { } else if(serializedObj->action == TQ_ACTION_INUSE_CONT) {
if(serializedObj->ssize != sizeof(TqSerializedHead)) { if(serializedObj->ssize != sizeof(TqSerializedHead)) {
pMeta->deserializer(serializedObj, &pNode->handle.valueInUse); pMeta->pDeserializer(serializedObj, &pNode->handle.valueInUse);
} else { } else {
pNode->handle.valueInUse = TQ_DELETE_TOKEN; pNode->handle.valueInUse = TQ_DELETE_TOKEN;
} }
TqSerializedHead* ptr = POINTER_SHIFT(serializedObj, serializedObj->ssize); TqSerializedHead* ptr = POINTER_SHIFT(serializedObj, serializedObj->ssize);
if(ptr->ssize != sizeof(TqSerializedHead)) { if(ptr->ssize != sizeof(TqSerializedHead)) {
pMeta->deserializer(ptr, &pNode->handle.valueInTxn); pMeta->pDeserializer(ptr, &pNode->handle.valueInTxn);
} else { } else {
pNode->handle.valueInTxn = TQ_DELETE_TOKEN; pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
} }
...@@ -225,11 +225,11 @@ TqMetaStore* tqStoreOpen(const char* path, ...@@ -225,11 +225,11 @@ TqMetaStore* tqStoreOpen(const char* path,
if(pBucketNode) { if(pBucketNode) {
if(pBucketNode->handle.valueInUse if(pBucketNode->handle.valueInUse
&& pBucketNode->handle.valueInUse != TQ_DELETE_TOKEN) { && pBucketNode->handle.valueInUse != TQ_DELETE_TOKEN) {
pMeta->deleter(pBucketNode->handle.valueInUse); pMeta->pDeleter(pBucketNode->handle.valueInUse);
} }
if(pBucketNode->handle.valueInTxn if(pBucketNode->handle.valueInTxn
&& pBucketNode->handle.valueInTxn != TQ_DELETE_TOKEN) { && pBucketNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
pMeta->deleter(pBucketNode->handle.valueInTxn); pMeta->pDeleter(pBucketNode->handle.valueInTxn);
} }
free(pBucketNode); free(pBucketNode);
} }
...@@ -253,11 +253,11 @@ int32_t tqStoreClose(TqMetaStore* pMeta) { ...@@ -253,11 +253,11 @@ int32_t tqStoreClose(TqMetaStore* pMeta) {
ASSERT(pNode->unpersistPrev == NULL); ASSERT(pNode->unpersistPrev == NULL);
if(pNode->handle.valueInTxn if(pNode->handle.valueInTxn
&& pNode->handle.valueInTxn != TQ_DELETE_TOKEN) { && pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
pMeta->deleter(pNode->handle.valueInTxn); pMeta->pDeleter(pNode->handle.valueInTxn);
} }
if(pNode->handle.valueInUse if(pNode->handle.valueInUse
&& pNode->handle.valueInUse != TQ_DELETE_TOKEN) { && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
pMeta->deleter(pNode->handle.valueInUse); pMeta->pDeleter(pNode->handle.valueInUse);
} }
TqMetaList* next = pNode->next; TqMetaList* next = pNode->next;
free(pNode); free(pNode);
...@@ -280,11 +280,11 @@ int32_t tqStoreDelete(TqMetaStore* pMeta) { ...@@ -280,11 +280,11 @@ int32_t tqStoreDelete(TqMetaStore* pMeta) {
while(pNode) { while(pNode) {
if(pNode->handle.valueInTxn if(pNode->handle.valueInTxn
&& pNode->handle.valueInTxn != TQ_DELETE_TOKEN) { && pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
pMeta->deleter(pNode->handle.valueInTxn); pMeta->pDeleter(pNode->handle.valueInTxn);
} }
if(pNode->handle.valueInUse if(pNode->handle.valueInUse
&& pNode->handle.valueInUse != TQ_DELETE_TOKEN) { && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
pMeta->deleter(pNode->handle.valueInUse); pMeta->pDeleter(pNode->handle.valueInUse);
} }
TqMetaList* next = pNode->next; TqMetaList* next = pNode->next;
free(pNode); free(pNode);
...@@ -338,7 +338,7 @@ int32_t tqStorePersist(TqMetaStore* pMeta) { ...@@ -338,7 +338,7 @@ int32_t tqStorePersist(TqMetaStore* pMeta) {
if(pNode->handle.valueInUse == TQ_DELETE_TOKEN) { if(pNode->handle.valueInUse == TQ_DELETE_TOKEN) {
pSHead->ssize = sizeof(TqSerializedHead); pSHead->ssize = sizeof(TqSerializedHead);
} else { } else {
pMeta->serializer(pNode->handle.valueInUse, &pSHead); pMeta->pSerializer(pNode->handle.valueInUse, &pSHead);
} }
nBytes = write(pMeta->fileFd, pSHead, pSHead->ssize); nBytes = write(pMeta->fileFd, pSHead, pSHead->ssize);
ASSERT(nBytes == pSHead->ssize); ASSERT(nBytes == pSHead->ssize);
...@@ -349,7 +349,7 @@ int32_t tqStorePersist(TqMetaStore* pMeta) { ...@@ -349,7 +349,7 @@ int32_t tqStorePersist(TqMetaStore* pMeta) {
if(pNode->handle.valueInTxn == TQ_DELETE_TOKEN) { if(pNode->handle.valueInTxn == TQ_DELETE_TOKEN) {
pSHead->ssize = sizeof(TqSerializedHead); pSHead->ssize = sizeof(TqSerializedHead);
} else { } else {
pMeta->serializer(pNode->handle.valueInTxn, &pSHead); pMeta->pSerializer(pNode->handle.valueInTxn, &pSHead);
} }
int nBytesTxn = write(pMeta->fileFd, pSHead, pSHead->ssize); int nBytesTxn = write(pMeta->fileFd, pSHead, pSHead->ssize);
ASSERT(nBytesTxn == pSHead->ssize); ASSERT(nBytesTxn == pSHead->ssize);
...@@ -423,7 +423,7 @@ static int32_t tqHandlePutCommitted(TqMetaStore* pMeta, int64_t key, void* value ...@@ -423,7 +423,7 @@ static int32_t tqHandlePutCommitted(TqMetaStore* pMeta, int64_t key, void* value
//TODO: think about thread safety //TODO: think about thread safety
if(pNode->handle.valueInUse if(pNode->handle.valueInUse
&& pNode->handle.valueInUse != TQ_DELETE_TOKEN) { && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
pMeta->deleter(pNode->handle.valueInUse); pMeta->pDeleter(pNode->handle.valueInUse);
} }
//change pointer ownership //change pointer ownership
pNode->handle.valueInUse = value; pNode->handle.valueInUse = value;
...@@ -496,7 +496,7 @@ static inline int32_t tqHandlePutImpl(TqMetaStore* pMeta, int64_t key, void* val ...@@ -496,7 +496,7 @@ static inline int32_t tqHandlePutImpl(TqMetaStore* pMeta, int64_t key, void* val
return -2; return -2;
} }
if(pNode->handle.valueInTxn != TQ_DELETE_TOKEN) { if(pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
pMeta->deleter(pNode->handle.valueInTxn); pMeta->pDeleter(pNode->handle.valueInTxn);
} }
} }
pNode->handle.valueInTxn = value; pNode->handle.valueInTxn = value;
...@@ -562,7 +562,7 @@ int32_t tqHandleCommit(TqMetaStore* pMeta, int64_t key) { ...@@ -562,7 +562,7 @@ int32_t tqHandleCommit(TqMetaStore* pMeta, int64_t key) {
} }
if(pNode->handle.valueInUse if(pNode->handle.valueInUse
&& pNode->handle.valueInUse != TQ_DELETE_TOKEN) { && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
pMeta->deleter(pNode->handle.valueInUse); pMeta->pDeleter(pNode->handle.valueInUse);
} }
pNode->handle.valueInUse = pNode->handle.valueInTxn; pNode->handle.valueInUse = pNode->handle.valueInTxn;
pNode->handle.valueInTxn = NULL; pNode->handle.valueInTxn = NULL;
...@@ -582,7 +582,7 @@ int32_t tqHandleAbort(TqMetaStore* pMeta, int64_t key) { ...@@ -582,7 +582,7 @@ int32_t tqHandleAbort(TqMetaStore* pMeta, int64_t key) {
if(pNode->handle.key == key) { if(pNode->handle.key == key) {
if(pNode->handle.valueInTxn) { if(pNode->handle.valueInTxn) {
if(pNode->handle.valueInTxn != TQ_DELETE_TOKEN) { if(pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
pMeta->deleter(pNode->handle.valueInTxn); pMeta->pDeleter(pNode->handle.valueInTxn);
} }
pNode->handle.valueInTxn = NULL; pNode->handle.valueInTxn = NULL;
tqLinkUnpersist(pMeta, pNode); tqLinkUnpersist(pMeta, pNode);
...@@ -602,7 +602,7 @@ int32_t tqHandleDel(TqMetaStore* pMeta, int64_t key) { ...@@ -602,7 +602,7 @@ int32_t tqHandleDel(TqMetaStore* pMeta, int64_t key) {
while(pNode) { while(pNode) {
if(pNode->handle.valueInTxn != TQ_DELETE_TOKEN) { if(pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
if(pNode->handle.valueInTxn) { if(pNode->handle.valueInTxn) {
pMeta->deleter(pNode->handle.valueInTxn); pMeta->pDeleter(pNode->handle.valueInTxn);
} }
pNode->handle.valueInTxn = TQ_DELETE_TOKEN; pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
tqLinkUnpersist(pMeta, pNode); tqLinkUnpersist(pMeta, pNode);
......
#include <gtest/gtest.h>
#include <cstring>
#include <iostream>
#include <queue>
#include "tq.h"
using namespace std;
TEST(TqSerializerTest, basicTest) {
TqGroupHandle* gHandle = (TqGroupHandle*)malloc(sizeof(TqGroupHandle));
}
...@@ -19,11 +19,19 @@ int32_t walInit() { return 0; } ...@@ -19,11 +19,19 @@ int32_t walInit() { return 0; }
void walCleanUp() {} void walCleanUp() {}
SWal *walOpen(char *path, SWalCfg *pCfg) { return NULL; } SWal *walOpen(char *path, SWalCfg *pCfg) {
SWal* pWal = malloc(sizeof(SWal));
if(pWal == NULL) {
return NULL;
}
return pWal;
}
int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { return 0; } int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { return 0; }
void walClose(SWal *pWal) {} void walClose(SWal *pWal) {
if(pWal) free(pWal);
}
void walFsync(SWal *pWal, bool force) {} void walFsync(SWal *pWal, bool force) {}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册