diff --git a/cmake/lucene_CMakeLists.txt.in b/cmake/lucene_CMakeLists.txt.in index 91e144dcedbfc999cd70385c70e775e92ecf9b1a..fc7bec2dd23d9c3be782e5e51bdcb73a45d3b3c8 100644 --- a/cmake/lucene_CMakeLists.txt.in +++ b/cmake/lucene_CMakeLists.txt.in @@ -2,7 +2,7 @@ # lucene ExternalProject_Add(lucene GIT_REPOSITORY https://github.com/taosdata-contrib/LucenePlusPlus.git - GIT_TAG rel_3.0.8 + GIT_TAG rel_3.0.8_td SOURCE_DIR "${CMAKE_SOURCE_DIR}/deps/lucene" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/deps/test/CMakeLists.txt b/deps/test/CMakeLists.txt index 4547431ca7806da0b596926165dc4f5d0d973531..e571146b86ac49459d45be8856463bdbb2f866a3 100644 --- a/deps/test/CMakeLists.txt +++ b/deps/test/CMakeLists.txt @@ -2,3 +2,7 @@ if(${BUILD_WITH_ROCKSDB}) add_subdirectory(rocksdb) endif(${BUILD_WITH_ROCKSDB}) + +if(${BUILD_WITH_LUCENE}) + add_subdirectory(lucene) +endif(${BUILD_WITH_LUCENE}) diff --git a/deps/test/lucene/CMakeLists.txt b/deps/test/lucene/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..853921094851d18a74e0489bac2ebd6a6bab1abc --- /dev/null +++ b/deps/test/lucene/CMakeLists.txt @@ -0,0 +1,6 @@ +add_executable(luceneTest "") +target_sources(luceneTest + PRIVATE + "${CMAKE_CURRENT_SOURCE_DIR}/main.cpp" +) +target_link_libraries(luceneTest lucene++) \ No newline at end of file diff --git a/deps/test/lucene/main.cpp b/deps/test/lucene/main.cpp new file mode 100644 index 0000000000000000000000000000000000000000..9ea2fbbec43101d4f5a4fffc495bdfb39d975c4e --- /dev/null +++ b/deps/test/lucene/main.cpp @@ -0,0 +1,6 @@ +#include + +int main(int argc, char const *argv[]) { + std::cout << "Hello, this is lucene test" << std::endl; + return 0; +} diff --git a/docs/scratch.md b/docs/scratch.md new file mode 100644 index 0000000000000000000000000000000000000000..fc0239d72716b8dfabef7a9c280b7c4d6eb1fb8a --- /dev/null +++ b/docs/scratch.md @@ -0,0 +1,236 @@ +```plantuml + @startuml create_table + skinparam sequenceMessageAlign center + skinparam responseMessageBelowArrow true + + participant APP as app + box "dnode1" + participant RPC as rpc + participant VNODE as vnode + participant SYNC as sync + end box + + box "dnode2" + participant SYNC as sync2 + participant VNODE as vnode2 + end box + + box "dnode3" + participant SYNC as sync3 + participant VNODE as vnode3 + end box + + ' APP send request to dnode and RPC in dnode recv the request + app ->rpc: create table req + + ' RPC call vnodeProcessReq() function to process the request + rpc -> vnode: vnodeProcessReq + note right + callback function + run in RPC module + threads. The function + only puts the request + to a vnode queue. + end note + + ' VNODE call vnodeProcessReqs() function to integrate requests and process as a whole + vnode -> vnode: vnodeProcessReqs() + note right + integrate reqs and + process as a whole + end note + + + ' sync the request to other nodes + vnode -> sync: syncProcessReqs() + + ' make request persistent + ' sync -->vnode: walWrite()\n(callback function) + + ' replicate requests to other DNODES + sync -> sync2: replication req + sync -> sync3: replication req + sync2 -> vnode2: walWrite()\n(callback function) + sync2 --> sync: replication rsp\n(confirm) + sync3 -> vnode3: walWrite()\n(callback function) + + sync3 --> sync: replication rsp\n(confirm) + + ' send apply request + sync -> sync2: apply req + sync -> sync3: apply req + + ' vnode apply + sync2 -> vnode2: vnodeApplyReqs() + sync3 -> vnode3: vnodeApplyReqs() + + ' call apply request + sync --> vnode: vnodeApplyReqs()\n(callback function) + + ' send response + vnode --> rpc: rpcSendRsp() + + ' dnode send response to APP + rpc --> app: create table rsp + @enduml +``` + +## Leader处理强一致写入请求 +```plantuml + @startuml leader_process_stict_consistency + box "dnode1" + participant CRPC as crpc + participant VNODE as vnode + participant SYNC as sync + end box + + -> crpc: create table/submit req + + ' In CRPC threads + group #pink "In CRPC threads" + crpc -> vnode:vnodeProcessReq() + note right + A callback function + run by CRPC thread + to put the request + to a vnode queue + end note + end + + ' In VNODE worker threads + group #lightblue "In VNODE worker threads" + vnode -> vnode: vnodeProcessReqs() + note right + VNODE process requests + accumulated in a + vnode write queue and + process the batch reqs + as a whole + end note + + vnode -> sync: syncProcessReqs() + + sync -> : replication req1 + sync -> : replication req2 + end + + group #red "SYNC threads" + sync <- : replication rsp1 + sync <- : replication rsp2 + sync -> vnode: notify apply + sync -> : apply rsp1 + sync -> : apply rsp2 + end + + group #lightblue "In VNODE worker threads" + vnode -> vnode: vnodeApplyReqs() + vnode -> crpc: + end + + <- crpc: create table/submit rsp + + @enduml +``` + +## Follower处理强一致写入请求 +```plantuml + @startuml follower_process_strict_consistency + participant SYNC as sync + participant VNODE as vnode + + group #pink "SYNC threads" + -> sync: replication req + + sync -> sync: syncProcessReqs() + note right + In the replication + only data is + persisted and response + is sent back + end note + + <- sync: replication rsp + + -> sync: apply req + + sync -> vnode: notify apply + end + + group #lightblue "VNODE worker threads" + vnode -> vnode: vnodeApplyReqs() + end + + @enduml +``` + +## Leader处理最终一致写入请求 +```plantuml + @startuml leader_process_eventual_consistency + box "dnode1" + participant CRPC as crpc + participant VNODE as vnode + participant SYNC as sync + end box + + -> crpc: create table/submit req + + ' In CRPC threads + group #pink "In CRPC threads" + crpc -> vnode:vnodeProcessReq() + note right + A callback function + run by CRPC thread + to put the request + to a vnode queue + end note + end + + ' In VNODE worker threads + group #lightblue "In VNODE worker threads" + vnode -> vnode: vnodeProcessReqs() + note right + VNODE process requests + accumulated in a + vnode write queue and + process the batch reqs + as a whole + end note + + vnode -> sync: syncProcessReqs() + + sync -> : replication req1 + sync -> : replication req2 + + sync -> vnode: notify apply + end + + + group #lightblue "In VNODE worker threads" + vnode -> vnode: vnodeApplyReqs() + vnode -> crpc: + end + + <- crpc: create table/submit rsp + + @enduml +``` + +## Follower处理最终一致写入请求 +```plantuml + @startuml follower_process_eventual_consistency + participant SYNC as sync + participant VNODE as vnode + + group #pink "SYNC threads" + -> sync: replication rsp + + sync -> sync: syncProcessReqs() + + sync -> vnode: notify VNODE \nthread to process\n the reqs + end + + group #lightblue "VNODE worker threads" + vnode -> vnode: vnodeApplyReqs() + end + @enduml +``` \ No newline at end of file diff --git a/docs/vnode_write.md b/docs/vnode_write.md new file mode 100644 index 0000000000000000000000000000000000000000..7b8dbd6535587d9fdf204d92fa63c6298038df8a --- /dev/null +++ b/docs/vnode_write.md @@ -0,0 +1,311 @@ +

