diff --git a/CMakeLists.txt b/CMakeLists.txt index 95e1cad2fe93a9a0e78f4e3cb83dac361d043fd7..efb92b114f4e87229d3e9957efda02379f647c9c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -54,4 +54,4 @@ add_library(api INTERFACE ${API_SRC}) # src add_subdirectory(source) -# tests (TODO) \ No newline at end of file +# tests (TODO) diff --git a/include/client/consumer/consumer.h b/include/client/consumer/consumer.h index 742e2c12fe08175f8dc0d05cce9b83976e01673a..8d1c9835e643d6f2fdddf32e5aef23ef1f0fcb44 100644 --- a/include/client/consumer/consumer.h +++ b/include/client/consumer/consumer.h @@ -16,6 +16,10 @@ #ifndef _TD_CONSUMER_H_ #define _TD_CONSUMER_H_ +#include "tlist.h" +#include "tarray.h" +#include "hash.h" + #ifdef __cplusplus extern "C" { #endif @@ -32,16 +36,15 @@ extern "C" { struct tmq_resp_err_t; typedef struct tmq_resp_err_t tmq_resp_err_t; - //topic list - //resouces are supposed to be free by users by calling tmq_list_destroy - struct tmq_topic_list_t; - typedef struct tmq_topic_list_t tmq_topic_list_t; - int32_t tmq_topic_list_add(tmq_topic_list_t*, const char*); - void tmq_topic_list_destroy(tmq_topic_list_t*); + struct tmq_message_t; + typedef struct tmq_message_t tmq_message_t; + + struct tmq_col_batch_t; + typedef struct tmq_col_batch_t tmq_col_batch_t; //get content of message - tmq_col_batch_t *tmq_get_msg_col_by_idx(tmq_message_t*, int32_t); - tmq_col_batch_t *tmq_get_msg_col_by_name(tmq_message_t*, const char*); + tmq_col_batch_t* tmq_get_msg_col_by_idx(tmq_message_t*, int32_t col_id); + tmq_col_batch_t* tmq_get_msg_col_by_name(tmq_message_t*, const char*); //consumer config int32_t tmq_conf_set(tmq_consumer_config_t* , const char* config_key, const char* config_value, char* errstr, int32_t errstr_cap); @@ -51,11 +54,12 @@ extern "C" { tmq_consumer_t* tmq_consumer_new(tmq_consumer_config_t* , char* errstr, int32_t errstr_cap); //subscribe - tmq_resp_err_t tmq_subscribe(tmq_consumer_t*, const tmq_topic_list_t*); + tmq_resp_err_t tmq_subscribe(tmq_consumer_t*, const SList*); + tmq_resp_err_t tmq_unsubscribe(tmq_consumer_t*); //consume //resouces are supposed to be free by users by calling tmq_message_destroy - tmq_message_t tmq_consume_poll(tmq_consumer_t*, int64_t blocking_time); + tmq_message_t* tmq_consume_poll(tmq_consumer_t*, int64_t blocking_time); //destroy message and free memory void tmq_message_destroy(tmq_message_t*); diff --git a/include/common/taosMsg.h b/include/common/taosMsg.h deleted file mode 100644 index 0d083a4ca50f8d68d87f4fb6eb66286eee726e97..0000000000000000000000000000000000000000 --- a/include/common/taosMsg.h +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_TAOS_MSG_H_ -#define _TD_TAOS_MSG_H_ - -typedef struct { - /* data */ -} SSubmitReq; - -typedef struct { - /* data */ -} SSubmitRsp; - -typedef struct { - /* data */ -} SSubmitReqReader; - -typedef struct { - /* data */ -} SCreateTableReq; - -typedef struct { - /* data */ -} SCreateTableRsp; - -typedef struct { - /* data */ -} SDropTableReq; - -typedef struct { - /* data */ -} SDropTableRsp; - -typedef struct { - /* data */ -} SAlterTableReq; - -typedef struct { - /* data */ -} SAlterTableRsp; - -#endif /*_TD_TAOS_MSG_H_*/ \ No newline at end of file diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index c296317758511f1c6f4a7f5fd563b40f0ca92569..6cabc8568cc88bbbe4656e964b40ff48fa22cedf 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -952,6 +952,42 @@ typedef struct { char reserved2[64]; } SStartupStep; +typedef struct { + /* data */ +} SSubmitReq; + +typedef struct { + /* data */ +} SSubmitRsp; + +typedef struct { + /* data */ +} SSubmitReqReader; + +typedef struct { + /* data */ +} SCreateTableReq; + +typedef struct { + /* data */ +} SCreateTableRsp; + +typedef struct { + /* data */ +} SDropTableReq; + +typedef struct { + /* data */ +} SDropTableRsp; + +typedef struct { + /* data */ +} SAlterTableReq; + +typedef struct { + /* data */ +} SAlterTableRsp; + #pragma pack(pop) #ifdef __cplusplus diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 8f6d09f8d6251edd9a5398ab1752f366a88e2849..e59d60f7dc2f7e816cd302cfd24be164098d781f 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -15,13 +15,16 @@ #ifndef _TD_WAL_H_ #define _TD_WAL_H_ +#include "os.h" + #ifdef __cplusplus extern "C" { #endif typedef enum { TAOS_WAL_NOLOG = 0, - TAOS_WAL_WRITE = 1 + TAOS_WAL_WRITE = 1, + TAOS_WAL_FSYNC = 2 } EWalType; typedef struct { @@ -55,8 +58,8 @@ void walStop(twalh); void walClose(twalh); //write -int32_t walWrite(twalh, int8_t msgType, void* body, uint32_t bodyLen); -void walWaitFsync(twalh, bool forceHint); +int64_t walWrite(twalh, int8_t msgType, void* body, uint32_t bodyLen); +void walFsync(twalh, bool forceHint); //int32_t walCommit(twalh, uint64_t ver); //int32_t walRollback(twalh, uint64_t ver); @@ -67,7 +70,7 @@ int32_t walReadWithFp(twalh, FWalWrite writeFp, int64_t verStart, int readNum); //life cycle int32_t walDataPersisted(twalh, int64_t ver); int32_t walFirstVer(twalh); -int32_t walLastVer(twal); +int32_t walLastVer(twalh); //int32_t walDataCorrupted(twalh); #ifdef __cplusplus diff --git a/include/server/qnode/qnode.h b/include/server/qnode/qnode.h index 00daf2b0514d077766001c436d248527bb06a7d6..65779b209938bc9766f2d0d11881c2354d0c5092 100644 --- a/include/server/qnode/qnode.h +++ b/include/server/qnode/qnode.h @@ -20,6 +20,94 @@ extern "C" { #endif + +typedef struct { + uint64_t numOfStartTask; + uint64_t numOfStopTask; + uint64_t numOfRecvedFetch; + uint64_t numOfSentHb; + uint64_t numOfSentFetch; + uint64_t numOfTaskInQueue; + uint64_t numOfFetchInQueue; + uint64_t numOfErrors; +} SQnodeStat; + +/* start Task msg */ +typedef struct { + uint32_t schedulerIp; + uint16_t schedulerPort; + int64_t taskId; + int64_t queryId; + uint32_t srcIp; + uint16_t srcPort; +} SQnodeStartTaskMsg; + +/* stop Task msg */ +typedef struct { + int64_t taskId; +} SQnodeStopTaskMsg; + +/* start/stop Task msg response */ +typedef struct { + int64_t taskId; + int32_t code; +} SQnodeTaskRespMsg; + +/* Task status msg */ +typedef struct { + int64_t taskId; + int32_t status; + int64_t queryId; +} SQnodeTaskStatusMsg; + +/* Qnode/Scheduler heartbeat msg */ +typedef struct { + int32_t status; + int32_t load; + +} SQnodeHeartbeatMsg; + +/* Qnode sent/received msg */ +typedef struct { + int8_t msgType; + int32_t msgLen; + char msg[]; +} SQnodeMsg; + + +/** + * Start one Qnode in Dnode. + * @return Error Code. + */ +int32_t qnodeStart(); + +/** + * Stop Qnode in Dnode. + * + * @param qnodeId Qnode ID to stop, -1 for all Qnodes. + */ +void qnodeStop(int64_t qnodeId); + + +/** + * Get the statistical information of Qnode + * + * @param qnodeId Qnode ID to get statistics, -1 for all + * @param stat Statistical information. + * @return Error Code. + */ +int32_t qnodeGetStatistics(int64_t qnodeId, SQnodeStat *stat); + +/** + * Interface for processing Qnode messages. + * + * @param pMsg Message to be processed. + * @return Error code + */ +void qnodeProcessReq(SRpcMsg *pMsg); + + + #ifdef __cplusplus } #endif diff --git a/include/server/vnode/meta/meta.h b/include/server/vnode/meta/meta.h index ca7cba9b0c3dd00fd8895eeb55a0a3c5f3ecc073..a68409d2fce3de55eda39d9095d50df150832009 100644 --- a/include/server/vnode/meta/meta.h +++ b/include/server/vnode/meta/meta.h @@ -16,7 +16,7 @@ #ifndef _TD_META_H_ #define _TD_META_H_ -#include "taosMsg.h" +#include "taosmsg.h" #ifdef __cplusplus extern "C" { @@ -33,4 +33,4 @@ int metaCommit(SMeta *pMeta); } #endif -#endif /*_TD_META_H_*/ \ No newline at end of file +#endif /*_TD_META_H_*/ diff --git a/include/server/vnode/tq/tq.h b/include/server/vnode/tq/tq.h index 4c626a1e25af0ddcef4018766a06ff594b013740..91688e890df0c699de8f9e0ab4004d5bbd3da182 100644 --- a/include/server/vnode/tq/tq.h +++ b/include/server/vnode/tq/tq.h @@ -16,17 +16,35 @@ #ifndef _TD_TQ_H_ #define _TD_TQ_H_ +#include "os.h" + #ifdef __cplusplus extern "C" { #endif typedef struct STQ STQ; -int tqPushMsg(void *); -int tqCommit(STQ *pTQ); +STQ* tqInit(void* ref_func(void*), void* unref_func(void*)); +void tqCleanUp(STQ* pTq); + +//create persistent storage for meta info such as consuming offset +//return value > 0: cgId +//return value <= 0: error code +int tqCreateGroup(STQ*); +//create ring buffer in memory and load consuming offset +int tqOpenGroup(STQ*, int cgId); +//destroy ring buffer and persist consuming offset +int tqCloseGroup(STQ*, int cgId); +//delete persistent storage for meta info +int tqDropGroup(STQ*, int cgId); + +int tqPushMsg(STQ*, void *, int64_t version); +int tqCommit(STQ*); + +int tqHandleMsg(STQ*, void *msg); #ifdef __cplusplus } #endif -#endif /*_TD_TQ_H_*/ \ No newline at end of file +#endif /*_TD_TQ_H_*/ diff --git a/include/server/vnode/tsdb/tsdb.h b/include/server/vnode/tsdb/tsdb.h index 968bac2fa2e8a9322198ad6a6d279825bb1f43fd..4c6eb7a1e33911e2f7350298c335a6801ae9445d 100644 --- a/include/server/vnode/tsdb/tsdb.h +++ b/include/server/vnode/tsdb/tsdb.h @@ -17,7 +17,7 @@ #define _TD_TSDB_H_ #include "os.h" -#include "taosMsg.h" +#include "taosmsg.h" #ifdef __cplusplus extern "C" { @@ -55,4 +55,4 @@ int tsdbCommit(STsdb *pTsdb); } #endif -#endif /*_TD_TSDB_H_*/ \ No newline at end of file +#endif /*_TD_TSDB_H_*/ diff --git a/source/client/consumer/consumer.c b/source/client/consumer/consumer.c index 6dea4a4e57392be988126c579648f39a8270b9bf..4ba1f9514444f8972bcb4c72cdb8a3c3c681eeec 100644 --- a/source/client/consumer/consumer.c +++ b/source/client/consumer/consumer.c @@ -11,4 +11,6 @@ * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . - */ \ No newline at end of file + */ + +#include "consumer.h" diff --git a/source/libs/wal/CMakeLists.txt b/source/libs/wal/CMakeLists.txt index 1e195862763a63f5bc9c92aa5044c11334090aab..fbcdff59eeb243cfe0c4569eae63ca8b0d8043b4 100644 --- a/source/libs/wal/CMakeLists.txt +++ b/source/libs/wal/CMakeLists.txt @@ -4,4 +4,9 @@ target_include_directories( wal PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/wal" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" -) \ No newline at end of file + PRIVATE "${CMAKE_SOURCE_DIR}/include/os" +) + +target_link_libraries( + os +) diff --git a/source/server/vnode/tq/CMakeLists.txt b/source/server/vnode/tq/CMakeLists.txt index f4102b8d4b3e598c2afadbbc2476d487530f8fb2..9577007400b58e712f5996aa1378fc9a2c4e7a08 100644 --- a/source/server/vnode/tq/CMakeLists.txt +++ b/source/server/vnode/tq/CMakeLists.txt @@ -4,4 +4,9 @@ target_include_directories( tq PUBLIC "${CMAKE_SOURCE_DIR}/include/server/vnode/tq" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" -) \ No newline at end of file + PRIVATE "${CMAKE_SOURCE_DIR}/include/os" +) + +target_link_libraries( + os +) diff --git a/source/server/vnode/tq/inc/tqInt.h b/source/server/vnode/tq/inc/tqInt.h index 9a4b9731065270b66f2b75d3a372874b941eaa55..416a915456e78b1746e4a3cc5a4cde5336b42f97 100644 --- a/source/server/vnode/tq/inc/tqInt.h +++ b/source/server/vnode/tq/inc/tqInt.h @@ -16,12 +16,17 @@ #ifndef _TD_TQ_INT_H_ #define _TD_TQ_INT_H_ +#include "tq.h" + #ifdef __cplusplus extern "C" { #endif +//implement the array index +//implement the ring buffer + #ifdef __cplusplus } #endif -#endif /*_TD_TQ_INT_H_*/ \ No newline at end of file +#endif /*_TD_TQ_INT_H_*/ diff --git a/source/server/vnode/tq/src/tq.c b/source/server/vnode/tq/src/tq.c index a5391821155bd260b73300117d5545d987168220..3255f3fb3a1c4723186daab7b3b6fdb5faedb683 100644 --- a/source/server/vnode/tq/src/tq.c +++ b/source/server/vnode/tq/src/tq.c @@ -13,7 +13,21 @@ * along with this program. If not, see . */ -#include "tq.h" +#include "tqInt.h" -int tqPushMsg(void * p) {return 0;} -int tqCommit(STQ *pTQ) {return 0;} +//static +//read next version data +// +//send to fetch queue +// +//handle management message + +int tqPushMsg(STQ* pTq , void* p, int64_t version) { + //add reference + // + return 0; +} + +int tqCommit(STQ* pTq) { + return 0; +}