From fed05bb64b3782e411e920cdefb5937e2779b1d7 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 16 Nov 2021 16:03:39 +0800 Subject: [PATCH] add memallocator dependency --- include/dnode/vnode/tq/tq.h | 124 +++++++++++++----------- include/libs/wal/wal.h | 6 +- source/dnode/vnode/tq/inc/tqMetaStore.h | 1 - source/libs/wal/src/wal.c | 14 ++- 4 files changed, 85 insertions(+), 60 deletions(-) diff --git a/include/dnode/vnode/tq/tq.h b/include/dnode/vnode/tq/tq.h index 3aeaf9acb6..7a21b08aaf 100644 --- a/include/dnode/vnode/tq/tq.h +++ b/include/dnode/vnode/tq/tq.h @@ -18,6 +18,7 @@ #include "os.h" #include "tutil.h" +#include "mallocator.h" #ifdef __cplusplus extern "C" { @@ -37,19 +38,19 @@ typedef struct TmqOneAck { typedef struct TmqAcks { int32_t ackNum; - //should be sorted + // should be sorted TmqOneAck acks[]; } TmqAcks; -//TODO: put msgs into common +// TODO: put msgs into common typedef struct TmqConnectReq { TmqMsgHead head; - TmqAcks acks; + TmqAcks acks; } TmqConnectReq; typedef struct TmqConnectRsp { TmqMsgHead head; - int8_t status; + int8_t status; } TmqConnectRsp; typedef struct TmqDisconnectReq { @@ -58,12 +59,12 @@ typedef struct TmqDisconnectReq { typedef struct TmqDisconnectRsp { TmqMsgHead head; - int8_t status; + int8_t status; } TmqDisconnectRsp; typedef struct TmqConsumeReq { TmqMsgHead head; - TmqAcks acks; + TmqAcks acks; } TmqConsumeReq; typedef struct TmqMsgContent { @@ -80,90 +81,105 @@ typedef struct TmqConsumeRsp { typedef struct TmqSubscribeReq { TmqMsgHead head; - int32_t topicNum; - int64_t topic[]; + int32_t topicNum; + int64_t topic[]; } TmqSubscribeReq; typedef struct tmqSubscribeRsp { TmqMsgHead head; - int64_t vgId; - char ep[TSDB_EP_LEN]; //TSDB_EP_LEN + int64_t vgId; + char ep[TSDB_EP_LEN]; // TSDB_EP_LEN } TmqSubscribeRsp; typedef struct TmqHeartbeatReq { - } TmqHeartbeatReq; typedef struct TmqHeartbeatRsp { - } TmqHeartbeatRsp; typedef struct TqTopicVhandle { int64_t topicId; - //executor for filter - void* filterExec; - //callback for mnode - //trigger when vnode list associated topic change + // executor for filter + void* filterExec; + // callback for mnode + // trigger when vnode list associated topic change void* (*mCallback)(void*, void*); } TqTopicVhandle; -typedef struct STQ { - //the collection of group handle - //the handle of kvstore -} STQ; #define TQ_BUFFER_SIZE 8 -//TODO: define a serializer and deserializer +// TODO: define a serializer and deserializer typedef struct TqBufferItem { int64_t offset; - //executors are identical but not concurrent - //so it must be a copy in each item - void* executor; + // executors are identical but not concurrent + // so it must be a copy in each item + void* executor; int64_t size; - void* content; + void* content; } TqBufferItem; typedef struct TqBufferHandle { - //char* topic; //c style, end with '\0' - //int64_t cgId; - //void* ahandle; - int64_t nextConsumeOffset; - int64_t topicId; - int32_t head; - int32_t tail; + // char* topic; //c style, end with '\0' + // int64_t cgId; + // void* ahandle; + int64_t nextConsumeOffset; + int64_t topicId; + int32_t head; + int32_t tail; TqBufferItem buffer[TQ_BUFFER_SIZE]; } TqBufferHandle; typedef struct TqListHandle { - TqBufferHandle bufHandle; + TqBufferHandle bufHandle; struct TqListHandle* next; } TqListHandle; typedef struct TqGroupHandle { - int64_t cId; - int64_t cgId; - void* ahandle; - int32_t topicNum; - TqListHandle *head; + int64_t cId; + int64_t cgId; + void* ahandle; + int32_t topicNum; + TqListHandle* head; } TqGroupHandle; typedef struct TqQueryExec { - void* src; + void* src; TqBufferItem* dest; - void* executor; + void* executor; } TqQueryExec; typedef struct TqQueryMsg { - TqQueryExec *exec; - struct TqQueryMsg *next; + TqQueryExec* exec; + struct TqQueryMsg* next; } TqQueryMsg; -//init in each vnode -STQ* tqInit(void* ref_func(void*), void* unref_func(void*)); -void tqCleanUp(STQ*); +typedef struct TqLogReader { + void* logHandle; + int32_t (*walRead)(void* logHandle, void** data, int64_t ver); + int64_t (*walGetFirstVer)(void* logHandle); + int64_t (*walGetSnapshotVer)(void* logHandle); + int64_t (*walGetLastVer)(void* logHandle); +} TqLogReader; + +typedef struct TqConfig { + // TODO +} TqConfig; + +typedef struct STQ { + // the collection of group handle + // the handle of kvstore + const char* path; + TqConfig* tqConfig; + TqLogReader* tqLogReader; + SMemAllocatorFactory* allocFac; +} STQ; + +// open in each vnode +STQ* tqOpen(const char* path, TqConfig* tqConfig, TqLogReader* tqLogReader, SMemAllocatorFactory *allocFac); +void tqDestroy(STQ*); -//void* will be replace by a msg type +// void* will be replace by a msg type int tqPushMsg(STQ*, void* msg, int64_t version); int tqCommit(STQ*); @@ -179,16 +195,16 @@ int tqRegisterContext(TqGroupHandle*, void* ahandle); int tqLaunchQuery(TqGroupHandle*); int tqSendLaunchQuery(TqGroupHandle*); -int tqSerializeGroupHandle(TqGroupHandle *gHandle, void** ppBytes); -void* tqSerializeListHandle(TqListHandle *listHandle, void* ptr); -void* tqSerializeBufHandle(TqBufferHandle *bufHandle, void* ptr); -void* tqSerializeBufItem(TqBufferItem *bufItem, void* ptr); +int tqSerializeGroupHandle(TqGroupHandle* gHandle, void** ppBytes); +void* tqSerializeListHandle(TqListHandle* listHandle, void* ptr); +void* tqSerializeBufHandle(TqBufferHandle* bufHandle, void* ptr); +void* tqSerializeBufItem(TqBufferItem* bufItem, void* ptr); -const void* tqDeserializeGroupHandle(const void* pBytes, TqGroupHandle *ghandle); -const void* tqDeserializeBufHandle(const void* pBytes, TqBufferHandle *bufHandle); -const void* tqDeserializeBufItem(const void* pBytes, TqBufferItem *bufItem); +const void* tqDeserializeGroupHandle(const void* pBytes, TqGroupHandle* ghandle); +const void* tqDeserializeBufHandle(const void* pBytes, TqBufferHandle* bufHandle); +const void* tqDeserializeBufItem(const void* pBytes, TqBufferItem* bufItem); -int tqGetGHandleSSize(const TqGroupHandle *gHandle); +int tqGetGHandleSSize(const TqGroupHandle* gHandle); int tqBufHandleSSize(); int tqBufItemSSize(); diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 143bdf0710..0829782310 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -75,9 +75,9 @@ int32_t walRead(SWal *, SWalHead **, int64_t ver); int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum); // lifecycle check -int32_t walFirstVer(SWal *); -int32_t walPersistedVer(SWal *); -int32_t walLastVer(SWal *); +int64_t walGetFirstVer(SWal *); +int64_t walGetSnapshotVer(SWal *); +int64_t walGetLastVer(SWal *); // int32_t walDataCorrupted(SWal*); #ifdef __cplusplus diff --git a/source/dnode/vnode/tq/inc/tqMetaStore.h b/source/dnode/vnode/tq/inc/tqMetaStore.h index 3e49f11254..63e48625d9 100644 --- a/source/dnode/vnode/tq/inc/tqMetaStore.h +++ b/source/dnode/vnode/tq/inc/tqMetaStore.h @@ -17,7 +17,6 @@ #define _TQ_META_STORE_H_ #include "os.h" -#include "tq.h" #ifdef __cplusplus diff --git a/source/libs/wal/src/wal.c b/source/libs/wal/src/wal.c index 9331cce20b..f25c127f3f 100644 --- a/source/libs/wal/src/wal.c +++ b/source/libs/wal/src/wal.c @@ -27,10 +27,20 @@ void walClose(SWal *pWal) {} void walFsync(SWal *pWal, bool force) {} -int64_t walWrite(SWal *pWal, int64_t index, void *body, int32_t bodyLen) {} +int64_t walWrite(SWal *pWal, int64_t index, void *body, int32_t bodyLen) { + return 0; +} int32_t walCommit(SWal *pWal, int64_t ver) { return 0; } int32_t walRollback(SWal *pWal, int64_t ver) { return 0; } -int32_t walPrune(SWal *pWal, int64_t ver) { return 0; } \ No newline at end of file +int32_t walPrune(SWal *pWal, int64_t ver) { return 0; } + + +int32_t walRead(SWal *, SWalHead **, int64_t ver); +int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum); + +int64_t walGetFirstVer(SWal *); +int64_t walGetSnapshotVer(SWal *); +int64_t walGetLastVer(SWal *); -- GitLab