diff --git a/src/util/inc/tbuffer.h b/src/util/inc/tbuffer.h index c3bb336e817eb4eade3c97c0f5ea9e17553e88c5..9dc6d97eb6acce4077cc2e7b93c05a49839e68f7 100644 --- a/src/util/inc/tbuffer.h +++ b/src/util/inc/tbuffer.h @@ -24,45 +24,78 @@ /* SBuffer can be used to read or write a buffer, but cannot be used for both -read & write at a same time. -Read example: - SBuffer rbuf; - if (tbufBeginOperation(&rbuf) != 0) { - // handling errors - } - tbufInitRead(&rbuf, data, 1024); - int32_t a = tbufReadInt32(&rbuf); - // other read functions - -Write example: - SBuffer wbuf; - if (tbufBeginOperation(&wbuf) != 0) { - // handling errors - } - tbufInitWrite(&wbuf, 1024); - tbufWriteInt32(&wbuf, 10); - // other write functions - size_t size = tbufGetSize(&wbuf); - char* data = tbufGetBuffer(&wbuf, true); - tbufUninitWrite(&wbuf); +read & write at a same time. Below is an example: + +int main(int argc, char** argv) { + //--------------------- write ------------------------ + SBuffer wbuf; + int32_t code = tbufBeginWrite(&wbuf); + if (code != 0) { + // handle errors + return 0; + } + + // reserve 1024 bytes for the buffer to improve performance + tbufEnsureCapacity(&wbuf, 1024); + + // write 5 integers to the buffer + for (int i = 0; i < 5; i++) { + tbufWriteInt32(&wbuf, i); + } + + // write a string to the buffer + tbufWriteString(&wbuf, "this is a string.\n"); + + // acquire the result and close the write buffer + size_t size = tbufTell(&wbuf); + char* data = tbufGetData(&wbuf, true); + tbufClose(&wbuf, true); + + + //------------------------ read ----------------------- + SBuffer rbuf; + code = tbufBeginRead(&rbuf, data, size); + if (code != 0) { + printf("you will see this message after print out 5 integers and a string.\n"); + tbufClose(&rbuf, false); + return 0; + } + + // read & print out 5 integers + for (int i = 0; i < 5; i++) { + printf("%d\n", tbufReadInt32(&rbuf)); + } + + // read & print out a string + printf(tbufReadString(&rbuf, NULL)); + + // try read another integer, this result in an error as there no this integer + tbufReadInt32(&rbuf); + + printf("you should not see this message.\n"); + tbufClose(&rbuf, false); + + return 0; +} */ typedef struct { jmp_buf jb; - char* buf; + char* data; size_t pos; size_t size; } SBuffer; // common functions can be used in both read & write -#define tbufBeginOperation(buf) setjmp((buf)->jb) +#define tbufThrowError(buf, code) longjmp((buf)->jb, (code)) size_t tbufTell(SBuffer* buf); size_t tbufSeekTo(SBuffer* buf, size_t pos); size_t tbufSkip(SBuffer* buf, size_t size); +void tbufClose(SBuffer* buf, bool keepData); // basic read functions -void tbufInitRead(SBuffer* buf, void* data, size_t size); +#define tbufBeginRead(buf, data, len) (((buf)->data = (char*)data), ((buf)->pos = 0), ((buf)->size = ((data) == NULL) ? 0 : (len)), setjmp((buf)->jb)) char* tbufRead(SBuffer* buf, size_t size); void tbufReadToBuffer(SBuffer* buf, void* dst, size_t size); const char* tbufReadString(SBuffer* buf, size_t* len); @@ -70,10 +103,9 @@ size_t tbufReadToString(SBuffer* buf, char* dst, size_t size); // basic write functions -void tbufInitWrite(SBuffer* buf, size_t size); +#define tbufBeginWrite(buf) ((buf)->data = NULL, ((buf)->pos = 0), ((buf)->size = 0), setjmp((buf)->jb)) void tbufEnsureCapacity(SBuffer* buf, size_t size); -char* tbufGetResult(SBuffer* buf, bool takeOver); -void tbufUninitWrite(SBuffer* buf); +char* tbufGetData(SBuffer* buf, bool takeOver); void tbufWrite(SBuffer* buf, const void* data, size_t size); void tbufWriteAt(SBuffer* buf, size_t pos, const void* data, size_t size); void tbufWriteStringLen(SBuffer* buf, const char* str, size_t len); @@ -81,24 +113,24 @@ void tbufWriteString(SBuffer* buf, const char* str); // read & write function for primitive types -#ifndef TBUFFER_DEFINE_OPERATION -#define TBUFFER_DEFINE_OPERATION(type, name) \ +#ifndef TBUFFER_DEFINE_FUNCTION +#define TBUFFER_DEFINE_FUNCTION(type, name) \ type tbufRead##name(SBuffer* buf); \ void tbufWrite##name(SBuffer* buf, type data); \ void tbufWrite##name##At(SBuffer* buf, size_t pos, type data); #endif -TBUFFER_DEFINE_OPERATION( bool, Bool ) -TBUFFER_DEFINE_OPERATION( char, Char ) -TBUFFER_DEFINE_OPERATION( int8_t, Int8 ) -TBUFFER_DEFINE_OPERATION( uint8_t, Unt8 ) -TBUFFER_DEFINE_OPERATION( int16_t, Int16 ) -TBUFFER_DEFINE_OPERATION( uint16_t, Uint16 ) -TBUFFER_DEFINE_OPERATION( int32_t, Int32 ) -TBUFFER_DEFINE_OPERATION( uint32_t, Uint32 ) -TBUFFER_DEFINE_OPERATION( int64_t, Int64 ) -TBUFFER_DEFINE_OPERATION( uint64_t, Uint64 ) -TBUFFER_DEFINE_OPERATION( float, Float ) -TBUFFER_DEFINE_OPERATION( double, Double ) +TBUFFER_DEFINE_FUNCTION( bool, Bool ) +TBUFFER_DEFINE_FUNCTION( char, Char ) +TBUFFER_DEFINE_FUNCTION( int8_t, Int8 ) +TBUFFER_DEFINE_FUNCTION( uint8_t, Unt8 ) +TBUFFER_DEFINE_FUNCTION( int16_t, Int16 ) +TBUFFER_DEFINE_FUNCTION( uint16_t, Uint16 ) +TBUFFER_DEFINE_FUNCTION( int32_t, Int32 ) +TBUFFER_DEFINE_FUNCTION( uint32_t, Uint32 ) +TBUFFER_DEFINE_FUNCTION( int64_t, Int64 ) +TBUFFER_DEFINE_FUNCTION( uint64_t, Uint64 ) +TBUFFER_DEFINE_FUNCTION( float, Float ) +TBUFFER_DEFINE_FUNCTION( double, Double ) #endif \ No newline at end of file diff --git a/src/util/src/tbuffer.c b/src/util/src/tbuffer.c index 0b8cfbbae9bf5c9e62d895dfe6005f6a79516537..ac7d22078d42eea60fde157ea364dee146203a2f 100644 --- a/src/util/src/tbuffer.c +++ b/src/util/src/tbuffer.c @@ -13,7 +13,11 @@ * along with this program. If not, see . */ -#define TBUFFER_DEFINE_OPERATION(type, name) \ +#include +#include +#include + +#define TBUFFER_DEFINE_FUNCTION(type, name) \ type tbufRead##name(SBuffer* buf) { \ type ret; \ tbufReadToBuffer(buf, &ret, sizeof(type)); \ @@ -38,7 +42,8 @@ size_t tbufTell(SBuffer* buf) { size_t tbufSeekTo(SBuffer* buf, size_t pos) { if (pos > buf->size) { - longjmp(buf->jb, 1); + // TODO: update error code, other tbufThrowError need to be changed too + tbufThrowError(buf, 1); } size_t old = buf->pos; buf->pos = pos; @@ -49,18 +54,20 @@ size_t tbufSkip(SBuffer* buf, size_t size) { return tbufSeekTo(buf, buf->pos + size); } -//////////////////////////////////////////////////////////////////////////////// -// read functions - -void tbufInitRead(SBuffer* buf, void* data, size_t size) { - buf->buf = (char*)data; +void tbufClose(SBuffer* buf, bool keepData) { + if (!keepData) { + free(buf->data); + } + buf->data = NULL; buf->pos = 0; - // empty buffer is not an error, but read an empty buffer is - buf->size = (data == NULL) ? 0 : size; + buf->size = 0; } +//////////////////////////////////////////////////////////////////////////////// +// read functions + char* tbufRead(SBuffer* buf, size_t size) { - char* ret = buf->buf + buf->pos; + char* ret = buf->data + buf->pos; tbufSkip(buf, size); return ret; } @@ -72,8 +79,8 @@ void tbufReadToBuffer(SBuffer* buf, void* dst, size_t size) { } const char* tbufReadString(SBuffer* buf, size_t* len) { - uint16_t l = tbufReadUint16(); - char* ret = buf->buf + buf->pos; + uint16_t l = tbufReadUint16(buf); + char* ret = buf->data + buf->pos; tbufSkip(buf, l + 1); ret[l] = 0; // ensure the string end with '\0' if (len != NULL) { @@ -83,9 +90,12 @@ const char* tbufReadString(SBuffer* buf, size_t* len) { } size_t tbufReadToString(SBuffer* buf, char* dst, size_t size) { + assert(dst != NULL); size_t len; const char* str = tbufReadString(buf, &len); - if (len >= size) len = size - 1; + if (len >= size) { + len = size - 1; + } memcpy(dst, str, len); dst[len] = 0; return len; @@ -98,57 +108,53 @@ size_t tbufReadToString(SBuffer* buf, char* dst, size_t size) { void tbufEnsureCapacity(SBuffer* buf, size_t size) { size += buf->pos; if (size > buf->size) { - char* nbuf = NULL; size_t nsize = size + buf->size; - nbuf = realloc(buf->buf, nsize); - if (nbuf == NULL) { - longjmp(buf->jb, 2); + char* data = realloc(buf->data, nsize); + if (data == NULL) { + tbufThrowError(buf, 2); } - buf->buf = nbuf; + buf->data = data; buf->size = nsize; } } -void tbufInitWrite(SBuffer* buf, size_t size) { - buf->buf = NULL; - buf->pos = 0; - buf->size = 0; - tbufEnsureCapacity(buf, size); -} - -char* tbufGetResult(SBuffer* buf, bool takeOver) { - char* ret = buf->buf; +char* tbufGetData(SBuffer* buf, bool takeOver) { + char* ret = buf->data; if (takeOver) { buf->pos = 0; buf->size = 0; - buf->buf = NULL; + buf->data = NULL; } return ret; } -void tbufUninitWrite(SBuffer* buf) { - free(buf->buf); +void tbufEndWrite(SBuffer* buf) { + free(buf->data); + buf->data = NULL; + buf->pos = 0; + buf->size = 0; } void tbufWrite(SBuffer* buf, const void* data, size_t size) { - tbufEnsureCapacity(size); - memcpy(buf->buf + buf->pos, data, size); + assert(data != NULL); + tbufEnsureCapacity(buf, size); + memcpy(buf->data + buf->pos, data, size); buf->pos += size; } void tbufWriteAt(SBuffer* buf, size_t pos, const void* data, size_t size) { + assert(data != NULL); // this function can only be called to fill the gap on previous writes, // so 'pos + size <= buf->pos' must be true - if (pos + size > buf->pos) { - longjmp(buf->jb, 3); - } - memcpy(buf->buf + pos, data, size); + assert(pos + size <= buf->pos); + memcpy(buf->data + pos, data, size); } void tbufWriteStringLen(SBuffer* buf, const char* str, size_t len) { - if (len > 0xffff) { - longjmp(buf->jb , 4); - } + // maximum string length is 65535, if longer string is required + // this function and the corresponding read function need to be + // revised. + assert(len <= 0xffff); tbufWriteUint16(buf, (uint16_t)len); tbufWrite(buf, str, len + 1); } diff --git a/src/vnode/CMakeLists.txt b/src/vnode/CMakeLists.txt index 5a7b605cb81c20bb8f530e30a0dc6a34688a92ec..92270a886f4cae4363e8c774bfa2622c3989b23b 100644 --- a/src/vnode/CMakeLists.txt +++ b/src/vnode/CMakeLists.txt @@ -1,4 +1,7 @@ -CMAKE_MINIMUM_REQUIRED(VERSION 2.8) -PROJECT(TDengine) +cmake_minimum_required(VERSION 2.8) -ADD_SUBDIRECTORY(detail) \ No newline at end of file +project(tsdb) + +add_subdirectory(common) + +add_subdirectory(tsdb) \ No newline at end of file diff --git a/src/vnode/common/CMakeLists.txt b/src/vnode/common/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..38803c2421ffc97bbecf018d02b962228af9eb62 --- /dev/null +++ b/src/vnode/common/CMakeLists.txt @@ -0,0 +1,8 @@ +aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR}/src SOURCE_LIST) + +list(REMOVE_ITEM SOURCE_LIST ${CMAKE_CURRENT_SOURCE_DIR}/src/vnodePeer.c) + +message(STATUS "Common source file ${SOURCE_LIST}") + +add_library(common ${SOURCE_LIST}) +target_include_directories(common PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/inc") diff --git a/src/vnode/common/src/schema.c b/src/vnode/common/src/schema.c index cf0664c7cc0dee5542cc958921e01c60acb568ff..79b41533d23a0ef5ae8ace9da67e9416d5f604b0 100644 --- a/src/vnode/common/src/schema.c +++ b/src/vnode/common/src/schema.c @@ -51,7 +51,7 @@ SISchema tdConvertSchemaToInline(SSchema *pSchema) { char *pName = TD_ISCHEMA_COL_NAMES(pISchema); for (int32_t i = 0; i < totalCols; i++) { SColumn *pCol = TD_SCHEMA_COLUMN_AT(TD_ISCHEMA_SCHEMA(pISchema), i); - char * colName = TD_COLUMN_NAME(TD_SCHEMA_COLUMN_AT(pSchema, i), i); + char * colName = TD_COLUMN_NAME(TD_SCHEMA_COLUMN_AT(pSchema, i)); TD_COLUMN_NAME(pCol) = pName; diff --git a/src/vnode/tsdb/CMakeLists.txt b/src/vnode/tsdb/CMakeLists.txt index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..4a88fbd7d65dfe6a327d7b5ad0348fb12dc079e4 100644 --- a/src/vnode/tsdb/CMakeLists.txt +++ b/src/vnode/tsdb/CMakeLists.txt @@ -0,0 +1,9 @@ +aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR}/src SOURCE_LIST) + +message(STATUS "tsdb source files: ${SOURCE_LIST}") + +add_library(tsdb STATIC ${SOURCE_LIST}) + +target_link_libraries(tsdb common) + +target_include_directories(tsdb PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/inc") \ No newline at end of file diff --git a/src/vnode/tsdb/inc/tsdb.h b/src/vnode/tsdb/inc/tsdb.h index 2117c951ca8b74d1d850d92fc2cd418146c32a76..b04f0148f06b27d8dd3b2e6d3e35519828eb2e08 100644 --- a/src/vnode/tsdb/inc/tsdb.h +++ b/src/vnode/tsdb/inc/tsdb.h @@ -8,7 +8,7 @@ #include #include -#include "cache.h" +// #include "cache.h" #include "schema.h" #define TSDB_VERSION_MAJOR 1 @@ -18,6 +18,21 @@ typedef void tsdb_repo_t; // use void to hide implementation details from ou typedef int32_t table_id_t; // table ID type in this repository typedef int16_t tsdb_id_t; // TSDB repository ID +// Submit message +typedef struct { + int32_t numOfTables; + char data[]; +} SSubmitMsg; + +// Submit message for one table +typedef struct { + table_id_t tableId; // table ID to insert + int32_t sversion; // data schema version + int32_t numOfRows; // number of rows data + int64_t uid; // table UID to insert + char data[]; +} SSubmitBlock; + // Retention policy. typedef struct { // TODO: Need a more fancy description @@ -54,7 +69,7 @@ typedef struct { SDataShardPolicy dataShardPolicy; SBlockRowsPolicy blockRowsPolicy; SRetentionPolicy retentionPlicy; // retention configuration - SCachePool * cachePool; // the cache pool the repository to use + void * cachePool; // the cache pool the repository to use } STSDBCfg; // the TSDB repository info @@ -205,6 +220,9 @@ typedef struct STimeWindow { int64_t ekey; } STimeWindow; +typedef struct { +} SColumnFilterInfo; + // query condition to build vnode iterator typedef struct STSDBQueryCond { STimeWindow twindow; @@ -237,6 +255,10 @@ typedef struct STableIDList { int32_t num; } STableIDList; +typedef struct { + +} SFields; + /** * Get the data block iterator, starting from position according to the query condition * @param pRepo the TSDB repository to query on diff --git a/src/vnode/tsdb/inc/tsdbCache.h b/src/vnode/tsdb/inc/tsdbCache.h index b0993c1e093be67e1267c3bc897f25cf40d50c01..049cdc0847d8da27f46fbf77d30d9d24fc3bc67a 100644 --- a/src/vnode/tsdb/inc/tsdbCache.h +++ b/src/vnode/tsdb/inc/tsdbCache.h @@ -3,7 +3,7 @@ #include -#include "cache.h" +// #include "cache.h" #define TSDB_DEFAULT_CACHE_BLOCK_SIZE 16*1024*1024 /* 16M */ @@ -13,22 +13,21 @@ typedef struct { int32_t numOfRows // numOfRows } STableCacheInfo; -typedef struct { - char *pData; - STableCacheInfo *pTableInfo; - SCacheBlock *prev; - SCacheBlock *next; +typedef struct _tsdb_cache_block { + char * pData; + STableCacheInfo * pTableInfo; + struct _tsdb_cache_block *prev; + struct _tsdb_cache_block *next; } STSDBCacheBlock; // Use a doublely linked list to implement this typedef struct STSDBCache { // Number of blocks the cache is allocated - int32_t numOfBlocks; + int32_t numOfBlocks; STSDBCacheBlock *cacheList; - void * current; + void * current; } SCacheHandle; - // ---- Operation on STSDBCacheBlock #define TSDB_CACHE_BLOCK_DATA(pBlock) ((pBlock)->pData) #define TSDB_CACHE_AVAIL_SPACE(pBlock) ((char *)((pBlock)->pTableInfo) - ((pBlock)->pData)) diff --git a/src/vnode/tsdb/inc/tsdbFile.h b/src/vnode/tsdb/inc/tsdbFile.h index 91abea076f4551257ec83620833c00045a084fb2..02eb0c78813a7e89c326bfc6b99b3351b805ab67 100644 --- a/src/vnode/tsdb/inc/tsdbFile.h +++ b/src/vnode/tsdb/inc/tsdbFile.h @@ -1,7 +1,8 @@ #if !defined(_TD_TSDB_FILE_H_) #define _TD_TSDB_FILE_H_ -#include "tstring.h" +#include +// #include "tstring.h" typedef int32_t file_id_t; @@ -24,7 +25,7 @@ typedef struct { } SFileInfo; typedef struct { - tstring_t fname; + char * fname; SFileInfo fInfo; } SFILE; diff --git a/src/vnode/tsdb/inc/tsdbMeta.h b/src/vnode/tsdb/inc/tsdbMeta.h index 28bafa1dc8a8b426edca2e4a0397d23676d1b1ec..588394459e151f4fc03c0c5923dcf05082daeba5 100644 --- a/src/vnode/tsdb/inc/tsdbMeta.h +++ b/src/vnode/tsdb/inc/tsdbMeta.h @@ -4,7 +4,7 @@ #include -#include "taosdef.h" +// #include "taosdef.h" // Initially, there are 4 tables #define TSDB_INIT_NUMBER_OF_SUPER_TABLE 4 @@ -30,7 +30,7 @@ typedef struct STable { // For TSDB_SUPER_TABLE, it is the schema including tags // For TSDB_NTABLE, it is only the schema, not including tags // For TSDB_STABLE, it is NULL - SVSchema *pSchema; + SSchema *pSchema; // Tag value for this table // For TSDB_SUPER_TABLE and TSDB_NTABLE, it is NULL @@ -75,7 +75,7 @@ typedef struct { #define TSDB_TABLE_CACHE_DATA(pTable) ((pTable)->content.pData) #define TSDB_SUPER_TABLE_INDEX(pTable) ((pTable)->content.pIndex) -SVSchema *tsdbGetTableSchema(STable *pTable); +SSchema *tsdbGetTableSchema(STable *pTable); // ---- Operation on SMetaHandle #define TSDB_NUM_OF_TABLES(pHandle) ((pHandle)->numOfTables) diff --git a/src/vnode/tsdb/src/tsdb.c b/src/vnode/tsdb/src/tsdb.c index d0087945240e8ac83befb1a5fb45a7a4c3aa41ed..7e13e3183ad8ab33e9f975356359fd7d247f069c 100644 --- a/src/vnode/tsdb/src/tsdb.c +++ b/src/vnode/tsdb/src/tsdb.c @@ -2,14 +2,15 @@ #include #include -#include "taosdef.h" +// #include "taosdef.h" // #include "disk.h" +#include "tsdb.h" #include "tsdbCache.h" #include "tsdbMeta.h" typedef struct STSDBRepo { // TSDB configuration - STSDBcfg *pCfg; + STSDBCfg *pCfg; // The meter meta handle of this TSDB repository SMetaHandle *pMetaHandle; @@ -18,12 +19,12 @@ typedef struct STSDBRepo { SCacheHandle *pCacheHandle; // Disk tier handle for multi-tier storage - SDiskTier *pDiskTier; + void *pDiskTier; // File Store void *pFileStore; - pthread_mutext_t tsdbMutex; + pthread_mutex_t tsdbMutex; } STSDBRepo; diff --git a/src/vnode/tsdb/src/tsdbFileStore.c b/src/vnode/tsdb/src/tsdbFileStore.c index f6cc959f8f6dd24fb9ef4d65c23e5d8debf97929..a47f2eb1e48c85b47ab4e5adce262074d03942d6 100644 --- a/src/vnode/tsdb/src/tsdbFileStore.c +++ b/src/vnode/tsdb/src/tsdbFileStore.c @@ -1,6 +1,6 @@ #include "tsdbFile.h" char *tsdbGetFileName(char *dirName, char *fname, TSDB_FILE_TYPE type){ - char *suffix = tsdbFileSuffix[type]; + // char *suffix = tsdbFileSuffix[type]; // TODO } \ No newline at end of file diff --git a/src/vnode/tsdb/src/tsdbMeta.c b/src/vnode/tsdb/src/tsdbMeta.c index 5bc82dfa103056e58abe3746460503d85b8e824c..9fad7b126931f40ccae7a3c3bf001e933c4d9115 100644 --- a/src/vnode/tsdb/src/tsdbMeta.c +++ b/src/vnode/tsdb/src/tsdbMeta.c @@ -1,6 +1,7 @@ #include -#include "taosdef.h" +// #include "taosdef.h" +#include "tsdb.h" #include "tsdbMeta.h" SMetaHandle *tsdbCreateMetaHandle(int32_t numOfTables) { @@ -11,7 +12,7 @@ SMetaHandle *tsdbCreateMetaHandle(int32_t numOfTables) { pMetahandle->numOfTables = 0; pMetahandle->numOfSuperTables = 0; - pMetahandle->pTables = calloc(sizeof(STable *) * numOfTables); + pMetahandle->pTables = calloc(sizeof(STable *), numOfTables); if (pMetahandle->pTables == NULL) { free(pMetahandle); return NULL;