VNODE Write Processes

+ +## META Operations +META data write operations including: + +1. create table +2. drop table +3. alter table + +We take create table as an example to figure out the whole process. +```plantuml +@startuml create_table +skinparam sequenceMessageAlign center +skinparam responseMessageBelowArrow true + +participant APP as app +box "dnode1" + participant RPC as rpc + participant VNODE as vnode + participant SYNC as sync +end box + +box "dnode2" + participant SYNC as sync2 + participant VNODE as vnode2 +end box + +box "dnode3" + participant SYNC as sync3 + participant VNODE as vnode3 +end box + +' APP send request to dnode and RPC in dnode recv the request +app ->rpc: create table req + +' RPC call vnodeProcessReq() function to process the request +rpc -> vnode: vnodeProcessReq +note right +callback function +run in RPC module +threads. The function +only puts the request +to a vnode queue. +end note + +' VNODE call vnodeProcessReqs() function to integrate requests and process as a whole +vnode -> vnode: vnodeProcessReqs() +note right +integrate reqs and +process as a whole +end note + + +' sync the request to other nodes +vnode -> sync: syncProcessReqs() + +' make request persistent +' sync -->vnode: walWrite()\n(callback function) + +' replicate requests to other DNODES +sync -> sync2: replication req +sync -> sync3: replication req +sync2 -> vnode2: walWrite()\n(callback function) +sync2 --> sync: replication rsp\n(confirm) +sync3 -> vnode3: walWrite()\n(callback function) + +sync3 --> sync: replication rsp\n(confirm) + +' send apply request +sync -> sync2: apply req +sync -> sync3: apply req + +' vnode apply +sync2 -> vnode2: vnodeApplyReqs() +sync3 -> vnode3: vnodeApplyReqs() + +' call apply request +sync --> vnode: vnodeApplyReqs()\n(callback function) + +' send response +vnode --> rpc: rpcSendRsp() + +' dnode send response to APP +rpc --> app: create table rsp +@enduml +``` + +## Time-series data Operations +There are only one operations for time-series data: data insert. We will figure out the whole process. + +```plantuml +@startuml create_table +skinparam sequenceMessageAlign center +skinparam responseMessageBelowArrow true + +participant APP as app +box "dnode1" + participant RPC as rpc + participant VNODE as vnode + participant SYNC as sync +end box + +box "dnode2" + participant SYNC as sync2 + participant VNODE as vnode2 +end box + +box "dnode3" + participant SYNC as sync3 + participant VNODE as vnode3 +end box + +' APP send request to dnode and RPC in dnode recv the request +app ->rpc: insert data req + +' RPC call vnodeProcessReq() function to process the request +rpc -> vnode: vnodeProcessReq +note right +callback function +run in RPC module +threads. The function +only puts the request +to a vnode queue. +end note + +' VNODE call vnodeProcessReqs() function to integrate requests and process as a whole +vnode -> vnode: vnodeProcessReqs() +note right +integrate reqs and +process as a whole +end note + + +' sync the request to other nodes +vnode -> sync: syncProcessReqs() + +' ' make request persistent +' ' sync -->vnode: walWrite()\n(callback function) + +' ' replicate requests to other DNODES +sync -> sync2: replication req +sync -> sync3: replication req + +' vnode apply +sync2 -> vnode2: vnodeApplyReqs() +sync3 -> vnode3: vnodeApplyReqs() + +' call apply request +sync --> vnode: vnodeApplyReqs()\n(callback function) + +' send response +vnode --> rpc: rpcSendRsp() + +' dnode send response to APP +rpc --> app: insert data rsp +@enduml +``` + +## vnodeProcessReqs() +```plantuml +@startuml vnodeProcessReqs() +participant VNODE as v +participant SYNC as s + +group vnodeProcessReqs() + ' Group requests and get a request batch to process as a whole + v -> v: vnodeGetReqsFromQueue() + note right + integrate all write + requests as a batch + to process as a whole + end note + + ' VNODE call syncProcessReqs() function to process the batch request + v -> s: syncProcessReqs() + + group syncProcessReqs() + ' Check if current node is leader + alt not leader + return NOT_LEADER + end + + s -> s: syncAppendReqsToLogStore() + group syncAppendReqsToLogStore() + s -> v: walWrite() + note right + There must be a + callback function + provided by VNODE + to persist the + requests in WAL + end note + + alt (no unapplied reqs) AND (only one node OR no meta requests) + s -> v: vnodeApplyReqs() + note right + just use the woker + thread to apply + the requests. This + is a callback function + provided by VNODE + end note + else other cases need to wait response + s -> s: + note right + save the requests in log store + and wait for comfirmation or + other cases + end note + + s ->]: send replication requests + s ->]: send replication requests + end + end + end +end +@enduml +``` + + + + +## vnodeApplyReqs() +The function *vnodeApplyReqs()* is the actual function running by a vnode to process the requests. +```plantuml +@startuml vnodeApplyReqs() +skinparam sequenceMessageAlign left +skinparam responseMessageBelowArrow true + +participant VNODE as vnode +participant TQ as tq +participant TSDB as tsdb +participant META as meta + +group vnodeApplyReqs() + autonumber + loop nReqs + ' Copy request message to vnode buffer pool + vnode -> vnode: vnodeCopyReq() + note right + copy request to + vnode buffer pool + end note + + vnode -> tq: tqPush() + note right + push the request + to TQ so consumers + can consume + end note + alt META_REQ + autonumber 3 + vnode -> meta: metaApplyReq() + else TS_REQ + autonumber 3 + vnode -> tsdb: tsdbApplyReq() + end + + end + + ' Check if need to commit + alt vnode buffer pool is full + group vnodeCommit() + autonumber 4.1 + vnode -> tq: tqCommit() + note right + tqCommit may renew wal + end note + vnode -> meta: metaCommit(); + note right + commit meta data + end note + vnode -> tsdb: tsdbCommit(); + note right + commit time-series data + end note + end + end +end +@enduml +``` + diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 454a2ecfa83e95ecd1c0ef3179e31013f1a5aebc..a0602ec1b05ed1b4b991c8bf4fec7615f4aa97c5 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -62,24 +62,24 @@ typedef struct SSyncFSM { void* pData; // apply committed log, bufs will be free by raft module - int (*applyLog)(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf, void *pData); + int (*applyLog)(struct SSyncFSM* fsm, SyncIndex index, const SSyncBuffer* buf, void* pData); - // cluster commit callback - int (*onClusterChanged)(struct SSyncFSM *fsm, const SSyncCluster* cluster, void *pData); + // cluster commit callback + int (*onClusterChanged)(struct SSyncFSM* fsm, const SSyncCluster* cluster, void* pData); // fsm return snapshot in ppBuf, bufs will be free by raft module // TODO: getSnapshot SHOULD be async? - int (*getSnapshot)(struct SSyncFSM *fsm, SSyncBuffer **ppBuf, int* objId, bool *isLast); + int (*getSnapshot)(struct SSyncFSM* fsm, SSyncBuffer** ppBuf, int* objId, bool* isLast); // fsm apply snapshot with pBuf data - int (*applySnapshot)(struct SSyncFSM *fsm, SSyncBuffer *pBuf, int objId, bool isLast); + int (*applySnapshot)(struct SSyncFSM* fsm, SSyncBuffer* pBuf, int objId, bool isLast); // call when restore snapshot and log done - int (*onRestoreDone)(struct SSyncFSM *fsm); + int (*onRestoreDone)(struct SSyncFSM* fsm); - void (*onRollback)(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf); + void (*onRollback)(struct SSyncFSM* fsm, SyncIndex index, const SSyncBuffer* buf); - void (*onRoleChanged)(struct SSyncFSM *fsm, const SNodesRole* pRole); + void (*onRoleChanged)(struct SSyncFSM* fsm, const SNodesRole* pRole); } SSyncFSM;