diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 8f6d09f8d6251edd9a5398ab1752f366a88e2849..e59d60f7dc2f7e816cd302cfd24be164098d781f 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -15,13 +15,16 @@ #ifndef _TD_WAL_H_ #define _TD_WAL_H_ +#include "os.h" + #ifdef __cplusplus extern "C" { #endif typedef enum { TAOS_WAL_NOLOG = 0, - TAOS_WAL_WRITE = 1 + TAOS_WAL_WRITE = 1, + TAOS_WAL_FSYNC = 2 } EWalType; typedef struct { @@ -55,8 +58,8 @@ void walStop(twalh); void walClose(twalh); //write -int32_t walWrite(twalh, int8_t msgType, void* body, uint32_t bodyLen); -void walWaitFsync(twalh, bool forceHint); +int64_t walWrite(twalh, int8_t msgType, void* body, uint32_t bodyLen); +void walFsync(twalh, bool forceHint); //int32_t walCommit(twalh, uint64_t ver); //int32_t walRollback(twalh, uint64_t ver); @@ -67,7 +70,7 @@ int32_t walReadWithFp(twalh, FWalWrite writeFp, int64_t verStart, int readNum); //life cycle int32_t walDataPersisted(twalh, int64_t ver); int32_t walFirstVer(twalh); -int32_t walLastVer(twal); +int32_t walLastVer(twalh); //int32_t walDataCorrupted(twalh); #ifdef __cplusplus diff --git a/include/server/vnode/tq/tq.h b/include/server/vnode/tq/tq.h index 3c163f50451edb7de026789a8c92300866398cc0..91688e890df0c699de8f9e0ab4004d5bbd3da182 100644 --- a/include/server/vnode/tq/tq.h +++ b/include/server/vnode/tq/tq.h @@ -24,22 +24,24 @@ extern "C" { typedef struct STQ STQ; -STQ* tqInit(); -void tqCleanUp(STQ* pTQ); +STQ* tqInit(void* ref_func(void*), void* unref_func(void*)); +void tqCleanUp(STQ* pTq); //create persistent storage for meta info such as consuming offset //return value > 0: cgId -//return value < 0: error code -int tqCreateGroup(STQ *pTQ); +//return value <= 0: error code +int tqCreateGroup(STQ*); //create ring buffer in memory and load consuming offset -int tqOpenGroup(STQ* pTQ, int cgId); +int tqOpenGroup(STQ*, int cgId); //destroy ring buffer and persist consuming offset -int tqCloseGroup(STQ *pTQ, int cgId); +int tqCloseGroup(STQ*, int cgId); //delete persistent storage for meta info -int tqDropGroup(STQ *pTQ); +int tqDropGroup(STQ*, int cgId); -int tqPushMsg(STQ *pTQ, void *, int64_t version); -int tqCommit(STQ *pTQ); +int tqPushMsg(STQ*, void *, int64_t version); +int tqCommit(STQ*); + +int tqHandleMsg(STQ*, void *msg); #ifdef __cplusplus } diff --git a/source/libs/wal/CMakeLists.txt b/source/libs/wal/CMakeLists.txt index 1e195862763a63f5bc9c92aa5044c11334090aab..fbcdff59eeb243cfe0c4569eae63ca8b0d8043b4 100644 --- a/source/libs/wal/CMakeLists.txt +++ b/source/libs/wal/CMakeLists.txt @@ -4,4 +4,9 @@ target_include_directories( wal PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/wal" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" -) \ No newline at end of file + PRIVATE "${CMAKE_SOURCE_DIR}/include/os" +) + +target_link_libraries( + os +) diff --git a/source/server/vnode/tq/CMakeLists.txt b/source/server/vnode/tq/CMakeLists.txt index 7e80da75e9e5c0824f587e04fabe5899aae9bbe5..9577007400b58e712f5996aa1378fc9a2c4e7a08 100644 --- a/source/server/vnode/tq/CMakeLists.txt +++ b/source/server/vnode/tq/CMakeLists.txt @@ -3,8 +3,8 @@ add_library(tq ${TQ_SRC}) target_include_directories( tq PUBLIC "${CMAKE_SOURCE_DIR}/include/server/vnode/tq" - PRIVATE "${CMAKE_SOURCE_DIR}/include/os" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" + PRIVATE "${CMAKE_SOURCE_DIR}/include/os" ) target_link_libraries( diff --git a/source/server/vnode/tq/inc/tqInt.h b/source/server/vnode/tq/inc/tqInt.h index 435a1150b41c036009aae6b0088e14975fa54e5f..416a915456e78b1746e4a3cc5a4cde5336b42f97 100644 --- a/source/server/vnode/tq/inc/tqInt.h +++ b/source/server/vnode/tq/inc/tqInt.h @@ -16,6 +16,8 @@ #ifndef _TD_TQ_INT_H_ #define _TD_TQ_INT_H_ +#include "tq.h" + #ifdef __cplusplus extern "C" { #endif diff --git a/source/server/vnode/tq/src/tq.c b/source/server/vnode/tq/src/tq.c index f88c203fc984e3d36fc7300dcecb3e0370a91b61..3255f3fb3a1c4723186daab7b3b6fdb5faedb683 100644 --- a/source/server/vnode/tq/src/tq.c +++ b/source/server/vnode/tq/src/tq.c @@ -13,14 +13,21 @@ * along with this program. If not, see . */ -#include "tq.h" +#include "tqInt.h" -int tqPushMsg(STQ *pTQ, void * p, int64_t version) { +//static +//read next version data +// +//send to fetch queue +// +//handle management message + +int tqPushMsg(STQ* pTq , void* p, int64_t version) { //add reference // return 0; } -int tqCommit(STQ *pTQ) { +int tqCommit(STQ* pTq) { return 0; }