提交 7d39592d 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into feature/dnode3

......@@ -45,6 +45,16 @@ if(${BUILD_WITH_ROCKSDB})
add_definitions(-DUSE_ROCKSDB)
endif(${BUILD_WITH_ROCKSDB})
## bdb
if(${BUILD_WITH_BDB})
cat("${CMAKE_SUPPORT_DIR}/bdb_CMakeLists.txt.in" ${DEPS_TMP_FILE})
endif(${BUILD_WITH_DBD})
## sqlite
if(${BUILD_WITH_SQLITE})
cat("${CMAKE_SUPPORT_DIR}/sqlite_CMakeLists.txt.in" ${DEPS_TMP_FILE})
endif(${BUILD_WITH_SQLITE})
## lucene
if(${BUILD_WITH_LUCENE})
cat("${CMAKE_SUPPORT_DIR}/lucene_CMakeLists.txt.in" ${DEPS_TMP_FILE})
......
# bdb
ExternalProject_Add(bdb
GIT_REPOSITORY https://github.com/berkeleydb/libdb.git
GIT_TAG v5.3.28
SOURCE_DIR "${CMAKE_SOURCE_DIR}/deps/bdb"
BINARY_DIR "${CMAKE_SOURCE_DIR}/deps/bdb"
#BUILD_IN_SOURCE TRUE
CONFIGURE_COMMAND "./dist/configure"
BUILD_COMMAND "$(MAKE)"
INSTALL_COMMAND ""
TEST_COMMAND ""
)
\ No newline at end of file
......@@ -19,6 +19,18 @@ option(
ON
)
option(
BUILD_WITH_SQLITE
"If build with sqlite"
ON
)
option(
BUILD_WITH_BDB
"If build with BerkleyDB"
ON
)
option(
BUILD_WITH_LUCENE
"If build with lucene"
......@@ -34,7 +46,7 @@ option(
option(
BUILD_DEPENDENCY_TESTS
"If build dependency tests"
OFF
ON
)
option(
......
# sqlite
ExternalProject_Add(sqlite
GIT_REPOSITORY https://github.com/sqlite/sqlite.git
GIT_TAG version-3.36.0
SOURCE_DIR "${CMAKE_SOURCE_DIR}/deps/sqlite"
BINARY_DIR "${CMAKE_SOURCE_DIR}/deps/sqlite"
#BUILD_IN_SOURCE TRUE
CONFIGURE_COMMAND "./configure"
BUILD_COMMAND "$(MAKE)"
INSTALL_COMMAND ""
TEST_COMMAND ""
)
\ No newline at end of file
......@@ -80,6 +80,33 @@ if(${BUILD_WITH_NURAFT})
add_subdirectory(nuraft)
endif(${BUILD_WITH_NURAFT})
# BDB
if(${BUILD_WITH_BDB})
add_library(bdb STATIC IMPORTED)
set_target_properties(bdb PROPERTIES
IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/bdb/libdb.a"
INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_CURRENT_SOURCE_DIR}/bdb"
)
target_link_libraries(bdb
INTERFACE pthread
)
endif(${BUILD_WITH_BDB})
# SQLite
if(${BUILD_WITH_SQLITE})
add_library(sqlite STATIC IMPORTED)
set_target_properties(sqlite PROPERTIES
IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/sqlite/.libs/libsqlite3.a"
INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_CURRENT_SOURCE_DIR}/sqlite"
)
target_link_libraries(sqlite
INTERFACE m
INTERFACE pthread
INTERFACE dl
)
endif(${BUILD_WITH_SQLITE})
# ================================================================================================
# DEPENDENCY TEST
......
......@@ -6,3 +6,13 @@ endif(${BUILD_WITH_ROCKSDB})
if(${BUILD_WITH_LUCENE})
add_subdirectory(lucene)
endif(${BUILD_WITH_LUCENE})
if(${BUILD_WITH_BDB})
add_subdirectory(bdb)
endif(${BUILD_WITH_BDB})
if(${BUILD_WITH_SQLITE})
add_subdirectory(sqlite)
endif(${BUILD_WITH_SQLITE})
add_subdirectory(tdev)
add_executable(bdbTest "")
target_sources(
bdbTest PRIVATE
"bdbTest.c"
)
target_link_libraries(bdbTest bdb)
\ No newline at end of file
#include <stdio.h>
#include <stdlib.h>
#include "db.h"
// refer: https://docs.oracle.com/cd/E17076_05/html/gsg/C/BerkeleyDB-Core-C-GSG.pdf
int main(int argc, char const *argv[]) {
DB * db;
int ret;
uint32_t flags;
ret = db_create(&db, NULL, 0);
if (ret != 0) {
exit(1);
}
flags = DB_CREATE;
ret = db->open(db, NULL, "test.db", NULL, DB_BTREE, flags, 0);
if (ret != 0) {
exit(1);
}
db->close(db, 0);
return 0;
}
add_executable(sqliteTest "")
target_sources(
sqliteTest PRIVATE
"sqliteTest.c"
)
target_link_libraries(sqliteTest sqlite)
\ No newline at end of file
#include <stdio.h>
#include <stdlib.h>
#include "sqlite3.h"
static void count_table(sqlite3 *db) {
int rc;
char * sql = "select * from t;";
sqlite3_stmt *stmt = NULL;
int nrows = 0;
rc = sqlite3_prepare_v2(db, sql, -1, &stmt, NULL);
while (SQLITE_ROW == sqlite3_step(stmt)) {
nrows++;
}
printf("Number of rows: %d\n", nrows);
}
int main(int argc, char const *argv[]) {
sqlite3 *db;
char * err_msg = 0;
int rc = sqlite3_open("test.db", &db);
if (rc != SQLITE_OK) {
fprintf(stderr, "Cannot open database: %s\n", sqlite3_errmsg(db));
sqlite3_close(db);
return 1;
}
char *sql =
"DROP TABLE IF EXISTS t;"
"CREATE TABLE t(id BIGINT);";
rc = sqlite3_exec(db, sql, 0, 0, &err_msg);
if (rc != SQLITE_OK) {
fprintf(stderr, "SQL error: %s\n", err_msg);
sqlite3_free(err_msg);
sqlite3_close(db);
return 1;
}
{
// Write a lot of data
int nrows = 1000;
int batch = 100;
char tsql[1024];
int v = 0;
// sqlite3_exec(db, "PRAGMA journal_mode=WAL;", 0, 0, &err_msg);
sqlite3_exec(db, "PRAGMA read_uncommitted=true;", 0, 0, &err_msg);
for (int k = 0; k < nrows / batch; k++) {
sqlite3_exec(db, "begin;", 0, 0, &err_msg);
for (int i = 0; i < batch; i++) {
v++;
sprintf(tsql, "insert into t values (%d)", v);
rc = sqlite3_exec(db, tsql, 0, 0, &err_msg);
if (rc != SQLITE_OK) {
fprintf(stderr, "SQL error: %s\n", err_msg);
sqlite3_free(err_msg);
sqlite3_close(db);
return 1;
}
}
count_table(db);
sqlite3_exec(db, "commit;", 0, 0, &err_msg);
}
}
sqlite3_close(db);
return 0;
}
aux_source_directory(src TDEV_SRC)
add_executable(tdev ${TDEV_SRC})
target_include_directories(tdev PUBLIC inc)
\ No newline at end of file
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <sys/time.h>
#define POINTER_SHIFT(ptr, s) ((void *)(((char *)ptr) + (s)))
#define POINTER_DISTANCE(pa, pb) ((char *)(pb) - (char *)(pa))
#define tPutA(buf, val) \
({ \
memcpy(buf, &val, sizeof(val)); \
POINTER_SHIFT(buf, sizeof(val)); \
})
#define tPutB(buf, val) \
({ \
((uint8_t *)buf)[3] = ((val) >> 24) & 0xff; \
((uint8_t *)buf)[2] = ((val) >> 16) & 0xff; \
((uint8_t *)buf)[1] = ((val) >> 8) & 0xff; \
((uint8_t *)buf)[0] = (val)&0xff; \
POINTER_SHIFT(buf, sizeof(val)); \
})
#define tPutC(buf, val) \
({ \
((uint64_t *)buf)[0] = (val); \
POINTER_SHIFT(buf, sizeof(val)); \
})
typedef enum { A, B, C } T;
static void func(T t) {
uint64_t val = 198;
char buf[1024];
void * pBuf = buf;
switch (t) {
case A:
for (size_t i = 0; i < 10 * 1024l * 1024l * 1024l; i++) {
pBuf = tPutA(pBuf, val);
if (POINTER_DISTANCE(buf, pBuf) == 1024) {
pBuf = buf;
}
}
break;
case B:
for (size_t i = 0; i < 10 * 1024l * 1024l * 1024l; i++) {
pBuf = tPutB(pBuf, val);
if (POINTER_DISTANCE(buf, pBuf) == 1024) {
pBuf = buf;
}
}
break;
case C:
for (size_t i = 0; i < 10 * 1024l * 1024l * 1024l; i++) {
pBuf = tPutC(pBuf, val);
if (POINTER_DISTANCE(buf, pBuf) == 1024) {
pBuf = buf;
}
}
break;
default:
break;
}
}
static uint64_t now() {
struct timeval tv;
gettimeofday(&tv, NULL);
return tv.tv_sec * 1000000 + tv.tv_usec;
}
int main(int argc, char const *argv[]) {
uint64_t t1 = now();
func(A);
uint64_t t2 = now();
printf("A: %ld\n", t2 - t1);
func(B);
uint64_t t3 = now();
printf("B: %ld\n", t3 - t2);
func(C);
uint64_t t4 = now();
printf("C: %ld\n", t4 - t3);
return 0;
}
......@@ -17,53 +17,108 @@
#define _TD_COMMON_ROW_H_
#include "os.h"
#include "tbuffer.h"
#include "tdataformat.h"
#include "tdef.h"
#include "tschema.h"
#ifdef __cplusplus
extern "C" {
#endif
// types
typedef void * SRow;
typedef struct SRowBatch SRowBatch;
typedef struct SRowBuilder SRowBuilder;
typedef struct SRowBatchIter SRowBatchIter;
typedef struct SRowBatchBuilder SRowBatchBuilder;
// SRow
#define ROW_HEADER_SIZE (sizeof(uint8_t) + 2 * sizeof(uint16_t) + sizeof(uint64_t))
#define rowType(r) (*(uint8_t *)(r)) // row type
#define rowLen(r) (*(uint16_t *)POINTER_SHIFT(r, sizeof(uint8_t))) // row length
#define rowSVer(r) \
(*(uint16_t *)POINTER_SHIFT(r, sizeof(uint8_t) + sizeof(uint16_t))) // row schema version, only for SDataRow
#define rowNCols(r) rowSVer(r) // only for SKVRow
#define rowVer(r) (*(uint64_t)POINTER_SHIFT(r, sizeof(uint8_t) + 2 * sizeof(uint16_t))) // row version
#define rowCopy(dest, r) memcpy((dest), r, rowLen(r))
static FORCE_INLINE SRow rowDup(SRow row) {
SRow r = malloc(rowLen(row));
if (r == NULL) {
return NULL;
}
rowCopy(r, row);
return r;
}
#define TD_UNDECIDED_ROW 0
#define TD_OR_ROW 1
#define TD_KV_ROW 2
typedef struct {
// TODO
} SOrRow;
typedef struct {
col_id_t cid;
uint32_t offset;
} SKvRowIdx;
typedef struct {
uint16_t ncols;
SKvRowIdx cidx[];
} SKvRow;
typedef struct {
union {
/// union field for encode and decode
uint32_t info;
struct {
/// row type
uint32_t type : 2;
/// row schema version
uint32_t sver : 16;
/// is delete row
uint32_t del : 1;
/// reserved for back compatibility
uint32_t reserve : 13;
};
};
/// row total length
uint32_t len;
/// row version
uint64_t ver;
/// timestamp
TSKEY ts;
/// the inline data, maybe a tuple or a k-v tuple
char data[];
} STSRow;
typedef struct {
uint32_t nRows;
char rows[];
} STSRowBatch;
typedef enum {
/// ordinary row builder
TD_OR_ROW_BUILDER = 0,
/// kv row builder
TD_KV_ROW_BUILDER,
/// self-determined row builder
TD_SD_ROW_BUILDER
} ERowBbuilderT;
typedef struct {
/// row builder type
ERowBbuilderT type;
/// buffer writer
SBufferWriter bw;
/// target row
STSRow *pRow;
} STSRowBuilder;
typedef struct {
STSchema *pSchema;
STSRow * pRow;
} STSRowReader;
// SRowBatch
typedef struct {
uint32_t it;
STSRowBatch *pRowBatch;
} STSRowBatchIter;
// SRowBuilder
SRowBuilder *rowBuilderCreate();
void rowBuilderDestroy(SRowBuilder *);
// STSRowBuilder
#define trbInit(rt, allocator, endian, target, size) \
{ .type = (rt), .bw = tbufInitWriter(allocator, endian), .pRow = (target) }
void trbSetRowInfo(STSRowBuilder *pRB, bool del, uint16_t sver);
void trbSetRowVersion(STSRowBuilder *pRB, uint64_t ver);
void trbSetRowTS(STSRowBuilder *pRB, TSKEY ts);
int trbWriteCol(STSRowBuilder *pRB, void *pData, col_id_t cid);
// SRowBatchIter
SRowBatchIter *rowBatchIterCreate(SRowBatch *);
void rowBatchIterDestroy(SRowBatchIter *);
const SRow rowBatchIterNext(SRowBatchIter *);
// STSRowReader
#define tRowReaderInit(schema, row) \
{ .schema = (schema), .row = (row) }
int tRowReaderRead(STSRowReader *pRowReader, col_id_t cid, void *target, uint64_t size);
// SRowBatchBuilder
SRowBatchBuilder *rowBatchBuilderCreate();
void rowBatchBuilderDestroy(SRowBatchBuilder *);
// STSRowBatchIter
#define tRowBatchIterInit(pRB) \
{ .it = 0, .pRowBatch = (pRB) }
const STSRow *tRowBatchIterNext(STSRowBatchIter *pRowBatchIter);
#ifdef __cplusplus
}
......
......@@ -16,10 +16,65 @@
#ifndef _TD_COMMON_SCHEMA_H_
#define _TD_COMMON_SCHEMA_H_
#include "os.h"
#include "tarray.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef uint16_t col_id_t;
#if 0
typedef struct STColumn {
/// column name
char *cname;
union {
/// for encode purpose
uint64_t info;
struct {
uint64_t sma : 1;
/// column data type
uint64_t type : 7;
/// column id
uint64_t cid : 16;
/// max bytes of the column
uint64_t bytes : 32;
/// reserved
uint64_t reserve : 8;
};
};
/// comment about the column
char *comment;
} STColumn;
typedef struct STSchema {
/// schema version
uint16_t sver;
/// number of columns
uint16_t ncols;
/// sma attributes
struct {
bool sma;
SArray *smaArray;
};
/// column info
STColumn cols[];
} STSchema;
typedef struct {
uint64_t size;
STSchema *pSchema;
} STShemaBuilder;
#define tSchemaBuilderInit(target, capacity) \
{ .size = (capacity), .pSchema = (target) }
void tSchemaBuilderSetSver(STShemaBuilder *pSchemaBuilder, uint16_t sver);
void tSchemaBuilderSetSMA(bool sma, SArray *smaArray);
int tSchemaBuilderPutColumn(char *cname, bool sma, uint8_t type, col_id_t cid, uint32_t bytes, char *comment);
#endif
#ifdef __cplusplus
}
#endif
......
......@@ -13,18 +13,15 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VNODE_FILE_SYSTEM_H_
#define _TD_VNODE_FILE_SYSTEM_H_
#ifndef _TD_TYPE_H_
#define _TD_TYPE_H_
#ifdef __cplusplus
extern "C" {
#endif
typedef struct {
} SVnodeFS;
#ifdef __cplusplus
}
#endif
#endif /*_TD_VNODE_FILE_SYSTEM_H_*/
\ No newline at end of file
#endif /*_TD_TYPE_H_*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_META_IMPL_H_
#define _TD_META_IMPL_H_
#include "os.h"
#include "taosmsg.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef uint64_t tb_uid_t;
/* ------------------------ SMetaOptions ------------------------ */
struct SMetaOptions {
size_t lruCacheSize; // LRU cache size
};
/* ------------------------ STbOptions ------------------------ */
#define META_NORMAL_TABLE ((uint8_t)1)
#define META_SUPER_TABLE ((uint8_t)2)
#define META_CHILD_TABLE ((uint8_t)3)
typedef struct {
} SSMAOptions;
// super table options
typedef struct {
tb_uid_t uid;
STSchema* pSchema;
STSchema* pTagSchema;
} SSTbOptions;
// child table options
typedef struct {
tb_uid_t suid;
SKVRow tags;
} SCTbOptions;
// normal table options
typedef struct {
STSchema* pSchame;
} SNTbOptions;
struct STbOptions {
uint8_t type;
char* name;
uint32_t ttl; // time to live in (SECONDS)
SSMAOptions bsma; // Block-wise sma
union {
SSTbOptions stbOptions;
SNTbOptions ntbOptions;
SCTbOptions ctbOptions;
};
};
#ifdef __cplusplus
}
#endif
#endif /*_TD_META_IMPL_H_*/
\ No newline at end of file
......@@ -16,36 +16,96 @@
#ifndef _TD_META_H_
#define _TD_META_H_
#include "impl/metaImpl.h"
#include "os.h"
#include "trow.h"
#ifdef __cplusplus
extern "C" {
#endif
// Types exported
typedef struct SMeta SMeta;
typedef struct SMetaOptions SMetaOptions;
typedef struct STbOptions STbOptions;
typedef uint64_t tb_uid_t;
typedef struct SMeta SMeta;
#define META_SUPER_TABLE 0
#define META_CHILD_TABLE 1
#define META_NORMAL_TABLE 2
typedef struct SMetaCfg {
/// LRU cache size
uint64_t lruSize;
} SMetaCfg;
typedef struct STbCfg {
/// name of the table
char *name;
/// time to live of the table
uint32_t ttl;
/// keep time of this table
uint32_t keep;
/// type of table
uint8_t type;
union {
/// super table configurations
struct {
/// super table UID
tb_uid_t suid;
/// row schema
STSchema *pSchema;
/// tag schema
STSchema *pTagSchema;
} stbCfg;
/// normal table configuration
struct {
/// row schema
STSchema *pSchema;
} ntbCfg;
/// child table configuration
struct {
/// super table UID
tb_uid_t suid;
SKVRow pTag;
} ctbCfg;
};
} STbCfg;
// SMeta operations
SMeta *metaOpen(const char *path, const SMetaOptions *pOptions);
SMeta *metaOpen(const char *path, const SMetaCfg *pOptions);
void metaClose(SMeta *pMeta);
void metaRemove(const char *path);
int metaCreateTable(SMeta *pMeta, const STbOptions *pTbOptions);
int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg);
int metaDropTable(SMeta *pMeta, tb_uid_t uid);
int metaCommit(SMeta *pMeta);
// Options
void metaOptionsInit(SMetaOptions *pOptions);
void metaOptionsClear(SMetaOptions *pOptions);
// STableOpts
#define META_TABLE_OPTS_DECLARE(name) STableOpts name = {0}
void metaNormalTableOptsInit(STbOptions *pTbOptions, const char *name, const STSchema *pSchema);
void metaSuperTableOptsInit(STbOptions *pTbOptions, const char *name, tb_uid_t uid, const STSchema *pSchema,
const STSchema *pTagSchema);
void metaChildTableOptsInit(STbOptions *pTbOptions, const char *name, tb_uid_t suid, const SKVRow tags);
void metaTableOptsClear(STbOptions *pTbOptions);
void metaOptionsInit(SMetaCfg *pOptions);
void metaOptionsClear(SMetaCfg *pOptions);
// STbCfg
#define META_INIT_STB_CFG(NAME, TTL, KEEP, SUID, PSCHEMA, PTAGSCHEMA) \
{ \
.name = (NAME), .ttl = (TTL), .keep = (KEEP), .type = META_SUPER_TABLE, .stbCfg = { \
.suid = (SUID), \
.pSchema = (PSCHEMA), \
.pTagSchema = (PTAGSCHEMA) \
} \
}
#define META_INIT_CTB_CFG(NAME, TTL, KEEP, SUID, PTAG) \
{ \
.name = (NAME), .ttl = (TTL), .keep = (KEEP), .type = META_CHILD_TABLE, .ctbCfg = {.suid = (SUID), .pTag = PTAG } \
}
#define META_INIT_NTB_CFG(NAME, TTL, KEEP, SUID, PSCHEMA) \
{ \
.name = (NAME), .ttl = (TTL), .keep = (KEEP), .type = META_NORMAL_TABLE, .ntbCfg = {.pSchema = (PSCHEMA) } \
}
#define META_CLEAR_TB_CFG(pTbCfg)
int metaEncodeTbCfg(void **pBuf, STbCfg *pTbCfg);
void *metaDecodeTbCfg(void *pBuf, STbCfg **pTbCfg);
#ifdef __cplusplus
}
......
......@@ -161,9 +161,9 @@ typedef struct TqLogReader {
int64_t (*logGetLastVer)(void* logHandle);
} TqLogReader;
typedef struct TqConfig {
typedef struct STqCfg {
// TODO
} TqConfig;
} STqCfg;
typedef struct TqMemRef {
SMemAllocatorFactory *pAlloctorFactory;
......@@ -256,14 +256,14 @@ typedef struct STQ {
// the collection of group handle
// the handle of kvstore
char* path;
TqConfig* tqConfig;
STqCfg* tqConfig;
TqLogReader* tqLogReader;
TqMemRef tqMemRef;
TqMetaStore* tqMeta;
} STQ;
// open in each vnode
STQ* tqOpen(const char* path, TqConfig* tqConfig, TqLogReader* tqLogReader, SMemAllocatorFactory *allocFac);
STQ* tqOpen(const char* path, STqCfg* tqConfig, TqLogReader* tqLogReader, SMemAllocatorFactory *allocFac);
void tqDestroy(STQ*);
// void* will be replace by a msg type
......
......@@ -22,21 +22,25 @@ extern "C" {
// TYPES EXPOSED
typedef struct STsdb STsdb;
typedef struct STsdbOptions STsdbOptions;
typedef struct STsdbCfg STsdbCfg;
typedef struct STsdbMemAllocator STsdbMemAllocator;
// STsdb
STsdb *tsdbOpen(const char *path, const STsdbOptions *);
STsdb *tsdbOpen(const char *path, const STsdbCfg *);
void tsdbClose(STsdb *);
void tsdbRemove(const char *path);
int tsdbInsertData(STsdb *pTsdb, void *pData, int len);
// STsdbOptions
int tsdbOptionsInit(STsdbOptions *);
void tsdbOptionsClear(STsdbOptions *);
// STsdbCfg
int tsdbOptionsInit(STsdbCfg *);
void tsdbOptionsClear(STsdbCfg *);
/* ------------------------ STRUCT DEFINITIONS ------------------------ */
struct STsdbOptions {
struct STsdbCfg {
uint64_t lruCacheSize;
uint32_t keep0;
uint32_t keep1;
uint32_t keep2;
/* TODO */
};
......
......@@ -23,24 +23,67 @@
#include "tarray.h"
#include "tq.h"
#include "tsdb.h"
#include "wal.h"
#ifdef __cplusplus
extern "C" {
#endif
/* ------------------------ TYPES EXPOSED ------------------------ */
typedef struct SVnode SVnode;
typedef struct SVnodeOptions SVnodeOptions;
typedef struct SVnode SVnode;
typedef struct SVnodeCfg {
/** vnode buffer pool options */
struct {
/** write buffer size */
uint64_t wsize;
/** use heap allocator or arena allocator */
bool isHeapAllocator;
};
/** time to live of tables in this vnode */
uint32_t ttl;
/** data to keep in this vnode */
uint32_t keep;
/** if TS data is eventually consistency */
bool isWeak;
/** TSDB config */
STsdbCfg tsdbCfg;
/** META config */
SMetaCfg metaCfg;
/** TQ config */
STqCfg tqCfg;
/** WAL config */
SWalCfg walCfg;
} SVnodeCfg;
/* ------------------------ SVnode ------------------------ */
/**
* @brief Initialize the vnode module
*
* @return int 0 for success and -1 for failure
*/
int vnodeInit();
/**
* @brief clear a vnode
*
*/
void vnodeClear();
/**
* @brief Open a VNODE.
*
* @param path path of the vnode
* @param pVnodeOptions options of the vnode
* @param pVnodeCfg options of the vnode
* @return SVnode* The vnode object
*/
SVnode *vnodeOpen(const char *path, const SVnodeOptions *pVnodeOptions);
SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg);
/**
* @brief Close a VNODE
......@@ -85,61 +128,55 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
*/
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
/* ------------------------ SVnodeOptions ------------------------ */
/* ------------------------ SVnodeCfg ------------------------ */
/**
* @brief Initialize VNODE options.
*
* @param pOptions The options object to be initialized. It should not be NULL.
*/
void vnodeOptionsInit(SVnodeOptions *pOptions);
void vnodeOptionsInit(SVnodeCfg *pOptions);
/**
* @brief Clear VNODE options.
*
* @param pOptions Options to clear.
*/
void vnodeOptionsClear(SVnodeOptions *pOptions);
/* ------------------------ STRUCT DEFINITIONS ------------------------ */
struct SVnodeOptions {
/**
* @brief write buffer size in BYTES
*
*/
uint64_t wsize;
/**
* @brief time to live of tables in this vnode
* in SECONDS
*
*/
uint32_t ttl;
void vnodeOptionsClear(SVnodeCfg *pOptions);
/**
* @brief if time-series requests eventual consistency
*
*/
bool isWeak;
/* ------------------------ REQUESTS ------------------------ */
typedef STbCfg SVCreateTableReq;
typedef struct {
tb_uid_t uid;
} SVDropTableReq;
/**
* @brief if the allocator is heap allcator or arena allocator
*
*/
bool isHeapAllocator;
/**
* @brief TSDB options
*
*/
STsdbOptions tsdbOptions;
/**
* @brief META options
*
*/
SMetaOptions metaOptions;
// STqOptions tqOptions; // TODO
};
typedef struct {
// TODO
} SVSubmitReq;
typedef struct {
uint64_t ver;
union {
SVCreateTableReq ctReq;
SVDropTableReq dtReq;
};
} SVnodeReq;
typedef struct {
int err;
char info[];
} SVnodeRsp;
#define VNODE_INIT_CREATE_STB_REQ(NAME, TTL, KEEP, SUID, PSCHEMA, PTAGSCHEMA) \
{ .ver = 0, .ctReq = META_INIT_STB_CFG(NAME, TTL, KEEP, SUID, PSCHEMA, PTAGSCHEMA) }
#define VNODE_INIT_CREATE_CTB_REQ(NAME, TTL, KEEP, SUID, PTAG) \
{ .ver = 0, .ctReq = META_INIT_CTB_CFG(NAME, TTL, KEEP, SUID, PTAG) }
#define VNODE_INIT_CREATE_NTB_REQ(NAME, TTL, KEEP, SUID, PSCHEMA) \
{ .ver = 0, .ctReq = META_INIT_NTB_CFG(NAME, TTL, KEEP, SUID, PSCHEMA) }
int vnodeBuildReq(void **buf, const SVnodeReq *pReq, uint8_t type);
void *vnodeParseReq(void *buf, SVnodeReq *pReq, uint8_t type);
/* ------------------------ FOR COMPILE ------------------------ */
......@@ -148,27 +185,27 @@ struct SVnodeOptions {
#include "taosmsg.h"
#include "trpc.h"
typedef struct {
char db[TSDB_FULL_DB_NAME_LEN];
int32_t cacheBlockSize; // MB
int32_t totalBlocks;
int32_t daysPerFile;
int32_t daysToKeep0;
int32_t daysToKeep1;
int32_t daysToKeep2;
int32_t minRowsPerFileBlock;
int32_t maxRowsPerFileBlock;
int8_t precision; // time resolution
int8_t compression;
int8_t cacheLastRow;
int8_t update;
int8_t quorum;
int8_t replica;
int8_t selfIndex;
int8_t walLevel;
int32_t fsyncPeriod; // millisecond
SReplica replicas[TSDB_MAX_REPLICA];
} SVnodeCfg;
// typedef struct {
// char db[TSDB_FULL_DB_NAME_LEN];
// int32_t cacheBlockSize; // MB
// int32_t totalBlocks;
// int32_t daysPerFile;
// int32_t daysToKeep0;
// int32_t daysToKeep1;
// int32_t daysToKeep2;
// int32_t minRowsPerFileBlock;
// int32_t maxRowsPerFileBlock;
// int8_t precision; // time resolution
// int8_t compression;
// int8_t cacheLastRow;
// int8_t update;
// int8_t quorum;
// int8_t replica;
// int8_t selfIndex;
// int8_t walLevel;
// int32_t fsyncPeriod; // millisecond
// SReplica replicas[TSDB_MAX_REPLICA];
// } SVnodeCfg;
typedef enum {
VN_MSG_TYPE_WRITE = 1,
......
......@@ -22,10 +22,10 @@
extern "C" {
#endif
typedef struct SMemAllocator SMemAllocator;
typedef struct SMemAllocator SMemAllocator;
typedef struct SMemAllocatorFactory SMemAllocatorFactory;
struct SMemAllocator {
char name[16];
void *impl;
void *(*malloc)(SMemAllocator *, uint64_t size);
void *(*calloc)(SMemAllocator *, uint64_t nmemb, uint64_t size);
......@@ -34,11 +34,11 @@ struct SMemAllocator {
uint64_t (*usage)(SMemAllocator *);
};
typedef struct {
struct SMemAllocatorFactory {
void *impl;
SMemAllocator *(*create)();
void (*destroy)(SMemAllocator *);
} SMemAllocatorFactory;
SMemAllocator *(*create)(SMemAllocatorFactory *);
void (*destroy)(SMemAllocatorFactory *, SMemAllocator *);
};
#ifdef __cplusplus
}
......
......@@ -15,75 +15,21 @@
#include "trow.h"
/* ------------ Structures ---------- */
struct SRowBatch {
int32_t compress : 1; // if batch row is compressed
int32_t nrows : 31; // number of rows
int32_t tlen; // total length (including `nrows` and `tlen`)
char rows[];
};
struct SRowBuilder {
// TODO
};
struct SRowBatchIter {
int32_t counter; // row counter
SRowBatch *rb; // row batch to iter
SRow nrow; // next row
};
struct SRowBatchBuilder {
#if 0
void trbSetRowInfo(SRowBuilder *pRB, bool del, uint16_t sver) {
// TODO
};
/* ------------ Methods ---------- */
// SRowBuilder
SRowBuilder *rowBuilderCreate() {
SRowBuilder *pRowBuilder = NULL;
// TODO
return pRowBuilder;
}
void rowBuilderDestroy(SRowBuilder *pRowBuilder) {
if (pRowBuilder) {
free(pRowBuilder);
}
void trbSetRowVersion(SRowBuilder *pRB, uint64_t ver) {
// TODO
}
// SRowBatchIter
SRowBatchIter *rowBatchIterCreate(SRowBatch *pRowBatch) {
SRowBatchIter *pRowBatchIter = (SRowBatchIter *)malloc(sizeof(*pRowBatchIter));
if (pRowBatchIter == NULL) {
return NULL;
}
pRowBatchIter->counter = 0;
pRowBatchIter->rb = pRowBatch;
pRowBatchIter->nrow = pRowBatch->rows;
return pRowBatchIter;
};
void rowBatchIterDestroy(SRowBatchIter *pRowBatchIter) {
if (pRowBatchIter) {
free(pRowBatchIter);
}
void trbSetRowTS(SRowBuilder *pRB, TSKEY ts) {
// TODO
}
const SRow rowBatchIterNext(SRowBatchIter *pRowBatchIter) {
SRow r = NULL;
if (pRowBatchIter->counter < pRowBatchIter->rb->nrows) {
r = pRowBatchIter->nrow;
pRowBatchIter->counter += 1;
pRowBatchIter->nrow = (SRow)POINTER_SHIFT(r, rowLen(r));
}
return r;
int trbWriteCol(SRowBuilder *pRB, void *pData, col_id_t cid) {
// TODO
return 0;
}
// SRowBatchBuilder
SRowBatchBuilder *rowBatchBuilderCreate();
void rowBatchBuilderDestroy(SRowBatchBuilder *);
\ No newline at end of file
#endif
\ No newline at end of file
#include <gtest/gtest.h>
#include "trow.h"
TEST(td_row_test, build_row_to_target) {
#if 0
char dst[1024];
SRow* pRow = (SRow*)dst;
int ncols = 10;
col_id_t cid;
void* pData;
SRowBuilder rb = trbInit(TD_OR_ROW_BUILDER, NULL, 0, pRow, 1024);
trbSetRowInfo(&rb, false, 0);
trbSetRowTS(&rb, 1637550210000);
for (int c = 0; c < ncols; c++) {
cid = c;
if (trbWriteCol(&rb, pData, cid) < 0) {
// TODO
}
}
#endif
}
\ No newline at end of file
#include <gtest/gtest.h>
#include "tschema.h"
TEST(td_schema_test, build_schema_test) {
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
......@@ -38,7 +38,7 @@ typedef struct {
int32_t threadIndex;
pthread_t *pThreadId;
SVnodeObj *pVnodes;
SDnode *pDnode;
SDnode * pDnode;
} SVnodeThread;
static int32_t dndInitVnodeReadWorker(SDnode *pDnode);
......@@ -73,7 +73,7 @@ void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEp
void dndProcessVnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SVnodeMsg *pMsg);
static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId);
static SVnodeObj * dndAcquireVnode(SDnode *pDnode, int32_t vgId);
static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode);
static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, char *path, SVnode *pImpl);
static void dndDropVnodeWrapper(SDnode *pDnode, SVnodeObj *pVnode);
......@@ -95,7 +95,7 @@ static int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg);
static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
SVnodeObj *pVnode = NULL;
SVnodeObj * pVnode = NULL;
int32_t refCount = 0;
taosRLockLatch(&pMgmt->latch);
......@@ -128,7 +128,7 @@ static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode) {
static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, char *path, SVnode *pImpl) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
SVnodeObj *pVnode = calloc(1, sizeof(SVnodeObj));
SVnodeObj * pVnode = calloc(1, sizeof(SVnodeObj));
if (pVnode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
......@@ -208,7 +208,7 @@ static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes) {
void *pIter = taosHashIterate(pMgmt->hash, NULL);
while (pIter) {
SVnodeObj **ppVnode = pIter;
SVnodeObj *pVnode = *ppVnode;
SVnodeObj * pVnode = *ppVnode;
if (pVnode) {
num++;
if (num < size) {
......@@ -230,9 +230,9 @@ static int32_t dndGetVnodesFromFile(SDnode *pDnode, SVnodeObj **ppVnodes, int32_
int32_t code = TSDB_CODE_DND_VNODE_READ_FILE_ERROR;
int32_t len = 0;
int32_t maxLen = 30000;
char *content = calloc(1, maxLen + 1);
cJSON *root = NULL;
FILE *fp = NULL;
char * content = calloc(1, maxLen + 1);
cJSON * root = NULL;
FILE * fp = NULL;
char file[PATH_MAX + 20] = {0};
SVnodeObj *pVnodes = NULL;
......@@ -277,7 +277,7 @@ static int32_t dndGetVnodesFromFile(SDnode *pDnode, SVnodeObj **ppVnodes, int32_
}
for (int32_t i = 0; i < vnodesNum; ++i) {
cJSON *vnode = cJSON_GetArrayItem(vnodes, i);
cJSON * vnode = cJSON_GetArrayItem(vnodes, i);
SVnodeObj *pVnode = &pVnodes[i];
cJSON *vgId = cJSON_GetObjectItem(vnode, "vgId");
......@@ -321,7 +321,7 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode) {
int32_t len = 0;
int32_t maxLen = 30000;
char *content = calloc(1, maxLen + 1);
char * content = calloc(1, maxLen + 1);
int32_t numOfVnodes = 0;
SVnodeObj **pVnodes = dndGetVnodesFromHash(pDnode, &numOfVnodes);
......@@ -403,8 +403,8 @@ static int32_t dndDropVnode(SDnode *pDnode, SVnodeObj *pVnode) {
static void *dnodeOpenVnodeFunc(void *param) {
SVnodeThread *pThread = param;
SDnode *pDnode = pThread->pDnode;
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
SDnode * pDnode = pThread->pDnode;
SVnodesMgmt * pMgmt = &pDnode->vmgmt;
dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
setThreadName("open-vnodes");
......@@ -527,6 +527,7 @@ static int32_t dndParseCreateVnodeReq(SRpcMsg *rpcMsg, int32_t *vgId, SVnodeCfg
SCreateVnodeMsg *pCreate = rpcMsg->pCont;
*vgId = htonl(pCreate->vgId);
#if 0
tstrncpy(pCfg->db, pCreate->db, TSDB_FULL_DB_NAME_LEN);
pCfg->cacheBlockSize = htonl(pCreate->cacheBlockSize);
pCfg->totalBlocks = htonl(pCreate->totalBlocks);
......@@ -549,6 +550,7 @@ static int32_t dndParseCreateVnodeReq(SRpcMsg *rpcMsg, int32_t *vgId, SVnodeCfg
pCfg->replicas[i].port = htons(pCreate->replicas[i].port);
tstrncpy(pCfg->replicas[i].fqdn, pCreate->replicas[i].fqdn, TSDB_FQDN_LEN);
}
#endif
return 0;
}
......@@ -738,7 +740,7 @@ static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SVnodeMsg *pMsg) {
static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) {
SVnodeMsg *pMsg = vnodeInitMsg(numOfMsgs);
SRpcMsg *pRpcMsg = NULL;
SRpcMsg * pRpcMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pRpcMsg);
......@@ -1011,7 +1013,7 @@ static void dndFreeVnodeApplyQueue(SDnode *pDnode, SVnodeObj *pVnode) {
}
static int32_t dndInitVnodeWriteWorker(SDnode *pDnode) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
SVnodesMgmt * pMgmt = &pDnode->vmgmt;
SMWorkerPool *pPool = &pMgmt->writePool;
pPool->name = "vnode-write";
pPool->max = tsNumOfCores;
......@@ -1119,12 +1121,12 @@ void dndGetVnodeLoads(SDnode *pDnode, SVnodeLoads *pLoads) {
pLoads->num = taosHashGetSize(pMgmt->hash);
int32_t v = 0;
void *pIter = taosHashIterate(pMgmt->hash, NULL);
void * pIter = taosHashIterate(pMgmt->hash, NULL);
while (pIter) {
SVnodeObj **ppVnode = pIter;
if (ppVnode == NULL || *ppVnode == NULL) continue;
SVnodeObj *pVnode = *ppVnode;
SVnodeObj * pVnode = *ppVnode;
SVnodeLoad *pLoad = &pLoads->data[v++];
vnodeGetLoad(pVnode->pImpl, pLoad);
......
......@@ -25,8 +25,9 @@ extern "C" {
typedef struct SVBufPool SVBufPool;
int vnodeOpenBufPool(SVnode *pVnode);
void vnodeCloseBufPool(SVnode *pVnode);
int vnodeOpenBufPool(SVnode *pVnode);
void vnodeCloseBufPool(SVnode *pVnode);
void *vnodeMalloc(SVnode *pVnode, uint64_t size);
#ifdef __cplusplus
}
......
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VNODE_OPTIONS_H_
#define _TD_VNODE_OPTIONS_H_
#ifndef _TD_VNODE_CFG_H_
#define _TD_VNODE_CFG_H_
#include "vnode.h"
......@@ -22,13 +22,13 @@
extern "C" {
#endif
extern const SVnodeOptions defaultVnodeOptions;
extern const SVnodeCfg defaultVnodeOptions;
int vnodeValidateOptions(const SVnodeOptions *);
void vnodeOptionsCopy(SVnodeOptions *pDest, const SVnodeOptions *pSrc);
int vnodeValidateOptions(const SVnodeCfg *);
void vnodeOptionsCopy(SVnodeCfg *pDest, const SVnodeCfg *pSrc);
#ifdef __cplusplus
}
#endif
#endif /*_TD_VNODE_OPTIONS_H_*/
\ No newline at end of file
#endif /*_TD_VNODE_CFG_H_*/
\ No newline at end of file
......@@ -20,12 +20,14 @@
#include "sync.h"
#include "tlockfree.h"
#include "wal.h"
#include "tcoding.h"
#include "vnode.h"
#include "vnodeBufferPool.h"
#include "vnodeCfg.h"
#include "vnodeCommit.h"
#include "vnodeFileSystem.h"
#include "vnodeOptions.h"
#include "vnodeFS.h"
#include "vnodeRequest.h"
#include "vnodeStateMgr.h"
#include "vnodeSync.h"
......@@ -34,16 +36,16 @@ extern "C" {
#endif
struct SVnode {
char* path;
SVnodeOptions options;
SVState state;
SVBufPool* pBufPool;
SMeta* pMeta;
STsdb* pTsdb;
STQ* pTq;
SWal* pWal;
SVnodeSync* pSync;
SVnodeFS* pFs;
char* path;
SVnodeCfg config;
SVState state;
SVBufPool* pBufPool;
SMeta* pMeta;
STsdb* pTsdb;
STQ* pTq;
SWal* pWal;
SVnodeSync* pSync;
SVnodeFS* pFs;
};
#ifdef __cplusplus
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VNODE_FS_H_
#define _TD_VNODE_FS_H_
#include "vnode.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct {
} SDir;
typedef struct {
} SFile;
typedef struct SFS {
void *pImpl;
int (*startEdit)(struct SFS *);
int (*endEdit)(struct SFS *);
} SFS;
typedef struct {
} SVnodeFS;
int vnodeOpenFS(SVnode *pVnode);
void vnodeCloseFS(SVnode *pVnode);
#ifdef __cplusplus
}
#endif
#endif /*_TD_VNODE_FS_H_*/
\ No newline at end of file
......@@ -16,23 +16,15 @@
#ifndef _TD_VNODE_REQUEST_H_
#define _TD_VNODE_REQUEST_H_
#include "vnode.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct SVnodeReq SVnodeReq;
typedef struct SVnodeRsp SVnodeRsp;
typedef enum {
} EVReqT;
struct SVnodeReq {
/* TODO */
};
struct SVnodeRsp {
/* TODO */
};
// SVDropTableReq
int vnodeBuildDropTableReq(void **buf, const SVDropTableReq *pReq);
void *vnodeParseDropTableReq(void *buf, SVDropTableReq *pReq);
#ifdef __cplusplus
}
......
......@@ -21,6 +21,9 @@ extern "C" {
#endif
typedef struct {
uint64_t processed;
uint64_t committed;
uint64_t applied;
} SVState;
#ifdef __cplusplus
......
......@@ -19,9 +19,12 @@
#define VNODE_BUF_POOL_SHARDS 3
struct SVBufPool {
// buffer pool impl
SList free;
SList incycle;
SListNode *inuse;
// MAF for submodules
SMemAllocatorFactory maf;
};
typedef enum {
......@@ -49,6 +52,11 @@ typedef struct {
SVArenaNode node;
} SVArenaAllocator;
typedef struct {
SVnode * pVnode;
SListNode *pNode;
} SVMAWrapper;
typedef struct {
T_REF_DECLARE()
uint64_t capacity;
......@@ -59,8 +67,11 @@ typedef struct {
};
} SVMemAllocator;
static SListNode *vBufPoolNewNode(uint64_t capacity, EVMemAllocatorT type);
static void vBufPoolFreeNode(SListNode *pNode);
static SListNode * vBufPoolNewNode(uint64_t capacity, EVMemAllocatorT type);
static void vBufPoolFreeNode(SListNode *pNode);
static SMemAllocator *vBufPoolCreateMA(SMemAllocatorFactory *pmaf);
static void vBufPoolDestroyMA(SMemAllocatorFactory *pmaf, SMemAllocator *pma);
static void * vBufPoolMalloc(SVMemAllocator *pvma, uint64_t size);
int vnodeOpenBufPool(SVnode *pVnode) {
uint64_t capacity;
......@@ -74,8 +85,8 @@ int vnodeOpenBufPool(SVnode *pVnode) {
tdListInit(&(pVnode->pBufPool->free), 0);
tdListInit(&(pVnode->pBufPool->incycle), 0);
capacity = pVnode->options.wsize / VNODE_BUF_POOL_SHARDS;
if (pVnode->options.isHeapAllocator) {
capacity = pVnode->config.wsize / VNODE_BUF_POOL_SHARDS;
if (pVnode->config.isHeapAllocator) {
type = E_V_HEAP_ALLOCATOR;
}
......@@ -89,6 +100,10 @@ int vnodeOpenBufPool(SVnode *pVnode) {
tdListAppendNode(&(pVnode->pBufPool->free), pNode);
}
pVnode->pBufPool->maf.impl = pVnode;
pVnode->pBufPool->maf.create = vBufPoolCreateMA;
pVnode->pBufPool->maf.destroy = vBufPoolDestroyMA;
return 0;
}
......@@ -115,6 +130,24 @@ void vnodeCloseBufPool(SVnode *pVnode) {
}
}
void *vnodeMalloc(SVnode *pVnode, uint64_t size) {
void *ptr;
if (pVnode->pBufPool->inuse == NULL) {
SListNode *pNode;
while ((pNode = tdListPopHead(&(pVnode->pBufPool->free))) == NULL) {
// todo
// tsem_wait();
ASSERT(0);
}
pVnode->pBufPool->inuse = pNode;
}
SVMemAllocator *pvma = (SVMemAllocator *)(pVnode->pBufPool->inuse->data);
return vBufPoolMalloc(pvma, size);
}
/* ------------------------ STATIC METHODS ------------------------ */
static void vArenaAllocatorInit(SVArenaAllocator *pvaa, uint64_t capacity, uint64_t ssize, uint64_t lsize) { /* TODO */
pvaa->ssize = ssize;
......@@ -185,4 +218,103 @@ static void vBufPoolFreeNode(SListNode *pNode) {
}
free(pNode);
}
static void *vBufPoolMalloc(SVMemAllocator *pvma, uint64_t size) {
void *ptr = NULL;
if (pvma->type == E_V_ARENA_ALLOCATOR) {
SVArenaAllocator *pvaa = &(pvma->vaa);
if (POINTER_DISTANCE(pvaa->inuse->ptr, pvaa->inuse->data) + size > pvaa->inuse->size) {
SVArenaNode *pNode = (SVArenaNode *)malloc(sizeof(*pNode) + MAX(size, pvaa->ssize));
if (pNode == NULL) {
// TODO: handle error
return NULL;
}
pNode->prev = pvaa->inuse;
pNode->size = MAX(size, pvaa->ssize);
pNode->ptr = pNode->data;
pvaa->inuse = pNode;
}
ptr = pvaa->inuse->ptr;
pvaa->inuse->ptr = POINTER_SHIFT(ptr, size);
} else if (pvma->type == E_V_HEAP_ALLOCATOR) {
/* TODO */
}
return ptr;
}
static SMemAllocator *vBufPoolCreateMA(SMemAllocatorFactory *pmaf) {
SVnode * pVnode;
SMemAllocator * pma;
SVMemAllocator *pvma;
SVMAWrapper * pvmaw;
pVnode = (SVnode *)(pmaf->impl);
pma = (SMemAllocator *)calloc(1, sizeof(*pma) + sizeof(SVMAWrapper));
if (pma == NULL) {
// TODO: handle error
return NULL;
}
pvmaw = (SVMAWrapper *)POINTER_SHIFT(pma, sizeof(*pma));
// No allocator used currently
if (pVnode->pBufPool->inuse == NULL) {
while (listNEles(&(pVnode->pBufPool->free)) == 0) {
// TODO: wait until all released ro kill query
// tsem_wait();
ASSERT(0);
}
pVnode->pBufPool->inuse = tdListPopHead(&(pVnode->pBufPool->free));
pvma = (SVMemAllocator *)(pVnode->pBufPool->inuse->data);
T_REF_INIT_VAL(pvma, 1);
} else {
pvma = (SVMemAllocator *)(pVnode->pBufPool->inuse->data);
}
T_REF_INC(pvma);
pvmaw->pVnode = pVnode;
pvmaw->pNode = pVnode->pBufPool->inuse;
pma->impl = pvmaw;
pma->malloc = NULL;
pma->calloc = NULL; /* TODO */
pma->realloc = NULL; /* TODO */
pma->free = NULL; /* TODO */
pma->usage = NULL; /* TODO */
return pma;
}
static void vBufPoolDestroyMA(SMemAllocatorFactory *pmaf, SMemAllocator *pma) { /* TODO */
SVnode * pVnode = (SVnode *)(pmaf->impl);
SListNode * pNode = ((SVMAWrapper *)(pma->impl))->pNode;
SVMemAllocator *pvma = (SVMemAllocator *)(pNode->data);
if (T_REF_DEC(pvma) == 0) {
if (pvma->type == E_V_ARENA_ALLOCATOR) {
SVArenaAllocator *pvaa = &(pvma->vaa);
while (pvaa->inuse != &(pvaa->node)) {
SVArenaNode *pNode = pvaa->inuse;
pvaa->inuse = pNode->prev;
/* code */
}
pvaa->inuse->ptr = pvaa->inuse->data;
} else if (pvma->type == E_V_HEAP_ALLOCATOR) {
} else {
ASSERT(0);
}
// Move node from incycle to free
tdListAppendNode(&(pVnode->pBufPool->free), tdListPopNode(&(pVnode->pBufPool->incycle), pNode));
// tsem_post(); todo: sem_post
}
}
\ No newline at end of file
......@@ -15,20 +15,20 @@
#include "vnodeDef.h"
const SVnodeOptions defaultVnodeOptions = {0}; /* TODO */
const SVnodeCfg defaultVnodeOptions = {0}; /* TODO */
void vnodeOptionsInit(SVnodeOptions *pVnodeOptions) { /* TODO */
void vnodeOptionsInit(SVnodeCfg *pVnodeOptions) { /* TODO */
vnodeOptionsCopy(pVnodeOptions, &defaultVnodeOptions);
}
void vnodeOptionsClear(SVnodeOptions *pVnodeOptions) { /* TODO */
void vnodeOptionsClear(SVnodeCfg *pVnodeOptions) { /* TODO */
}
int vnodeValidateOptions(const SVnodeOptions *pVnodeOptions) {
int vnodeValidateOptions(const SVnodeCfg *pVnodeOptions) {
// TODO
return 0;
}
void vnodeOptionsCopy(SVnodeOptions *pDest, const SVnodeOptions *pSrc) {
memcpy((void *)pDest, (void *)pSrc, sizeof(SVnodeOptions));
void vnodeOptionsCopy(SVnodeCfg *pDest, const SVnodeCfg *pSrc) {
memcpy((void *)pDest, (void *)pSrc, sizeof(SVnodeCfg));
}
\ No newline at end of file
......@@ -15,27 +15,40 @@
#include "vnodeDef.h"
static SVnode *vnodeNew(const char *path, const SVnodeOptions *pVnodeOptions);
static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg);
static void vnodeFree(SVnode *pVnode);
static int vnodeOpenImpl(SVnode *pVnode);
static void vnodeCloseImpl(SVnode *pVnode);
SVnode *vnodeOpen(const char *path, const SVnodeOptions *pVnodeOptions) {
int vnodeInit() {
// TODO
if (walInit() < 0) {
return -1;
}
return 0;
}
void vnodeClear() {
walCleanUp();
}
SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) {
SVnode *pVnode = NULL;
// Set default options
if (pVnodeOptions == NULL) {
pVnodeOptions = &defaultVnodeOptions;
if (pVnodeCfg == NULL) {
pVnodeCfg = &defaultVnodeOptions;
}
// Validate options
if (vnodeValidateOptions(pVnodeOptions) < 0) {
if (vnodeValidateOptions(pVnodeCfg) < 0) {
// TODO
return NULL;
}
// Create the handle
pVnode = vnodeNew(path, pVnodeOptions);
pVnode = vnodeNew(path, pVnodeCfg);
if (pVnode == NULL) {
// TODO: handle error
return NULL;
......@@ -62,7 +75,7 @@ void vnodeClose(SVnode *pVnode) {
void vnodeDestroy(const char *path) { taosRemoveDir(path); }
/* ------------------------ STATIC METHODS ------------------------ */
static SVnode *vnodeNew(const char *path, const SVnodeOptions *pVnodeOptions) {
static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg) {
SVnode *pVnode = NULL;
pVnode = (SVnode *)calloc(1, sizeof(*pVnode));
......@@ -72,7 +85,7 @@ static SVnode *vnodeNew(const char *path, const SVnodeOptions *pVnodeOptions) {
}
pVnode->path = strdup(path);
vnodeOptionsCopy(&(pVnode->options), pVnodeOptions);
vnodeOptionsCopy(&(pVnode->config), pVnodeCfg);
return pVnode;
}
......@@ -94,7 +107,7 @@ static int vnodeOpenImpl(SVnode *pVnode) {
// Open meta
sprintf(dir, "%s/meta", pVnode->path);
pVnode->pMeta = metaOpen(dir, &(pVnode->options.metaOptions));
pVnode->pMeta = metaOpen(dir, &(pVnode->config.metaCfg));
if (pVnode->pMeta == NULL) {
// TODO: handle error
return -1;
......@@ -102,23 +115,23 @@ static int vnodeOpenImpl(SVnode *pVnode) {
// Open tsdb
sprintf(dir, "%s/tsdb", pVnode->path);
pVnode->pTsdb = tsdbOpen(dir, &(pVnode->options.tsdbOptions));
pVnode->pTsdb = tsdbOpen(dir, &(pVnode->config.tsdbCfg));
if (pVnode->pTsdb == NULL) {
// TODO: handle error
return -1;
}
// TODO: Open TQ
sprintf(dir, "%s/wal", pVnode->path);
// pVnode->pTq = tqOpen(dir, NULL /* TODO */);
// if (pVnode->pTq == NULL) {
// // TODO: handle error
// return -1;
// }
sprintf(dir, "%s/tq", pVnode->path);
pVnode->pTq = tqOpen(dir, &(pVnode->config.tqCfg), NULL, NULL);
if (pVnode->pTq == NULL) {
// TODO: handle error
return -1;
}
// Open WAL
sprintf(dir, "%s/wal", pVnode->path);
pVnode->pWal = walOpen(dir, NULL /* TODO */);
pVnode->pWal = walOpen(dir, &(pVnode->config.walCfg));
if (pVnode->pWal == NULL) {
// TODO: handle error
return -1;
......
......@@ -11,4 +11,105 @@
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
*/
#include "vnodeDef.h"
static int vnodeBuildCreateTableReq(void **buf, const SVCreateTableReq *pReq);
static void *vnodeParseCreateTableReq(void *buf, SVCreateTableReq *pReq);
int vnodeBuildReq(void **buf, const SVnodeReq *pReq, uint8_t type) {
int tsize = 0;
tsize += taosEncodeFixedU64(buf, pReq->ver);
switch (type) {
case TSDB_MSG_TYPE_CREATE_TABLE:
tsize += vnodeBuildCreateTableReq(buf, &(pReq->ctReq));
/* code */
break;
default:
break;
}
/* TODO */
return tsize;
}
void *vnodeParseReq(void *buf, SVnodeReq *pReq, uint8_t type) {
buf = taosDecodeFixedU64(buf, &(pReq->ver));
switch (type) {
case TSDB_MSG_TYPE_CREATE_TABLE:
buf = vnodeParseCreateTableReq(buf, &(pReq->ctReq));
break;
default:
break;
}
// TODO
return buf;
}
static int vnodeBuildCreateTableReq(void **buf, const SVCreateTableReq *pReq) {
int tsize = 0;
tsize += taosEncodeString(buf, pReq->name);
tsize += taosEncodeFixedU32(buf, pReq->ttl);
tsize += taosEncodeFixedU32(buf, pReq->keep);
tsize += taosEncodeFixedU8(buf, pReq->type);
switch (pReq->type) {
case META_SUPER_TABLE:
tsize += taosEncodeFixedU64(buf, pReq->stbCfg.suid);
tsize += tdEncodeSchema(buf, pReq->stbCfg.pSchema);
tsize += tdEncodeSchema(buf, pReq->stbCfg.pTagSchema);
break;
case META_CHILD_TABLE:
tsize += taosEncodeFixedU64(buf, pReq->ctbCfg.suid);
tsize += tdEncodeKVRow(buf, pReq->ctbCfg.pTag);
break;
case META_NORMAL_TABLE:
tsize += tdEncodeSchema(buf, pReq->ntbCfg.pSchema);
break;
default:
break;
}
return tsize;
}
static void *vnodeParseCreateTableReq(void *buf, SVCreateTableReq *pReq) {
buf = taosDecodeString(buf, &(pReq->name));
buf = taosDecodeFixedU32(buf, &(pReq->ttl));
buf = taosDecodeFixedU32(buf, &(pReq->keep));
buf = taosDecodeFixedU8(buf, &(pReq->type));
switch (pReq->type) {
case META_SUPER_TABLE:
buf = taosDecodeFixedU64(buf, &(pReq->stbCfg.suid));
buf = tdDecodeSchema(buf, &(pReq->stbCfg.pSchema));
buf = tdDecodeSchema(buf, &(pReq->stbCfg.pTagSchema));
break;
case META_CHILD_TABLE:
buf = taosDecodeFixedU64(buf, &(pReq->ctbCfg.suid));
buf = tdDecodeKVRow(buf, &(pReq->ctbCfg.pTag));
break;
case META_NORMAL_TABLE:
buf = tdDecodeSchema(buf, &(pReq->ntbCfg.pSchema));
break;
default:
break;
}
return buf;
}
int vnodeBuildDropTableReq(void **buf, const SVDropTableReq *pReq) {
// TODO
return 0;
}
void *vnodeParseDropTableReq(void *buf, SVDropTableReq *pReq) {
// TODO
}
\ No newline at end of file
......@@ -16,51 +16,84 @@
#include "vnodeDef.h"
int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
/* TODO */
return 0;
}
SRpcMsg * pMsg;
SVnodeReq *pVnodeReq;
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
#if 0
int reqType; /* TODO */
size_t reqSize; /* TODO */
uint64_t reqVersion = 0; /* TODO */
int code = 0;
// Copy the request to vnode buffer
void *pReq = mMalloc(pVnode->inuse, reqSize);
if (pReq == NULL) {
// TODO: handle error
}
for (int i = 0; i < taosArrayGetSize(pMsgs); i++) {
pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i);
memcpy(pReq, pMsg, reqSize);
// Push the request to TQ so consumers can consume
tqPushMsg(pVnode->pTq, pReq, 0);
// Process the request
switch (reqType) {
case TSDB_MSG_TYPE_CREATE_TABLE:
code = metaCreateTable(pVnode->pMeta, NULL /* TODO */);
break;
case TSDB_MSG_TYPE_DROP_TABLE:
code = metaDropTable(pVnode->pMeta, 0 /* TODO */);
break;
case TSDB_MSG_TYPE_SUBMIT:
/* TODO */
break;
default:
break;
}
// ser request version
void * pBuf = pMsg->pCont;
uint64_t ver = pVnode->state.processed++;
taosEncodeFixedU64(&pBuf, ver);
if (vnodeShouldCommit(pVnode)) {
if (vnodeAsyncCommit(pVnode) < 0) {
if (walWrite(pVnode->pWal, ver, pMsg->msgType, pMsg->pCont, pMsg->contLen) < 0) {
// TODO: handle error
}
}
return code;
#endif
walFsync(pVnode->pWal, false);
// Apply each request now
for (int i = 0; i < taosArrayGetSize(pMsgs); i++) {
pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i);
SVnodeReq vReq;
// Apply the request
{
void *ptr = vnodeMalloc(pVnode, pMsg->contLen);
if (ptr == NULL) {
// TODO: handle error
}
// TODO: copy here need to be extended
memcpy(ptr, pMsg->pCont, pMsg->contLen);
// todo: change the interface here
uint64_t ver;
taosDecodeFixedU64(pMsg->pCont, &ver);
if (tqPushMsg(pVnode->pTq, ptr, ver) < 0) {
// TODO: handle error
}
vnodeParseReq(pMsg->pCont, &vReq, pMsg->msgType);
switch (pMsg->msgType) {
case TSDB_MSG_TYPE_CREATE_TABLE:
if (metaCreateTable(pVnode->pMeta, &(vReq.ctReq)) < 0) {
// TODO: handle error
}
// TODO: maybe need to clear the requst struct
break;
case TSDB_MSG_TYPE_DROP_TABLE:
if (metaDropTable(pVnode->pMeta, vReq.dtReq.uid) < 0) {
// TODO: handle error
}
break;
case TSDB_MSG_TYPE_SUBMIT:
/* code */
break;
default:
break;
}
pVnode->state.applied = ver;
}
// Check if it needs to commit
if (vnodeShouldCommit(pVnode)) {
if (vnodeAsyncCommit(pVnode) < 0) {
// TODO: handle error
}
}
}
return 0;
}
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
// TODO
return 0;
}
......
......@@ -3,11 +3,173 @@
#include "vnode.h"
static STSchema *createBasicSchema() {
STSchemaBuilder sb;
STSchema * pSchema = NULL;
tdInitTSchemaBuilder(&sb, 0);
tdAddColToSchema(&sb, TSDB_DATA_TYPE_TIMESTAMP, 0, 0);
for (int i = 1; i < 10; i++) {
tdAddColToSchema(&sb, TSDB_DATA_TYPE_INT, i, 0);
}
pSchema = tdGetSchemaFromBuilder(&sb);
tdDestroyTSchemaBuilder(&sb);
return pSchema;
}
static STSchema *createBasicTagSchema() {
STSchemaBuilder sb;
STSchema * pSchema = NULL;
tdInitTSchemaBuilder(&sb, 0);
tdAddColToSchema(&sb, TSDB_DATA_TYPE_TIMESTAMP, 0, 0);
for (int i = 10; i < 12; i++) {
tdAddColToSchema(&sb, TSDB_DATA_TYPE_BINARY, i, 20);
}
pSchema = tdGetSchemaFromBuilder(&sb);
tdDestroyTSchemaBuilder(&sb);
return pSchema;
}
static SKVRow createBasicTag() {
SKVRowBuilder rb;
SKVRow pTag;
tdInitKVRowBuilder(&rb);
for (int i = 10; i < 12; i++) {
void *pVal = malloc(sizeof(VarDataLenT) + strlen("foo"));
varDataLen(pVal) = strlen("foo");
memcpy(varDataVal(pVal), "foo", strlen("foo"));
tdAddColToKVRow(&rb, i, TSDB_DATA_TYPE_BINARY, pVal);
free(pVal);
}
pTag = tdGetKVRowFromBuilder(&rb);
tdDestroyKVRowBuilder(&rb);
return pTag;
}
#if 0
TEST(vnodeApiTest, test_create_table_encode_and_decode_function) {
tb_uid_t suid = 1638166374163;
STSchema *pSchema = createBasicSchema();
STSchema *pTagSchema = createBasicTagSchema();
char tbname[128] = "st";
char * buffer = new char[1024];
void * pBuf = (void *)buffer;
SVnodeReq vCreateSTbReq = VNODE_INIT_CREATE_STB_REQ(tbname, UINT32_MAX, UINT32_MAX, suid, pSchema, pTagSchema);
vnodeBuildReq(&pBuf, &vCreateSTbReq, TSDB_MSG_TYPE_CREATE_TABLE);
SVnodeReq decoded_req;
vnodeParseReq(buffer, &decoded_req, TSDB_MSG_TYPE_CREATE_TABLE);
int k = 10;
}
#endif
TEST(vnodeApiTest, vnodeOpen_vnodeClose_test) {
GTEST_ASSERT_GE(vnodeInit(), 0);
// Create and open a vnode
SVnode *pVnode = vnodeOpen("vnode1", NULL);
ASSERT_NE(pVnode, nullptr);
tb_uid_t suid = 1638166374163;
{
// Create a super table
STSchema *pSchema = createBasicSchema();
STSchema *pTagSchema = createBasicTagSchema();
char tbname[128] = "st";
SArray * pMsgs = (SArray *)taosArrayInit(1, sizeof(SRpcMsg *));
SVnodeReq vCreateSTbReq = VNODE_INIT_CREATE_STB_REQ(tbname, UINT32_MAX, UINT32_MAX, suid, pSchema, pTagSchema);
int zs = vnodeBuildReq(NULL, &vCreateSTbReq, TSDB_MSG_TYPE_CREATE_TABLE);
SRpcMsg *pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg) + zs);
pMsg->msgType = TSDB_MSG_TYPE_CREATE_TABLE;
pMsg->contLen = zs;
pMsg->pCont = POINTER_SHIFT(pMsg, sizeof(SRpcMsg));
void *pBuf = pMsg->pCont;
vnodeBuildReq(&pBuf, &vCreateSTbReq, TSDB_MSG_TYPE_CREATE_TABLE);
META_CLEAR_TB_CFG(&vCreateSTbReq);
taosArrayPush(pMsgs, &(pMsg));
vnodeProcessWMsgs(pVnode, pMsgs);
free(pMsg);
taosArrayDestroy(pMsgs);
tdFreeSchema(pSchema);
tdFreeSchema(pTagSchema);
}
{
// Create some child tables
int ntables = 100000;
int batch = 10;
for (int i = 0; i < ntables / batch; i++) {
SArray *pMsgs = (SArray *)taosArrayInit(batch, sizeof(SRpcMsg *));
for (int j = 0; j < batch; j++) {
SKVRow pTag = createBasicTag();
char tbname[128];
sprintf(tbname, "tb%d", i * batch + j);
SVnodeReq vCreateCTbReq = VNODE_INIT_CREATE_CTB_REQ(tbname, UINT32_MAX, UINT32_MAX, suid, pTag);
int tz = vnodeBuildReq(NULL, &vCreateCTbReq, TSDB_MSG_TYPE_CREATE_TABLE);
SRpcMsg *pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg) + tz);
pMsg->msgType = TSDB_MSG_TYPE_CREATE_TABLE;
pMsg->contLen = tz;
pMsg->pCont = POINTER_SHIFT(pMsg, sizeof(*pMsg));
void *pBuf = pMsg->pCont;
vnodeBuildReq(&pBuf, &vCreateCTbReq, TSDB_MSG_TYPE_CREATE_TABLE);
META_CLEAR_TB_CFG(&vCreateCTbReq);
free(pTag);
taosArrayPush(pMsgs, &(pMsg));
}
vnodeProcessWMsgs(pVnode, pMsgs);
for (int j = 0; j < batch; j++) {
SRpcMsg *pMsg = *(SRpcMsg **)taosArrayPop(pMsgs);
free(pMsg);
}
taosArrayDestroy(pMsgs);
// std::cout << "the " << i << "th batch is created" << std::endl;
}
}
// Close the vnode
vnodeClose(pVnode);
vnodeClear();
}
TEST(vnodeApiTest, DISABLED_vnode_process_create_table) {
STSchema * pSchema = NULL;
STSchema * pTagSchema = NULL;
char stname[15];
SVCreateTableReq pReq = META_INIT_STB_CFG(stname, UINT32_MAX, UINT32_MAX, 0, pSchema, pTagSchema);
int k = 10;
META_CLEAR_TB_CFG(pReq);
}
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_META_OPTIONS_H_
#define _TD_META_OPTIONS_H_
#ifndef _TD_META_CFG_H_
#define _TD_META_CFG_H_
#include "meta.h"
......@@ -22,13 +22,13 @@
extern "C" {
#endif
extern const SMetaOptions defaultMetaOptions;
extern const SMetaCfg defaultMetaOptions;
int metaValidateOptions(const SMetaOptions *);
void metaOptionsCopy(SMetaOptions *pDest, const SMetaOptions *pSrc);
int metaValidateOptions(const SMetaCfg *);
void metaOptionsCopy(SMetaCfg *pDest, const SMetaCfg *pSrc);
#ifdef __cplusplus
}
#endif
#endif /*_TD_META_OPTIONS_H_*/
\ No newline at end of file
#endif /*_TD_META_CFG_H_*/
\ No newline at end of file
......@@ -34,7 +34,7 @@ typedef struct {
int metaOpenDB(SMeta *pMeta);
void metaCloseDB(SMeta *pMeta);
int metaSaveTableToDB(SMeta *pMeta, const STbOptions *pTbOptions);
int metaSaveTableToDB(SMeta *pMeta, const STbCfg *pTbOptions);
int metaRemoveTableFromDb(SMeta *pMeta, tb_uid_t uid);
#ifdef __cplusplus
......
......@@ -20,10 +20,10 @@
#include "meta.h"
#include "metaCache.h"
#include "metaCfg.h"
#include "metaDB.h"
#include "metaIdx.h"
#include "metaOptions.h"
#include "metaTbOptions.h"
#include "metaTbCfg.h"
#include "metaTbTag.h"
#include "metaTbUid.h"
......@@ -33,7 +33,7 @@ extern "C" {
struct SMeta {
char* path;
SMetaOptions options;
SMetaCfg options;
meta_db_t* pDB;
meta_index_t* pIdx;
meta_cache_t* pCache;
......
......@@ -28,7 +28,7 @@ typedef rocksdb_t meta_index_t;
int metaOpenIdx(SMeta *pMeta);
void metaCloseIdx(SMeta *pMeta);
int metaSaveTableToIdx(SMeta *pMeta, const STbOptions *pTbOptions);
int metaSaveTableToIdx(SMeta *pMeta, const STbCfg *pTbOptions);
int metaRemoveTableFromIdx(SMeta *pMeta, tb_uid_t uid);
#ifdef __cplusplus
......
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_META_TABLE_OPTIONS_H_
#define _TD_META_TABLE_OPTIONS_H_
#ifndef _TD_META_TABLE_CFG_H_
#define _TD_META_TABLE_CFG_H_
#include "meta.h"
......@@ -22,11 +22,15 @@
extern "C" {
#endif
int metaValidateTbOptions(SMeta *pMeta, const STbOptions *);
size_t metaEncodeTbObjFromTbOptions(const STbOptions *, void *pBuf, size_t bsize);
#define META_SUPER_TABLE 0
#define META_CHILD_TABLE 1
#define META_NORMAL_TABLE 2
int metaValidateTbOptions(SMeta *pMeta, const STbCfg *);
size_t metaEncodeTbObjFromTbOptions(const STbCfg *, void *pBuf, size_t bsize);
#ifdef __cplusplus
}
#endif
#endif /*_TD_META_TABLE_OPTIONS_H_*/
\ No newline at end of file
#endif /*_TD_META_TABLE_CFG_H_*/
\ No newline at end of file
......@@ -18,8 +18,8 @@
int metaOpenCache(SMeta *pMeta) {
// TODO
if (pMeta->options.lruCacheSize) {
pMeta->pCache = rocksdb_cache_create_lru(pMeta->options.lruCacheSize);
if (pMeta->options.lruSize) {
pMeta->pCache = rocksdb_cache_create_lru(pMeta->options.lruSize);
if (pMeta->pCache == NULL) {
// TODO: handle error
return -1;
......
......@@ -15,20 +15,20 @@
#include "metaDef.h"
const SMetaOptions defaultMetaOptions = {.lruCacheSize = 0};
const SMetaCfg defaultMetaOptions = {.lruSize = 0};
/* ------------------------ EXPOSED METHODS ------------------------ */
void metaOptionsInit(SMetaOptions *pMetaOptions) { metaOptionsCopy(pMetaOptions, &defaultMetaOptions); }
void metaOptionsInit(SMetaCfg *pMetaOptions) { metaOptionsCopy(pMetaOptions, &defaultMetaOptions); }
void metaOptionsClear(SMetaOptions *pMetaOptions) {
void metaOptionsClear(SMetaCfg *pMetaOptions) {
// TODO
}
int metaValidateOptions(const SMetaOptions *pMetaOptions) {
int metaValidateOptions(const SMetaCfg *pMetaOptions) {
// TODO
return 0;
}
void metaOptionsCopy(SMetaOptions *pDest, const SMetaOptions *pSrc) { memcpy(pDest, pSrc, sizeof(*pSrc)); }
void metaOptionsCopy(SMetaCfg *pDest, const SMetaCfg *pSrc) { memcpy(pDest, pSrc, sizeof(*pSrc)); }
/* ------------------------ STATIC METHODS ------------------------ */
\ No newline at end of file
......@@ -92,7 +92,7 @@ void metaCloseDB(SMeta *pMeta) {
}
}
int metaSaveTableToDB(SMeta *pMeta, const STbOptions *pTbOptions) {
int metaSaveTableToDB(SMeta *pMeta, const STbCfg *pTbOptions) {
tb_uid_t uid;
char * err = NULL;
size_t size;
......@@ -102,13 +102,14 @@ int metaSaveTableToDB(SMeta *pMeta, const STbOptions *pTbOptions) {
// Generate a uid for child and normal table
if (pTbOptions->type == META_SUPER_TABLE) {
uid = pTbOptions->stbOptions.uid;
uid = pTbOptions->stbCfg.suid;
} else {
uid = metaGenerateUid(pMeta);
}
// Save tbname -> uid to tbnameDB
rocksdb_put(pMeta->pDB->nameDb, wopt, pTbOptions->name, strlen(pTbOptions->name), (char *)(&uid), sizeof(uid), &err);
rocksdb_writeoptions_disable_WAL(wopt, 1);
// Save uid -> tb_obj to tbDB
size = metaEncodeTbObjFromTbOptions(pTbOptions, pBuf, 1024);
......@@ -117,22 +118,22 @@ int metaSaveTableToDB(SMeta *pMeta, const STbOptions *pTbOptions) {
switch (pTbOptions->type) {
case META_NORMAL_TABLE:
// save schemaDB
metaSaveSchemaDB(pMeta, uid, pTbOptions->ntbOptions.pSchame);
metaSaveSchemaDB(pMeta, uid, pTbOptions->ntbCfg.pSchema);
break;
case META_SUPER_TABLE:
// save schemaDB
metaSaveSchemaDB(pMeta, uid, pTbOptions->stbOptions.pSchema);
metaSaveSchemaDB(pMeta, uid, pTbOptions->stbCfg.pSchema);
// save mapDB (really need?)
rocksdb_put(pMeta->pDB->mapDb, wopt, (char *)(&uid), sizeof(uid), "", 0, &err);
break;
case META_CHILD_TABLE:
// save tagDB
rocksdb_put(pMeta->pDB->tagDb, wopt, (char *)(&uid), sizeof(uid), pTbOptions->ctbOptions.tags,
kvRowLen(pTbOptions->ctbOptions.tags), &err);
rocksdb_put(pMeta->pDB->tagDb, wopt, (char *)(&uid), sizeof(uid), pTbOptions->ctbCfg.pTag,
kvRowLen(pTbOptions->ctbCfg.pTag), &err);
// save mapDB
metaSaveMapDB(pMeta, pTbOptions->ctbOptions.suid, uid);
metaSaveMapDB(pMeta, pTbOptions->ctbCfg.suid, uid);
break;
default:
ASSERT(0);
......@@ -157,6 +158,7 @@ static void metaSaveSchemaDB(SMeta *pMeta, tb_uid_t uid, STSchema *pSchema) {
char * err = NULL;
rocksdb_writeoptions_t *wopt = rocksdb_writeoptions_create();
rocksdb_writeoptions_disable_WAL(wopt, 1);
metaGetSchemaDBKey(key, uid, schemaVersion(pSchema));
vsize = tdEncodeSchema((void **)(&ppBuf), pSchema);
......@@ -190,10 +192,12 @@ static int metaSaveMapDB(SMeta *pMeta, tb_uid_t suid, tb_uid_t uid) {
memcpy(POINTER_SHIFT(nval, vlen), (void *)(&uid), sizeof(uid));
rocksdb_writeoptions_t *wopt = rocksdb_writeoptions_create();
rocksdb_writeoptions_disable_WAL(wopt, 1);
rocksdb_put(pMeta->pDB->mapDb, wopt, (char *)(&suid), sizeof(suid), nval, vlen + sizeof(uid), &err);
rocksdb_writeoptions_destroy(wopt);
free(nval);
return 0;
}
\ No newline at end of file
......@@ -47,7 +47,12 @@ void metaCloseIdx(SMeta *pMeta) { /* TODO */
}
}
int metaSaveTableToIdx(SMeta *pMeta, const STbOptions *pTbOptions) {
int metaSaveTableToIdx(SMeta *pMeta, const STbCfg *pTbOptions) {
// TODO
return 0;
}
int metaRemoveTableFromIdx(SMeta *pMeta, tb_uid_t uid) {
// TODO
return 0;
}
\ No newline at end of file
......@@ -15,17 +15,14 @@
#include "tcoding.h"
#include "meta.h"
#include "metaDB.h"
#include "metaDef.h"
#include "metaOptions.h"
static SMeta *metaNew(const char *path, const SMetaOptions *pMetaOptions);
static SMeta *metaNew(const char *path, const SMetaCfg *pMetaOptions);
static void metaFree(SMeta *pMeta);
static int metaOpenImpl(SMeta *pMeta);
static void metaCloseImpl(SMeta *pMeta);
SMeta *metaOpen(const char *path, const SMetaOptions *pMetaOptions) {
SMeta *metaOpen(const char *path, const SMetaCfg *pMetaOptions) {
SMeta *pMeta = NULL;
// Set default options
......@@ -68,7 +65,7 @@ void metaClose(SMeta *pMeta) {
void metaRemove(const char *path) { taosRemoveDir(path); }
/* ------------------------ STATIC METHODS ------------------------ */
static SMeta *metaNew(const char *path, const SMetaOptions *pMetaOptions) {
static SMeta *metaNew(const char *path, const SMetaCfg *pMetaOptions) {
SMeta *pMeta;
size_t psize = strlen(path);
......
......@@ -15,21 +15,21 @@
#include "metaDef.h"
int metaCreateTable(SMeta *pMeta, const STbOptions *pTbOptions) {
int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg) {
// Validate the tbOptions
if (metaValidateTbOptions(pMeta, pTbOptions) < 0) {
if (metaValidateTbOptions(pMeta, pTbCfg) < 0) {
// TODO: handle error
return -1;
}
// TODO: add atomicity
if (metaSaveTableToDB(pMeta, pTbOptions) < 0) {
if (metaSaveTableToDB(pMeta, pTbCfg) < 0) {
// TODO: handle error
return -1;
}
if (metaSaveTableToIdx(pMeta, pTbOptions) < 0) {
if (metaSaveTableToIdx(pMeta, pTbCfg) < 0) {
// TODO: handle error
return -1;
}
......
......@@ -16,12 +16,12 @@
#include "metaDef.h"
#include "tcoding.h"
int metaValidateTbOptions(SMeta *pMeta, const STbOptions *pTbOptions) {
int metaValidateTbOptions(SMeta *pMeta, const STbCfg *pTbOptions) {
// TODO
return 0;
}
size_t metaEncodeTbObjFromTbOptions(const STbOptions *pTbOptions, void *pBuf, size_t bsize) {
size_t metaEncodeTbObjFromTbOptions(const STbCfg *pTbOptions, void *pBuf, size_t bsize) {
void **ppBuf = &pBuf;
int tlen = 0;
......@@ -31,12 +31,12 @@ size_t metaEncodeTbObjFromTbOptions(const STbOptions *pTbOptions, void *pBuf, si
switch (pTbOptions->type) {
case META_SUPER_TABLE:
tlen += taosEncodeFixedU64(ppBuf, pTbOptions->stbOptions.uid);
tlen += tdEncodeSchema(ppBuf, pTbOptions->stbOptions.pTagSchema);
tlen += taosEncodeFixedU64(ppBuf, pTbOptions->stbCfg.suid);
tlen += tdEncodeSchema(ppBuf, pTbOptions->stbCfg.pTagSchema);
// TODO: encode schema version array
break;
case META_CHILD_TABLE:
tlen += taosEncodeFixedU64(ppBuf, pTbOptions->ctbOptions.suid);
tlen += taosEncodeFixedU64(ppBuf, pTbOptions->ctbCfg.suid);
break;
case META_NORMAL_TABLE:
// TODO: encode schema version array
......@@ -46,4 +46,13 @@ size_t metaEncodeTbObjFromTbOptions(const STbOptions *pTbOptions, void *pBuf, si
}
return tlen;
}
int metaEncodeTbCfg(void **pBuf, STbCfg *pTbCfg) {
// TODO
return 0;
}
void *metaDecodeTbCfg(void *pBuf, STbCfg **pTbCfg) {
// TODO
}
\ No newline at end of file
......@@ -40,17 +40,17 @@ void* tqSerializeBufItem(TqBufferItem* bufItem, void* ptr);
const void* tqDeserializeBufHandle(const void* pBytes, TqBufferHandle* bufHandle);
const void* tqDeserializeBufItem(const void* pBytes, TqBufferItem* bufItem);
STQ* tqOpen(const char* path, TqConfig* tqConfig, TqLogReader* tqLogReader, SMemAllocatorFactory *allocFac) {
STQ* tqOpen(const char* path, STqCfg* tqConfig, TqLogReader* tqLogReader, SMemAllocatorFactory *allocFac) {
STQ* pTq = malloc(sizeof(STQ));
if(pTq == NULL) {
//TODO: memory error
return NULL;
}
strcpy(pTq->path, path);
pTq->path = strdup(path);
pTq->tqConfig = tqConfig;
pTq->tqLogReader = tqLogReader;
pTq->tqMemRef.pAlloctorFactory = allocFac;
pTq->tqMemRef.pAllocator = allocFac->create();
// pTq->tqMemRef.pAlloctorFactory = allocFac;
// pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
if(pTq->tqMemRef.pAllocator == NULL) {
//TODO
}
......
......@@ -28,7 +28,7 @@ extern "C" {
struct STsdb {
char * path;
STsdbOptions options;
STsdbCfg options;
SMemAllocatorFactory *pmaf;
};
......
......@@ -20,10 +20,10 @@
extern "C" {
#endif
extern const STsdbOptions defautlTsdbOptions;
extern const STsdbCfg defautlTsdbOptions;
int tsdbValidateOptions(const STsdbOptions *);
void tsdbOptionsCopy(STsdbOptions *pDest, const STsdbOptions *pSrc);
int tsdbValidateOptions(const STsdbCfg *);
void tsdbOptionsCopy(STsdbCfg *pDest, const STsdbCfg *pSrc);
#ifdef __cplusplus
}
......
......@@ -15,12 +15,12 @@
#include "tsdbDef.h"
static STsdb *tsdbNew(const char *path, const STsdbOptions *pTsdbOptions);
static STsdb *tsdbNew(const char *path, const STsdbCfg *pTsdbOptions);
static void tsdbFree(STsdb *pTsdb);
static int tsdbOpenImpl(STsdb *pTsdb);
static void tsdbCloseImpl(STsdb *pTsdb);
STsdb *tsdbOpen(const char *path, const STsdbOptions *pTsdbOptions) {
STsdb *tsdbOpen(const char *path, const STsdbCfg *pTsdbOptions) {
STsdb *pTsdb = NULL;
// Set default TSDB Options
......@@ -62,7 +62,7 @@ void tsdbClose(STsdb *pTsdb) {
void tsdbRemove(const char *path) { taosRemoveDir(path); }
/* ------------------------ STATIC METHODS ------------------------ */
static STsdb *tsdbNew(const char *path, const STsdbOptions *pTsdbOptions) {
static STsdb *tsdbNew(const char *path, const STsdbCfg *pTsdbOptions) {
STsdb *pTsdb = NULL;
pTsdb = (STsdb *)calloc(1, sizeof(STsdb));
......
......@@ -15,20 +15,20 @@
#include "tsdbDef.h"
const STsdbOptions defautlTsdbOptions = {.lruCacheSize = 0};
const STsdbCfg defautlTsdbOptions = {.lruCacheSize = 0};
int tsdbOptionsInit(STsdbOptions *pTsdbOptions) {
int tsdbOptionsInit(STsdbCfg *pTsdbOptions) {
// TODO
return 0;
}
void tsdbOptionsClear(STsdbOptions *pTsdbOptions) {
void tsdbOptionsClear(STsdbCfg *pTsdbOptions) {
// TODO
}
int tsdbValidateOptions(const STsdbOptions *pTsdbOptions) {
int tsdbValidateOptions(const STsdbCfg *pTsdbOptions) {
// TODO
return 0;
}
void tsdbOptionsCopy(STsdbOptions *pDest, const STsdbOptions *pSrc) { memcpy(pDest, pSrc, sizeof(STsdbOptions)); }
\ No newline at end of file
void tsdbOptionsCopy(STsdbCfg *pDest, const STsdbCfg *pSrc) { memcpy(pDest, pSrc, sizeof(STsdbCfg)); }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册