diff --git a/src/common/inc/dataformat.h b/src/common/inc/dataformat.h
index cc13ab2eca3d2ee6281257626b69b490784569e5..4589a0573aca0e9a8595cb9942f6ddfc9b6bc00b 100644
--- a/src/common/inc/dataformat.h
+++ b/src/common/inc/dataformat.h
@@ -51,44 +51,53 @@ void tdSetCol(STColumn *pCol, int8_t type, int16_t colId, int32_t bytes);
// ----------------- TSDB SCHEMA DEFINITION
typedef struct {
- int32_t numOfCols;
- int32_t padding; // TODO: replace the padding for useful variable
+ int numOfCols; // Number of columns appended
+ int totalCols; // Total columns allocated
STColumn columns[];
} STSchema;
#define schemaNCols(s) ((s)->numOfCols)
+#define schemaTCols(s) ((s)->totalCols)
#define schemaColAt(s, i) ((s)->columns + i)
STSchema *tdNewSchema(int32_t nCols);
+int tdSchemaAppendCol(STSchema *pSchema, int8_t type, int16_t colId, int16_t bytes);
STSchema *tdDupSchema(STSchema *pSchema);
-void tdFreeSchema(STSchema *pSchema);
-void tdUpdateSchema(STSchema *pSchema);
+void tdFreeSchema(STSchema *pSchema);
+void tdUpdateSchema(STSchema *pSchema);
// ----------------- Data row structure
/* A data row, the format is like below:
- * +---------+---------------------------------+
- * | int32_t | |
- * +---------+---------------------------------+
- * | len | row |
- * +---------+---------------------------------+
+ * +----------+---------+---------------------------------+---------------------------------+
+ * | int32_t | int32_t | | |
+ * +----------+---------+---------------------------------+---------------------------------+
+ * | len | flen | First part | Second part |
+ * +----------+---------+---------------------------------+---------------------------------+
+ * plen: first part length
* len: the length including sizeof(row) + sizeof(len)
* row: actual row data encoding
*/
typedef void *SDataRow;
+#define TD_DATA_ROW_HEAD_SIZE (2 * sizeof(int32_t))
+
#define dataRowLen(r) (*(int32_t *)(r))
-#define dataRowTuple(r) ((char *)(r) + sizeof(int32_t))
+#define dataRowFLen(r) (*(int32_t *)((char *)(r) + sizeof(int32_t)))
+#define dataRowTuple(r) ((char *)(r) + TD_DATA_ROW_HEAD_SIZE)
#define dataRowSetLen(r, l) (dataRowLen(r) = (l))
+#define dataRowSetFLen(r, l) (dataRowFLen(r) = (l))
#define dataRowIdx(r, i) ((char *)(r) + i)
#define dataRowCpy(dst, r) memcpy((dst), (r), dataRowLen(r))
+#define dataRowAt(r, idx) ((char *)(r) + (idx))
-SDataRow tdNewDataRow(int32_t bytes);
-// SDataRow tdNewDdataFromSchema(SSchema *pSchema);
+void tdInitDataRow(SDataRow row, STSchema *pSchema);
+int tdMaxRowBytesFromSchema(STSchema *pSchema);
+SDataRow tdNewDataRow(int32_t bytes, STSchema *pSchema);
+SDataRow tdNewDataRowFromSchema(STSchema *pSchema);
void tdFreeDataRow(SDataRow row);
-// int32_t tdAppendColVal(SDataRow row, void *value, SColumn *pCol, int32_t suffixOffset);
-void tdDataRowCpy(void *dst, SDataRow row);
-void tdDataRowReset(SDataRow row);
+int tdAppendColVal(SDataRow row, void *value, STColumn *pCol);
+void tdDataRowReset(SDataRow row, STSchema *pSchema);
SDataRow tdDataRowDup(SDataRow row);
/* Data rows definition, the format of it is like below:
diff --git a/src/common/src/dataformat.c b/src/common/src/dataformat.c
index 064cb3ff29404b43e6beed99696a5dac33abdd17..58530c5e3d54bdefe7a38b9e6c9c13112f0a2602 100644
--- a/src/common/src/dataformat.c
+++ b/src/common/src/dataformat.c
@@ -14,6 +14,8 @@
*/
#include "dataformat.h"
+static int tdFLenFromSchema(STSchema *pSchema);
+
/**
* Create a new STColumn object
* ASSUMPTIONS: VALID PARAMETERS
@@ -89,13 +91,40 @@ void tdSetCol(STColumn *pCol, int8_t type, int16_t colId, int32_t bytes) {
STSchema *tdNewSchema(int32_t nCols) {
int32_t size = sizeof(STSchema) + sizeof(STColumn) * nCols;
- STSchema *pSchema = (STSchema *)calloc(1, size);
+ STSchema *pSchema = (STSchema *)malloc(size);
if (pSchema == NULL) return NULL;
- pSchema->numOfCols = nCols;
+ pSchema->numOfCols = 0;
+ pSchema->totalCols = nCols;
return pSchema;
}
+/**
+ * Append a column to the schema
+ */
+int tdSchemaAppendCol(STSchema *pSchema, int8_t type, int16_t colId, int16_t bytes) {
+ if (pSchema->numOfCols >= pSchema->totalCols) return -1;
+ if (!isValidDataType(type, 0)) return -1;
+
+ STColumn *pCol = schemaColAt(pSchema, schemaNCols(pSchema));
+ colSetType(pCol, type);
+ colSetColId(pCol, colId);
+ colSetOffset(pCol, -1);
+ switch (type) {
+ case TSDB_DATA_TYPE_BINARY:
+ case TSDB_DATA_TYPE_NCHAR:
+ colSetBytes(pCol, bytes);
+ break;
+ default:
+ colSetBytes(pCol, TYPE_BYTES[type]);
+ break;
+ }
+
+ pSchema->numOfCols++;
+
+ return 0;
+}
+
/**
* Duplicate the schema and return a new object
*/
@@ -130,6 +159,14 @@ void tdUpdateSchema(STSchema *pSchema) {
}
}
+/**
+ * Initialize a data row
+ */
+void tdInitDataRow(SDataRow row, STSchema *pSchema) {
+ dataRowSetFLen(row, TD_DATA_ROW_HEAD_SIZE);
+ dataRowSetLen(row, TD_DATA_ROW_HEAD_SIZE + tdFLenFromSchema(pSchema));
+}
+
/**
* Create a data row with maximum row length bytes.
*
@@ -140,21 +177,37 @@ void tdUpdateSchema(STSchema *pSchema) {
* @return SDataRow object for success
* NULL for failure
*/
-SDataRow tdNewDataRow(int32_t bytes) {
+SDataRow tdNewDataRow(int32_t bytes, STSchema *pSchema) {
int32_t size = sizeof(int32_t) + bytes;
SDataRow row = malloc(size);
if (row == NULL) return NULL;
- dataRowSetLen(row, sizeof(int32_t));
+ tdInitDataRow(row, pSchema);
return row;
}
-// SDataRow tdNewDdataFromSchema(SSchema *pSchema) {
-// int32_t bytes = tdMaxRowDataBytes(pSchema);
-// return tdNewDataRow(bytes);
-// }
+/**
+ * Get maximum bytes a data row from a schema
+ * ASSUMPTIONS: VALID PARAMETER
+ */
+int tdMaxRowBytesFromSchema(STSchema *pSchema) {
+ // TODO
+ int bytes = TD_DATA_ROW_HEAD_SIZE;
+ for (int i = 0; i < schemaNCols(pSchema); i++) {
+ STColumn *pCol = schemaColAt(pSchema, i);
+ bytes += TYPE_BYTES[pCol->type];
+
+ if (pCol->type == TSDB_DATA_TYPE_BINARY || pCol->type == TSDB_DATA_TYPE_NCHAR) {
+ bytes += pCol->bytes;
+ }
+ }
+
+ return bytes;
+}
+
+SDataRow tdNewDataRowFromSchema(STSchema *pSchema) { return tdNewDataRow(tdMaxRowBytesFromSchema(pSchema), pSchema); }
/**
* Free the SDataRow object
@@ -164,62 +217,37 @@ void tdFreeDataRow(SDataRow row) {
}
/**
- * Append a column value to a SDataRow object.
- * NOTE: THE APPLICATION SHOULD MAKE SURE VALID PARAMETERS. THE FUNCTION ASSUMES
- * THE ROW OBJECT HAS ENOUGH SPACE TO HOLD THE VALUE.
- *
- * @param row the row to append value to
- * @param value value pointer to append
- * @param pSchema schema
- * @param colIdx column index
- *
- * @return 0 for success and -1 for failure
- */
-// int32_t tdAppendColVal(SDataRow row, void *value, SColumn *pCol, int32_t suffixOffset) {
-// int32_t offset;
-
-// switch (pCol->type) {
-// case TD_DATATYPE_BOOL:
-// case TD_DATATYPE_TINYINT:
-// case TD_DATATYPE_SMALLINT:
-// case TD_DATATYPE_INT:
-// case TD_DATATYPE_BIGINT:
-// case TD_DATATYPE_FLOAT:
-// case TD_DATATYPE_DOUBLE:
-// case TD_DATATYPE_TIMESTAMP:
-// memcpy(dataRowIdx(row, pCol->offset + sizeof(int32_t)), value, rowDataLen[pCol->type]);
-// if (dataRowLen(row) < suffixOffset + sizeof(int32_t))
-// dataRowSetLen(row, dataRowLen(row) + rowDataLen[pCol->type]);
-// break;
-// case TD_DATATYPE_VARCHAR:
-// offset = dataRowLen(row) > suffixOffset ? dataRowLen(row) : suffixOffset;
-// memcpy(dataRowIdx(row, pCol->offset+sizeof(int32_t)), (void *)(&offset), sizeof(offset));
-// case TD_DATATYPE_NCHAR:
-// case TD_DATATYPE_BINARY:
-// break;
-// default:
-// return -1;
-// }
-
-// return 0;
-// }
-
-/**
- * Copy a data row to a destination
- * ASSUMPTIONS: dst has enough room for a copy of row
+ * Append a column value to the data row
*/
-void tdDataRowCpy(void *dst, SDataRow row) { memcpy(dst, row, dataRowLen(row)); }
-void tdDataRowReset(SDataRow row) { dataRowSetLen(row, sizeof(int32_t)); }
+int tdAppendColVal(SDataRow row, void *value, STColumn *pCol) {
+ switch (colType(pCol))
+ {
+ case TSDB_DATA_TYPE_BINARY:
+ case TSDB_DATA_TYPE_NCHAR:
+ *(int32_t *)dataRowAt(row, dataRowFLen(row)) = dataRowLen(row);
+ dataRowFLen(row) += TYPE_BYTES[colType(pCol)];
+ memcpy((void *)dataRowAt(row, dataRowLen(row)), value, strlen(value));
+ dataRowLen(row) += strlen(value);
+ break;
+ default:
+ memcpy(dataRowAt(row, dataRowFLen(row)), value, TYPE_BYTES[colType(pCol)]);
+ dataRowFLen(row) += TYPE_BYTES[colType(pCol)];
+ break;
+ }
+}
+
+void tdDataRowReset(SDataRow row, STSchema *pSchema) { tdInitDataRow(row, pSchema); }
+
SDataRow tdDataRowDup(SDataRow row) {
- SDataRow trow = tdNewDataRow(dataRowLen(row));
+ SDataRow trow = malloc(dataRowLen(row));
if (trow == NULL) return NULL;
dataRowCpy(trow, row);
- return row;
+ return trow;
}
void tdDataRowsAppendRow(SDataRows rows, SDataRow row) {
- tdDataRowCpy((void *)((char *)rows + dataRowsLen(rows)), row);
+ dataRowCpy((void *)((char *)rows + dataRowsLen(rows)), row);
dataRowsSetLen(rows, dataRowsLen(rows) + dataRowLen(row));
}
@@ -250,4 +278,17 @@ SDataRow tdDataRowsNext(SDataRowsIter *pIter) {
}
return row;
+}
+
+/**
+ * Return the first part length of a data row for a schema
+ */
+static int tdFLenFromSchema(STSchema *pSchema) {
+ int ret = 0;
+ for (int i = 0; i < schemaNCols(pSchema); i++) {
+ STColumn *pCol = schemaColAt(pSchema, i);
+ ret += TYPE_BYTES[pCol->type];
+ }
+
+ return ret;
}
\ No newline at end of file
diff --git a/src/common/src/ttypes.c b/src/common/src/ttypes.c
index 14b4d593fb9f2a614c804db3dcd8537b87aae6a7..2f4aa6ab765c71fa841ff5246e3e88ae37fd054b 100644
--- a/src/common/src/ttypes.c
+++ b/src/common/src/ttypes.c
@@ -26,9 +26,9 @@ const int32_t TYPE_BYTES[11] = {
sizeof(int64_t), // TSDB_DATA_TYPE_BIGINT
sizeof(float), // TSDB_DATA_TYPE_FLOAT
sizeof(double), // TSDB_DATA_TYPE_DOUBLE
- -1, // TSDB_DATA_TYPE_BINARY
+ sizeof(int32_t), // TSDB_DATA_TYPE_BINARY
sizeof(TSKEY), // TSDB_DATA_TYPE_TIMESTAMP
- -1 // TSDB_DATA_TYPE_NCHAR
+ sizeof(int32_t) // TSDB_DATA_TYPE_NCHAR
};
tDataTypeDescriptor tDataTypeDesc[11] = {
diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c
index 63e4a290aa5b4615559fb798ba5d4d3b97937f8d..79d2de7739142bb5ea3069dac245a759875d4e06 100644
--- a/src/dnode/src/dnodeMgmt.c
+++ b/src/dnode/src/dnodeMgmt.c
@@ -91,9 +91,9 @@ int32_t dnodeInitMgmt() {
SVnodeObj *pVnode = dnodeGetVnode(cfg.cfg.vgId);
dnodeDropVnode(pVnode);
- dnodeCreateVnode(&cfg);
- SVnodeObj *pVnode = dnodeGetVnode(cfg.cfg.vgId);
- dnodeCleanupVnodes();
+ // dnodeCreateVnode(&cfg);
+ // SVnodeObj *pVnode = dnodeGetVnode(cfg.cfg.vgId);
+ // dnodeCleanupVnodes();
dnodeOpenVnodes();
dnodeCleanupVnodes();
diff --git a/src/vnode/tsdb/CMakeLists.txt b/src/vnode/tsdb/CMakeLists.txt
index 1317e32b51cd88bee64ca8e56ae15d4da9c50c46..8a7c7a1a5197e3e47ed7e36cdb2ebcdcef2d6b49 100644
--- a/src/vnode/tsdb/CMakeLists.txt
+++ b/src/vnode/tsdb/CMakeLists.txt
@@ -15,5 +15,5 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
TARGET_LINK_LIBRARIES(tsdb common tutil)
# Someone has no gtest directory, so comment it
- #ADD_SUBDIRECTORY(tests)
+ ADD_SUBDIRECTORY(tests)
ENDIF ()
diff --git a/src/vnode/tsdb/inc/tsdb.h b/src/vnode/tsdb/inc/tsdb.h
index f97ea3dc5e43338924b97df53be342bcfbc466a1..267b462b91dbd07ac512b02681fb3efc6fdeffab 100644
--- a/src/vnode/tsdb/inc/tsdb.h
+++ b/src/vnode/tsdb/inc/tsdb.h
@@ -12,7 +12,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
-#if !defined(_TD_TSDB_H_)
+#ifndef _TD_TSDB_H_
#define _TD_TSDB_H_
#include
@@ -30,43 +30,104 @@ extern "C" {
#define TSDB_VERSION_MAJOR 1
#define TSDB_VERSION_MINOR 0
+#define TSDB_INVALID_SUPER_TABLE_ID -1
+
+// --------- TSDB REPOSITORY CONFIGURATION DEFINITION
+enum { TSDB_PRECISION_MILLI, TSDB_PRECISION_MICRO, TSDB_PRECISION_NANO };
+typedef enum {
+ TSDB_SUPER_TABLE, // super table
+ TSDB_NTABLE, // table not created from super table
+ TSDB_STABLE // table created from super table
+} TSDB_TABLE_TYPE;
+
+typedef struct {
+ int8_t precision;
+ int32_t vgId;
+ int32_t tsdbId;
+ int32_t maxTables; // maximum number of tables this repository can have
+ int32_t daysPerFile; // day per file sharding policy
+ int32_t minRowsPerFileBlock; // minimum rows per file block
+ int32_t maxRowsPerFileBlock; // maximum rows per file block
+ int32_t keep; // day of data to keep
+ int64_t maxCacheSize; // maximum cache size this TSDB can use
+} STsdbCfg;
+
+void tsdbSetDefaultCfg(STsdbCfg *pCfg);
+STsdbCfg *tsdbCreateDefaultCfg();
+void tsdbFreeCfg(STsdbCfg *pCfg);
+
+// --------- TSDB REPOSITORY DEFINITION
typedef void tsdb_repo_t; // use void to hide implementation details from outside
+tsdb_repo_t * tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter);
+int32_t tsdbDropRepo(tsdb_repo_t *repo);
+tsdb_repo_t * tsdbOpenRepo(char *tsdbDir);
+int32_t tsdbCloseRepo(tsdb_repo_t *repo);
+int32_t tsdbConfigRepo(tsdb_repo_t *repo, STsdbCfg *pCfg);
+
+// --------- TSDB TABLE DEFINITION
typedef struct {
int64_t uid; // the unique table ID
int32_t tid; // the table ID in the repository.
} STableId;
-// Submit message for this TSDB
+// --------- TSDB TABLE configuration
typedef struct {
- int32_t numOfTables;
- int32_t compressed;
- char data[];
-} SSubmitMsg;
+ TSDB_TABLE_TYPE type;
+ STableId tableId;
+ int64_t superUid;
+ STSchema * schema;
+ STSchema * tagSchema;
+ SDataRow tagValues;
+} STableCfg;
+
+int tsdbInitTableCfg(STableCfg *config, TSDB_TABLE_TYPE type, int64_t uid, int32_t tid);
+int tsdbTableSetSuperUid(STableCfg *config, int64_t uid);
+int tsdbTableSetSchema(STableCfg *config, STSchema *pSchema, bool dup);
+int tsdbTableSetTagSchema(STableCfg *config, STSchema *pSchema, bool dup);
+int tsdbTableSetTagValue(STableCfg *config, SDataRow row, bool dup);
+void tsdbClearTableCfg(STableCfg *config);
+
+int tsdbCreateTable(tsdb_repo_t *repo, STableCfg *pCfg);
+int tsdbDropTable(tsdb_repo_t *pRepo, STableId tableId);
+int tsdbAlterTable(tsdb_repo_t *repo, STableCfg *pCfg);
// Submit message for one table
typedef struct {
STableId tableId;
int32_t padding; // TODO just for padding here
int32_t sversion; // data schema version
- int32_t len; // message length
+ int32_t len; // data part length, not including the SSubmitBlk head
char data[];
-} SSubmitBlock;
+} SSubmitBlk;
-enum { TSDB_PRECISION_MILLI, TSDB_PRECISION_MICRO, TSDB_PRECISION_NANO };
+typedef struct {
+ int32_t totalLen;
+ int32_t len;
+ SDataRow row;
+} SSubmitBlkIter;
-// the TSDB repository configuration
+int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter);
+SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter);
+
+// Submit message for this TSDB
typedef struct {
- int8_t precision;
- int32_t vgId;
- int32_t tsdbId;
- int32_t maxTables; // maximum number of tables this repository can have
- int32_t daysPerFile; // day per file sharding policy
- int32_t minRowsPerFileBlock; // minimum rows per file block
- int32_t maxRowsPerFileBlock; // maximum rows per file block
- int32_t keep; // day of data to keep
- int64_t maxCacheSize; // maximum cache size this TSDB can use
-} STsdbCfg;
+ int32_t length;
+ int32_t compressed;
+ SSubmitBlk blocks[];
+} SSubmitMsg;
+
+#define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg)
+
+// SSubmitMsg Iterator
+typedef struct {
+ int32_t totalLen;
+ int32_t len;
+ SSubmitBlk *pBlock;
+} SSubmitMsgIter;
+
+int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter);
+SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter);
// the TSDB repository info
typedef struct STsdbRepoInfo {
@@ -76,22 +137,7 @@ typedef struct STsdbRepoInfo {
int64_t tsdbTotalDiskSize; // the total disk size taken by this TSDB repository
// TODO: Other informations to add
} STsdbRepoInfo;
-
-// the meter configuration
-typedef struct {
- STableId tableId;
-
- int64_t stableUid;
- int64_t createdTime;
-
- int32_t numOfCols; // number of columns. For table form super table, not includes the tag schema
- STSchema *schema; // If numOfCols == schema_->numOfCols, it is a normal table, stableName = NULL
- // If numOfCols < schema->numOfCols, it is a table created from super table
- // assert(numOfCols <= schema->numOfCols);
-
- SDataRow tagValues; // NULL if it is normal table
- // otherwise, it contains the tag values.
-} STableCfg;
+STsdbRepoInfo *tsdbGetStatus(tsdb_repo_t *pRepo);
// the meter information report structure
typedef struct {
@@ -100,70 +146,7 @@ typedef struct {
int64_t tableTotalDataSize; // In bytes
int64_t tableTotalDiskSize; // In bytes
} STableInfo;
-
-/**
- * Create a configuration for TSDB default
- * @return a pointer to a configuration. the configuration must call tsdbFreeCfg to free memory after usage
- */
-STsdbCfg *tsdbCreateDefaultCfg();
-
-/**
- * Free
- */
-void tsdbFreeCfg(STsdbCfg *pCfg);
-
-/**
- * Create a new TSDB repository
- * @param rootDir the TSDB repository root directory
- * @param pCfg the TSDB repository configuration, upper layer to free the pointer
- *
- * @return a TSDB repository handle on success, NULL for failure and the error number is set
- */
-tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter);
-
-/**
- * Close and free all resources taken by the repository
- * @param repo the TSDB repository handle. The interface will free the handle too, so upper
- * layer do NOT need to free the repo handle again.
- *
- * @return 0 for success, -1 for failure and the error number is set
- */
-int32_t tsdbDropRepo(tsdb_repo_t *repo);
-
-/**
- * Open an existing TSDB storage repository
- * @param tsdbDir the existing TSDB root directory
- *
- * @return a TSDB repository handle on success, NULL for failure and the error number is set
- */
-tsdb_repo_t *tsdbOpenRepo(char *tsdbDir);
-
-/**
- * Close a TSDB repository. Only free memory resources, and keep the files.
- * @param repo the opened TSDB repository handle. The interface will free the handle too, so upper
- * layer do NOT need to free the repo handle again.
- *
- * @return 0 for success, -1 for failure and the error number is set
- */
-int32_t tsdbCloseRepo(tsdb_repo_t *repo);
-
-/**
- * Change the configuration of a repository
- * @param pCfg the repository configuration, the upper layer should free the pointer
- *
- * @return 0 for success, -1 for failure and the error number is set
- */
-int32_t tsdbConfigRepo(tsdb_repo_t *repo, STsdbCfg *pCfg);
-
-/**
- * Get the TSDB repository information, including some statistics
- * @param pRepo the TSDB repository handle
- * @param error the error number to set when failure occurs
- *
- * @return a info struct handle on success, NULL for failure and the error number is set. The upper
- * layers should free the info handle themselves or memory leak will occur
- */
-STsdbRepoInfo *tsdbGetStatus(tsdb_repo_t *pRepo);
+STableInfo * tsdbGetTableInfo(tsdb_repo_t *pRepo, STableId tid);
// -- For table manipulation
@@ -174,8 +157,6 @@ STsdbRepoInfo *tsdbGetStatus(tsdb_repo_t *pRepo);
*
* @return 0 for success, -1 for failure and the error number is set
*/
-int32_t tsdbCreateTable(tsdb_repo_t *repo, STableCfg *pCfg);
-int32_t tsdbAlterTable(tsdb_repo_t *repo, STableCfg *pCfg);
/**
* Drop a table in a repository and free all the resources it takes
@@ -185,7 +166,6 @@ int32_t tsdbAlterTable(tsdb_repo_t *repo, STableCfg *pCfg);
*
* @return 0 for success, -1 for failure and the error number is set
*/
-int32_t tsdbDropTable(tsdb_repo_t *pRepo, STableId tableId);
/**
* Get the information of a table in the repository
@@ -195,7 +175,6 @@ int32_t tsdbDropTable(tsdb_repo_t *pRepo, STableId tableId);
*
* @return a table information handle for success, NULL for failure and the error number is set
*/
-STableInfo *tsdbGetTableInfo(tsdb_repo_t *pRepo, STableId tid);
// -- FOR INSERT DATA
/**
diff --git a/src/vnode/tsdb/inc/tsdbCache.h b/src/vnode/tsdb/inc/tsdbCache.h
index 8a78a6b19e4ff48945a90273b371717df8d285d5..1821505eae295f2a82d472625ea2f876be81a59c 100644
--- a/src/vnode/tsdb/inc/tsdbCache.h
+++ b/src/vnode/tsdb/inc/tsdbCache.h
@@ -53,7 +53,7 @@ typedef struct STSDBCache {
#define TSDB_NEXT_CACHE_BLOCK(pBlock) ((pBlock)->next)
#define TSDB_PREV_CACHE_BLOCK(pBlock) ((pBlock)->prev)
-STsdbCache *tsdbCreateCache(int32_t numOfBlocks);
+STsdbCache *tsdbInitCache(int64_t maxSize);
int32_t tsdbFreeCache(STsdbCache *pCache);
void * tsdbAllocFromCache(STsdbCache *pCache, int64_t bytes);
diff --git a/src/vnode/tsdb/inc/tsdbFile.h b/src/vnode/tsdb/inc/tsdbFile.h
index dbcec496511576fabb5da4b47cf8b607f4733263..7b3e19d0b981d8f1c4d78297b087c55e61984309 100644
--- a/src/vnode/tsdb/inc/tsdbFile.h
+++ b/src/vnode/tsdb/inc/tsdbFile.h
@@ -16,7 +16,8 @@
#define _TD_TSDB_FILE_H_
#include
-// #include "tstring.h"
+
+#include "taosdef.h"
#ifdef __cplusplus
extern "C" {
@@ -34,7 +35,8 @@ typedef enum {
extern const char *tsdbFileSuffix[];
typedef struct {
- int64_t fileSize;
+ int64_t size;
+ int64_t tombSize;
} SFileInfo;
typedef struct {
diff --git a/src/vnode/tsdb/inc/tsdbMeta.h b/src/vnode/tsdb/inc/tsdbMeta.h
index efab26e1dbde5c5463a0b3c2e5e02ab4a2894c9a..be7c7d0406770ba143d847be5f3bb4c411d3260c 100644
--- a/src/vnode/tsdb/inc/tsdbMeta.h
+++ b/src/vnode/tsdb/inc/tsdbMeta.h
@@ -20,6 +20,7 @@
#include "tsdb.h"
#include "dataformat.h"
#include "tskiplist.h"
+#include "tsdbMetaFile.h"
#ifdef __cplusplus
extern "C" {
@@ -30,62 +31,47 @@ extern "C" {
// Initially, there are 4 tables
#define TSDB_INIT_NUMBER_OF_SUPER_TABLE 4
-typedef enum {
- TSDB_SUPER_TABLE, // super table
- TSDB_NTABLE, // table not created from super table
- TSDB_STABLE // table created from super table
-} TSDB_TABLE_TYPE;
-
#define IS_CREATE_STABLE(pCfg) ((pCfg)->tagValues != NULL)
+// ---------- TSDB TABLE DEFINITION
typedef struct STable {
- STableId tableId;
TSDB_TABLE_TYPE type;
-
- int64_t createdTime;
-
- // super table UID -1 for normal table
- int32_t stableUid;
-
- int32_t numOfCols;
-
- // Schema for this table
- // For TSDB_SUPER_TABLE, it is the schema including tags
- // For TSDB_NTABLE, it is only the schema, not including tags
- // For TSDB_STABLE, it is NULL
- STSchema *pSchema;
-
- // Tag value for this table
- // For TSDB_SUPER_TABLE and TSDB_NTABLE, it is NULL
- // For TSDB_STABLE, it is the tag value string
- SDataRow pTagVal;
-
- // Object content;
- // For TSDB_SUPER_TABLE, it is the index of tables created from it
- // For TSDB_STABLE and TSDB_NTABLE, it is the cache data
+ STableId tableId;
+ int32_t superUid; // Super table UID
+ STSchema * schema;
+ STSchema * tagSchema;
+ SDataRow tagVal;
union {
- void *pData;
- void *pIndex;
+ void *pData; // For TSDB_NTABLE and TSDB_STABLE, it is the skiplist for cache data
+ void *pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index
} content;
+ void * eventHandler; // TODO
+ void * streamHandler; // TODO
+ struct STable *next; // TODO: remove the next
+} STable;
- // A handle to deal with event
- void *eventHandler;
+void * tsdbEncodeTable(STable *pTable, int *contLen);
+STable *tsdbDecodeTable(void *cont, int contLen);
+void * tsdbFreeEncode(void *cont);
- // A handle to deal with stream
- void *streamHandler;
+// ---------- TSDB META HANDLE DEFINITION
+typedef struct {
+ int32_t maxTables; // Max number of tables
- struct STable *next;
+ int32_t nTables; // Tables created
-} STable;
+ STable **tables; // table array
-typedef struct {
- int32_t maxTables;
- int32_t nTables;
- STable **tables; // array of normal tables
- STable * stables; // linked list of super tables // TODO use container to implement this
- void * tableMap; // hash map of uid ==> STable *
+ STable *superList; // super table list TODO: change it to list container
+
+ void *map; // table map of (uid ===> table)
+
+ SMetaFile *mfh; // meta file handle
} STsdbMeta;
+STsdbMeta *tsdbInitMeta(const char *rootDir, int32_t maxTables);
+int32_t tsdbFreeMeta(STsdbMeta *pMeta);
+
// ---- Operation on STable
#define TSDB_TABLE_ID(pTable) ((pTable)->tableId)
#define TSDB_TABLE_UID(pTable) ((pTable)->uid)
@@ -97,21 +83,12 @@ typedef struct {
#define TSDB_TABLE_CACHE_DATA(pTable) ((pTable)->content.pData)
#define TSDB_SUPER_TABLE_INDEX(pTable) ((pTable)->content.pIndex)
-STSchema *tsdbGetTableSchema(STable *pTable);
-
// ---- Operation on SMetaHandle
#define TSDB_NUM_OF_TABLES(pHandle) ((pHandle)->numOfTables)
#define TSDB_NUM_OF_SUPER_TABLES(pHandle) ((pHandle)->numOfSuperTables)
#define TSDB_TABLE_OF_ID(pHandle, id) ((pHandle)->pTables)[id]
#define TSDB_GET_TABLE_OF_NAME(pHandle, name) /* TODO */
-// Create a new meta handle with configuration
-STsdbMeta *tsdbCreateMeta(int32_t maxTables);
-int32_t tsdbFreeMeta(STsdbMeta *pMeta);
-
-// Recover the meta handle from the file
-STsdbMeta *tsdbOpenMeta(char *tsdbDir);
-
int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg);
int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId);
STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId);
diff --git a/src/vnode/tsdb/inc/tsdbMetaFile.h b/src/vnode/tsdb/inc/tsdbMetaFile.h
new file mode 100644
index 0000000000000000000000000000000000000000..9fad703842889e61fea73fe7686ce3f71add65b0
--- /dev/null
+++ b/src/vnode/tsdb/inc/tsdbMetaFile.h
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#ifndef _TSDB_META_FILE_
+#define _TSDB_META_FILE_
+
+#include
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#define TSDB_META_FILE_NAME "META"
+#define TSDB_META_HASH_FRACTION 1.1
+
+typedef struct {
+ int fd; // File descriptor
+ int nDel; // number of deletions
+ int nRecord; // Number of records
+ int64_t size; // Total file size
+ void * map; // Map from uid ==> position
+} SMetaFile;
+
+SMetaFile *tsdbInitMetaFile(char *rootDir, int32_t maxTables);
+int32_t tsdbInsertMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t contLen);
+int32_t tsdbDeleteMetaRecord(SMetaFile *mfh, int64_t uid);
+int32_t tsdbUpdateMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t contLen);
+void tsdbCloseMetaFile(SMetaFile *mfh);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // _TSDB_META_FILE_
\ No newline at end of file
diff --git a/src/vnode/tsdb/src/tsdbCache.c b/src/vnode/tsdb/src/tsdbCache.c
index dacb36025370a27267bdc72557408c3f8db93974..165c561b5d4ef56cbe6cc8ec10b5c5bb3cb59470 100644
--- a/src/vnode/tsdb/src/tsdbCache.c
+++ b/src/vnode/tsdb/src/tsdbCache.c
@@ -16,7 +16,7 @@
#include "tsdbCache.h"
-STsdbCache *tsdbCreateCache(int32_t numOfBlocks) {
+STsdbCache *tsdbInitCache(int64_t maxSize) {
STsdbCache *pCacheHandle = (STsdbCache *)malloc(sizeof(STsdbCache));
if (pCacheHandle == NULL) {
// TODO : deal with the error
diff --git a/src/vnode/tsdb/src/tsdbFile.c b/src/vnode/tsdb/src/tsdbFile.c
index 6009d160e3b4ae80aed19024baf6dc2639929c57..b977a51b51481294871413390641c90559b9cdff 100644
--- a/src/vnode/tsdb/src/tsdbFile.c
+++ b/src/vnode/tsdb/src/tsdbFile.c
@@ -18,6 +18,52 @@
#include "tsdbFile.h"
+typedef struct {
+ int64_t offset;
+} SCompHeader;
+
+typedef struct {
+ int64_t uid;
+ int64_t last : 1;
+ int64_t numOfBlocks : 63;
+ int32_t delimiter;
+} SCompInfo;
+
+typedef struct {
+ TSKEY keyFirst;
+ TSKEY keyLast;
+ int32_t numOfBlocks;
+ int32_t offset;
+} SCompIdx;
+
+typedef struct {
+ TSKEY keyFirst;
+ TSKEY keyLast;
+ int64_t offset;
+ int32_t len;
+ int32_t sversion;
+} SCompBlock;
+
+typedef struct {
+ int64_t uid;
+} SBlock;
+
+typedef struct {
+ int16_t colId;
+ int16_t bytes;
+ int32_t nNullPoints;
+ int32_t type:8;
+ int32_t offset:24;
+ int32_t len;
+ // fields for pre-aggregate
+ // TODO: pre-aggregation should be seperated
+ int64_t sum;
+ int64_t max;
+ int64_t min;
+ int16_t maxIdx;
+ int16_t minIdx;
+} SField;
+
const char *tsdbFileSuffix[] = {
".head", // TSDB_FILE_TYPE_HEAD
".data", // TSDB_FILE_TYPE_DATA
diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c
index 935bf6281c9a3f646cf21a53a3fd2492965f88aa..0f70875c635feb05f2bac8e0b7261cc74cf04b07 100644
--- a/src/vnode/tsdb/src/tsdbMain.c
+++ b/src/vnode/tsdb/src/tsdbMain.c
@@ -42,6 +42,9 @@
#define TSDB_MIN_CACHE_SIZE (4 * 1024 * 1024) // 4M
#define TSDB_MAX_CACHE_SIZE (1024 * 1024 * 1024) // 1G
+#define TSDB_CFG_FILE_NAME "CONFIG"
+#define TSDB_DATA_DIR_NAME "data"
+
enum { TSDB_REPO_STATE_ACTIVE, TSDB_REPO_STATE_CLOSED, TSDB_REPO_STATE_CONFIGURING };
typedef struct _tsdb_repo {
@@ -75,16 +78,18 @@ static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo);
static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo);
static int tsdbOpenMetaFile(char *tsdbDir);
static int tsdbRecoverRepo(int fd, STsdbCfg *pCfg);
-static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlock *pBlock);
+static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock);
#define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid]
#define TSDB_GET_TABLE_BY_NAME(pRepo, name)
#define TSDB_IS_REPO_ACTIVE(pRepo) ((pRepo)->state == TSDB_REPO_STATE_ACTIVE)
#define TSDB_IS_REPO_CLOSED(pRepo) ((pRepo)->state == TSDB_REPO_STATE_CLOSED)
-STsdbCfg *tsdbCreateDefaultCfg() {
- STsdbCfg *pCfg = (STsdbCfg *)malloc(sizeof(STsdbCfg));
- if (pCfg == NULL) return NULL;
+/**
+ * Set the default TSDB configuration
+ */
+void tsdbSetDefaultCfg(STsdbCfg *pCfg) {
+ if (pCfg == NULL) return;
pCfg->precision = -1;
pCfg->tsdbId = 0;
@@ -94,6 +99,18 @@ STsdbCfg *tsdbCreateDefaultCfg() {
pCfg->maxRowsPerFileBlock = -1;
pCfg->keep = -1;
pCfg->maxCacheSize = -1;
+}
+
+/**
+ * Create a configuration for TSDB default
+ * @return a pointer to a configuration. the configuration object
+ * must call tsdbFreeCfg to free memory after usage
+ */
+STsdbCfg *tsdbCreateDefaultCfg() {
+ STsdbCfg *pCfg = (STsdbCfg *)malloc(sizeof(STsdbCfg));
+ if (pCfg == NULL) return NULL;
+
+ tsdbSetDefaultCfg(pCfg);
return pCfg;
}
@@ -102,7 +119,15 @@ void tsdbFreeCfg(STsdbCfg *pCfg) {
if (pCfg != NULL) free(pCfg);
}
-tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter) {
+/**
+ * Create a new TSDB repository
+ * @param rootDir the TSDB repository root directory
+ * @param pCfg the TSDB repository configuration, upper layer need to free the pointer
+ * @param limiter the limitation tracker will implement in the future, make it void now
+ *
+ * @return a TSDB repository handle on success, NULL for failure
+ */
+tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO */) {
if (rootDir == NULL) return NULL;
if (access(rootDir, F_OK | R_OK | W_OK) == -1) return NULL;
@@ -120,35 +145,44 @@ tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter) {
pRepo->config = *pCfg;
pRepo->limiter = limiter;
- pRepo->tsdbMeta = tsdbCreateMeta(pCfg->maxTables);
- if (pRepo->tsdbMeta == NULL) {
+ // Create the environment files and directories
+ if (tsdbSetRepoEnv(pRepo) < 0) {
free(pRepo->rootDir);
free(pRepo);
return NULL;
}
- pRepo->tsdbCache = tsdbCreateCache(5);
- if (pRepo->tsdbCache == NULL) {
+ // Initialize meta
+ STsdbMeta *pMeta = tsdbInitMeta(rootDir, pCfg->maxTables);
+ if (pMeta == NULL) {
free(pRepo->rootDir);
- tsdbFreeMeta(pRepo->tsdbMeta);
free(pRepo);
return NULL;
}
+ pRepo->tsdbMeta = pMeta;
- // Create the Meta data file and data directory
- if (tsdbSetRepoEnv(pRepo) < 0) {
+ // Initialize cache
+ STsdbCache *pCache = tsdbInitCache(pCfg->maxCacheSize);
+ if (pCache == NULL) {
free(pRepo->rootDir);
tsdbFreeMeta(pRepo->tsdbMeta);
- tsdbFreeCache(pRepo->tsdbCache);
free(pRepo);
return NULL;
}
+ pRepo->tsdbCache = pCache;
pRepo->state = TSDB_REPO_STATE_ACTIVE;
return (tsdb_repo_t *)pRepo;
}
+/**
+ * Close and free all resources taken by the repository
+ * @param repo the TSDB repository handle. The interface will free the handle too, so upper
+ * layer do NOT need to free the repo handle again.
+ *
+ * @return 0 for success, -1 for failure and the error number is set
+ */
int32_t tsdbDropRepo(tsdb_repo_t *repo) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
@@ -169,6 +203,12 @@ int32_t tsdbDropRepo(tsdb_repo_t *repo) {
return 0;
}
+/**
+ * Open an existing TSDB storage repository
+ * @param tsdbDir the existing TSDB root directory
+ *
+ * @return a TSDB repository handle on success, NULL for failure and the error number is set
+ */
tsdb_repo_t *tsdbOpenRepo(char *tsdbDir) {
if (access(tsdbDir, F_OK | W_OK | R_OK) < 0) {
return NULL;
@@ -191,7 +231,7 @@ tsdb_repo_t *tsdbOpenRepo(char *tsdbDir) {
return NULL;
}
- pRepo->tsdbCache = tsdbCreateCache(5);
+ pRepo->tsdbCache = tsdbInitCache(5);
if (pRepo->tsdbCache == NULL) {
// TODO: deal with error
return NULL;
@@ -208,6 +248,13 @@ static int32_t tsdbFlushCache(STsdbRepo *pRepo) {
return 0;
}
+/**
+ * Close a TSDB repository. Only free memory resources, and keep the files.
+ * @param repo the opened TSDB repository handle. The interface will free the handle too, so upper
+ * layer do NOT need to free the repo handle again.
+ *
+ * @return 0 for success, -1 for failure and the error number is set
+ */
int32_t tsdbCloseRepo(tsdb_repo_t *repo) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
if (pRepo == NULL) return 0;
@@ -223,6 +270,12 @@ int32_t tsdbCloseRepo(tsdb_repo_t *repo) {
return 0;
}
+/**
+ * Change the configuration of a repository
+ * @param pCfg the repository configuration, the upper layer should free the pointer
+ *
+ * @return 0 for success, -1 for failure and the error number is set
+ */
int32_t tsdbConfigRepo(tsdb_repo_t *repo, STsdbCfg *pCfg) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
@@ -231,22 +284,30 @@ int32_t tsdbConfigRepo(tsdb_repo_t *repo, STsdbCfg *pCfg) {
return 0;
}
+/**
+ * Get the TSDB repository information, including some statistics
+ * @param pRepo the TSDB repository handle
+ * @param error the error number to set when failure occurs
+ *
+ * @return a info struct handle on success, NULL for failure and the error number is set. The upper
+ * layers should free the info handle themselves or memory leak will occur
+ */
STsdbRepoInfo *tsdbGetStatus(tsdb_repo_t *pRepo) {
// TODO
return NULL;
}
-int32_t tsdbCreateTable(tsdb_repo_t *repo, STableCfg *pCfg) {
+int tsdbCreateTable(tsdb_repo_t *repo, STableCfg *pCfg) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
return tsdbCreateTableImpl(pRepo->tsdbMeta, pCfg);
}
-int32_t tsdbAlterTable(tsdb_repo_t *pRepo, STableCfg *pCfg) {
+int tsdbAlterTable(tsdb_repo_t *pRepo, STableCfg *pCfg) {
// TODO
return 0;
}
-int32_t tsdbDropTable(tsdb_repo_t *repo, STableId tableId) {
+int tsdbDropTable(tsdb_repo_t *repo, STableId tableId) {
// TODO
if (repo == NULL) return -1;
STsdbRepo *pRepo = (STsdbRepo *)repo;
@@ -261,18 +322,150 @@ STableInfo *tsdbGetTableInfo(tsdb_repo_t *pRepo, STableId tableId) {
// TODO: need to return the number of data inserted
int32_t tsdbInsertData(tsdb_repo_t *repo, SSubmitMsg *pMsg) {
- SSubmitBlock *pBlock = (SSubmitBlock *)pMsg->data;
+ SSubmitMsgIter msgIter;
- for (int i = 0; i < pMsg->numOfTables; i++) { // Loop to deal with the submit message
+ tsdbInitSubmitMsgIter(pMsg, &msgIter);
+ SSubmitBlk *pBlock;
+ while ((pBlock = tsdbGetSubmitMsgNext(&msgIter)) != NULL) {
if (tsdbInsertDataToTable(repo, pBlock) < 0) {
return -1;
}
- pBlock = (SSubmitBlock *)(((char *)pBlock) + sizeof(SSubmitBlock) + pBlock->len);
}
return 0;
}
+/**
+ * Initialize a table configuration
+ */
+int tsdbInitTableCfg(STableCfg *config, TSDB_TABLE_TYPE type, int64_t uid, int32_t tid) {
+ if (config == NULL) return -1;
+ if (type != TSDB_NTABLE && type != TSDB_STABLE) return -1;
+
+ memset((void *)config, 0, sizeof(STableCfg));
+
+ config->type = type;
+ config->superUid = TSDB_INVALID_SUPER_TABLE_ID;
+ config->tableId.uid = uid;
+ config->tableId.tid = tid;
+ return 0;
+}
+
+/**
+ * Set the super table UID of the created table
+ */
+int tsdbTableSetSuperUid(STableCfg *config, int64_t uid) {
+ if (config->type != TSDB_STABLE) return -1;
+ if (uid == TSDB_INVALID_SUPER_TABLE_ID) return -1;
+
+ config->superUid = uid;
+ return 0;
+}
+
+/**
+ * Set the table schema in the configuration
+ * @param config the configuration to set
+ * @param pSchema the schema to set
+ * @param dup use the schema directly or duplicate one for use
+ *
+ * @return 0 for success and -1 for failure
+ */
+int tsdbTableSetSchema(STableCfg *config, STSchema *pSchema, bool dup) {
+ if (dup) {
+ config->schema = tdDupSchema(pSchema);
+ } else {
+ config->schema = pSchema;
+ }
+ return 0;
+}
+
+/**
+ * Set the table schema in the configuration
+ * @param config the configuration to set
+ * @param pSchema the schema to set
+ * @param dup use the schema directly or duplicate one for use
+ *
+ * @return 0 for success and -1 for failure
+ */
+int tsdbTableSetTagSchema(STableCfg *config, STSchema *pSchema, bool dup) {
+ if (config->type != TSDB_STABLE) return -1;
+
+ if (dup) {
+ config->tagSchema = tdDupSchema(pSchema);
+ } else {
+ config->tagSchema = pSchema;
+ }
+ return 0;
+}
+
+int tsdbTableSetTagValue(STableCfg *config, SDataRow row, bool dup) {
+ if (config->type != TSDB_STABLE) return -1;
+
+ if (dup) {
+ config->tagValues = tdDataRowDup(row);
+ } else {
+ config->tagValues = row;
+ }
+
+ return 0;
+}
+
+void tsdbClearTableCfg(STableCfg *config) {
+ if (config->schema) tdFreeSchema(config->schema);
+ if (config->tagSchema) tdFreeSchema(config->tagSchema);
+ if (config->tagValues) tdFreeDataRow(config->tagValues);
+}
+
+int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
+ if (pBlock->len <= 0) return -1;
+ pIter->totalLen = pBlock->len;
+ pIter->len = 0;
+ pIter->row = (SDataRow)(pBlock->data);
+ return 0;
+}
+
+SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) {
+ SDataRow row = pIter->row;
+ if (row == NULL) return NULL;
+
+ pIter->len += dataRowLen(row);
+ if (pIter->len >= pIter->totalLen) {
+ pIter->row = NULL;
+ } else {
+ pIter->row = (char *)row + dataRowLen(row);
+ }
+
+ return row;
+}
+
+int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) {
+ if (pMsg == NULL || pIter == NULL) return -1;
+
+ pIter->totalLen = pMsg->length;
+ pIter->len = TSDB_SUBMIT_MSG_HEAD_SIZE;
+ if (pMsg->length <= TSDB_SUBMIT_MSG_HEAD_SIZE) {
+ pIter->pBlock = NULL;
+ } else {
+ pIter->pBlock = pMsg->blocks;
+ }
+
+ return 0;
+}
+
+SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter) {
+ SSubmitBlk *pBlock = pIter->pBlock;
+ if (pBlock == NULL) return NULL;
+
+ pIter->len = pIter->len + sizeof(SSubmitBlk) + pBlock->len;
+ if (pIter->len >= pIter->totalLen) {
+ pIter->pBlock = NULL;
+ } else {
+ pIter->pBlock = (char *)pBlock + pBlock->len + sizeof(SSubmitBlk);
+ }
+
+ return pBlock;
+}
+
// Check the configuration and set default options
static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
// Check precision
@@ -285,7 +478,7 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
// Check tsdbId
if (pCfg->tsdbId < 0) return -1;
- // Check MaxTables
+ // Check maxTables
if (pCfg->maxTables == -1) {
pCfg->maxTables = TSDB_DEFAULT_TABLES;
} else {
@@ -333,10 +526,18 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
return 0;
}
-static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo) {
- char *metaFname = tsdbGetFileName(pRepo->rootDir, "tsdb", TSDB_FILE_TYPE_META);
+static int32_t tsdbGetCfgFname(STsdbRepo *pRepo, char *fname) {
+ if (pRepo == NULL) return -1;
+ sprintf(fname, "%s/%s", pRepo->rootDir, TSDB_CFG_FILE_NAME);
+ return 0;
+}
+
+static int32_t tsdbSaveConfig(STsdbRepo *pRepo) {
+ char fname[128] = "\0"; // TODO: get rid of the literal 128
+
+ if (tsdbGetCfgFname(pRepo, fname) < 0) return -1;
- int fd = open(metaFname, O_WRONLY | O_CREAT);
+ int fd = open(fname, O_WRONLY | O_CREAT, 0755);
if (fd < 0) {
return -1;
}
@@ -345,19 +546,45 @@ static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo) {
return -1;
}
- // Create the data file
- char *dirName = calloc(1, strlen(pRepo->rootDir) + strlen("tsdb") + 2);
- if (dirName == NULL) {
+ close(fd);
+ return 0;
+}
+
+static int32_t tsdbRestoreCfg(STsdbRepo *pRepo, STsdbCfg *pCfg) {
+ char fname[128] = "\0";
+
+ if (tsdbGetCfgFname(pRepo, fname) < 0) return -1;
+
+ int fd = open(fname, O_RDONLY);
+ if (fd < 0) {
return -1;
}
- sprintf(dirName, "%s/%s", pRepo->rootDir, "tsdb");
- if (mkdir(dirName, 0755) < 0) {
- free(dirName);
+ if (read(fd, (void *)pCfg, sizeof(STsdbCfg)) < sizeof(STsdbCfg)) {
+ close(fd);
return -1;
}
- free(dirName);
+ close(fd);
+
+ return 0;
+}
+
+static int32_t tsdbGetDataDirName(STsdbRepo *pRepo, char *fname) {
+ if (pRepo == NULL || pRepo->rootDir == NULL) return -1;
+ sprintf(fname, "%s/%s", pRepo->rootDir, TSDB_DATA_DIR_NAME);
+ return 0;
+}
+
+static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo) {
+ if (tsdbSaveConfig(pRepo) < 0) return -1;
+
+ char dirName[128] = "\0";
+ if (tsdbGetDataDirName(pRepo, dirName) < 0) return -1;
+
+ if (mkdir(dirName, 0755) < 0) {
+ return -1;
+ }
return 0;
}
@@ -417,7 +644,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
}
pNode->level = level;
- tdDataRowCpy(SL_GET_NODE_DATA(pNode), row);
+ dataRowCpy(SL_GET_NODE_DATA(pNode), row);
// Insert the skiplist node into the data
tsdbInsertRowToTableImpl(pNode, pTable);
@@ -425,23 +652,19 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
return 0;
}
-static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlock *pBlock) {
+static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
STable *pTable = tsdbIsValidTableToInsert(pRepo->tsdbMeta, pBlock->tableId);
- if (pTable == NULL) {
- return -1;
- }
+ if (pTable == NULL) return -1;
- SDataRows rows = pBlock->data;
- SDataRowsIter rDataIter, *pIter;
- pIter = &rDataIter;
+ SSubmitBlkIter blkIter;
SDataRow row;
- tdInitSDataRowsIter(rows, pIter);
- while ((row = tdDataRowsNext(pIter)) != NULL) {
+ tsdbInitSubmitBlkIter(pBlock, &blkIter);
+ while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) {
if (tdInsertRowToTable(pRepo, row, pTable) < 0) {
- // TODO: deal with the error here
+ return -1;
}
}
diff --git a/src/vnode/tsdb/src/tsdbMeta.c b/src/vnode/tsdb/src/tsdbMeta.c
index 573921192b790fa083684277f85da3559838a30d..5c5c5c50f0b83b7ec41c81443a9323f0bbd8727f 100644
--- a/src/vnode/tsdb/src/tsdbMeta.c
+++ b/src/vnode/tsdb/src/tsdbMeta.c
@@ -9,6 +9,7 @@
#include "tsdbCache.h"
#define TSDB_SUPER_TABLE_SL_LEVEL 5 // TODO: may change here
+#define TSDB_META_FILE_NAME "META"
static int tsdbFreeTable(STable *pTable);
static int32_t tsdbCheckTableCfg(STableCfg *pCfg);
@@ -16,24 +17,85 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable);
static int tsdbAddTableIntoMap(STsdbMeta *pMeta, STable *pTable);
static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable);
static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable);
+static int tsdbEstimateTableEncodeSize(STable *pTable);
-STsdbMeta *tsdbCreateMeta(int32_t maxTables) {
- STsdbMeta *pMeta = (STsdbMeta *)malloc(sizeof(STsdbMeta));
- if (pMeta == NULL) {
- return NULL;
+/**
+ * Encode a TSDB table object as a binary content
+ * ASSUMPTIONS: VALID PARAMETERS
+ *
+ * @param pTable table object to encode
+ * @param contLen the encoded binary content length
+ *
+ * @return binary content for success
+ * NULL fro failure
+ */
+void *tsdbEncodeTable(STable *pTable, int *contLen) {
+ if (pTable == NULL) return NULL;
+
+ *contLen = tsdbEstimateTableEncodeSize(pTable);
+ if (*contLen < 0) return NULL;
+
+ void *ret = malloc(*contLen);
+ if (ret == NULL) return NULL;
+
+ // TODO: encode the object to the memory
+ {}
+
+ return ret;
+}
+
+/**
+ * Decode from an encoded binary
+ * ASSUMPTIONS: valid parameters
+ *
+ * @param cont binary object
+ * @param contLen binary length
+ *
+ * @return TSDB table object for success
+ * NULL for failure
+ */
+STable *tsdbDecodeTable(void *cont, int contLen) {
+ STable *pTable = (STable *)calloc(1, sizeof(STable));
+ if (pTable == NULL) return NULL;
+
+ {
+ // TODO recover from the binary content
}
+ return pTable;
+}
+
+void *tsdbFreeEncode(void *cont) {
+ if (cont != NULL) free(cont);
+}
+
+/**
+ * Initialize the meta handle
+ * ASSUMPTIONS: VALID PARAMETER
+ */
+STsdbMeta *tsdbInitMeta(const char *rootDir, int32_t maxTables) {
+ STsdbMeta *pMeta = (STsdbMeta *)malloc(sizeof(STsdbMeta));
+ if (pMeta == NULL) return NULL;
+
pMeta->maxTables = maxTables;
pMeta->nTables = 0;
- pMeta->stables = NULL;
+ pMeta->superList = NULL;
pMeta->tables = (STable **)calloc(maxTables, sizeof(STable *));
if (pMeta->tables == NULL) {
free(pMeta);
return NULL;
}
- pMeta->tableMap = taosHashInit(maxTables + maxTables / 10, taosGetDefaultHashFunction, false);
- if (pMeta->tableMap == NULL) {
+ pMeta->map = taosHashInit(maxTables * TSDB_META_HASH_FRACTION, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
+ if (pMeta->map == NULL) {
+ free(pMeta->tables);
+ free(pMeta);
+ return NULL;
+ }
+
+ pMeta->mfh = tsdbInitMetaFile(rootDir, maxTables);
+ if (pMeta->mfh == NULL) {
+ taosHashCleanup(pMeta->map);
free(pMeta->tables);
free(pMeta);
return NULL;
@@ -45,6 +107,8 @@ STsdbMeta *tsdbCreateMeta(int32_t maxTables) {
int32_t tsdbFreeMeta(STsdbMeta *pMeta) {
if (pMeta == NULL) return 0;
+ tsdbCloseMetaFile(pMeta->mfh);
+
for (int i = 0; i < pMeta->maxTables; i++) {
if (pMeta->tables[i] != NULL) {
tsdbFreeTable(pMeta->tables[i]);
@@ -53,14 +117,14 @@ int32_t tsdbFreeMeta(STsdbMeta *pMeta) {
free(pMeta->tables);
- STable *pTable = pMeta->stables;
+ STable *pTable = pMeta->superList;
while (pTable != NULL) {
STable *pTemp = pTable;
pTable = pTemp->next;
tsdbFreeTable(pTemp);
}
- taosHashCleanup(pMeta->tableMap);
+ taosHashCleanup(pMeta->map);
free(pMeta);
@@ -68,74 +132,65 @@ int32_t tsdbFreeMeta(STsdbMeta *pMeta) {
}
int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) {
- if (tsdbCheckTableCfg(pCfg) < 0) {
- return -1;
- }
+ if (tsdbCheckTableCfg(pCfg) < 0) return -1;
- STable *pSTable = NULL;
+ STable *super = NULL;
int newSuper = 0;
- if (IS_CREATE_STABLE(pCfg)) { // to create a TSDB_STABLE, check if super table exists
- pSTable = tsdbGetTableByUid(pMeta, pCfg->stableUid);
- if (pSTable == NULL) { // super table not exists, try to create it
+ if (pCfg->type == TSDB_STABLE) {
+ super = tsdbGetTableByUid(pMeta, pCfg->superUid);
+ if (super == NULL) { // super table not exists, try to create it
newSuper = 1;
- pSTable = (STable *)calloc(1, sizeof(STable));
- if (pSTable == NULL) return -1;
-
- pSTable->tableId.uid = pCfg->stableUid;
- pSTable->tableId.tid = -1;
- pSTable->type = TSDB_SUPER_TABLE;
- // pSTable->createdTime = pCfg->createdTime; // The created time is not required
- pSTable->stableUid = -1;
- pSTable->numOfCols = pCfg->numOfCols;
- pSTable->pSchema = tdDupSchema(pCfg->schema);
- pSTable->content.pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1,
+ // TODO: use function to implement create table object
+ super = (STable *)calloc(1, sizeof(STable));
+ if (super == NULL) return -1;
+
+ super->type = TSDB_SUPER_TABLE;
+ super->tableId.uid = pCfg->superUid;
+ super->tableId.tid = -1;
+ super->superUid = TSDB_INVALID_SUPER_TABLE_ID;
+ super->schema = tdDupSchema(pCfg->schema);
+ super->tagSchema = tdDupSchema(pCfg->tagSchema);
+ super->tagVal = tdDataRowDup(pCfg->tagValues);
+ super->content.pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1,
0, NULL); // Allow duplicate key, no lock
- if (pSTable->content.pIndex == NULL) {
- free(pSTable);
+
+ if (super->content.pIndex == NULL) {
+ tdFreeSchema(super->schema);
+ tdFreeSchema(super->tagSchema);
+ tdFreeDataRow(super->tagVal);
+ free(super);
return -1;
}
} else {
- if (pSTable->type != TSDB_SUPER_TABLE) return -1;
+ if (super->type != TSDB_SUPER_TABLE) return -1;
}
}
- STable *pTable = (STable *)malloc(sizeof(STable));
- if (pTable == NULL) {
- if (newSuper) tsdbFreeTable(pSTable);
+ STable *table = (STable *)malloc(sizeof(STable));
+ if (table == NULL) {
+ if (newSuper) tsdbFreeTable(super);
return -1;
}
- pTable->tableId = pCfg->tableId;
- pTable->createdTime = pCfg->createdTime;
+ table->tableId = pCfg->tableId;
if (IS_CREATE_STABLE(pCfg)) { // TSDB_STABLE
- pTable->type = TSDB_STABLE;
- pTable->stableUid = pCfg->stableUid;
- pTable->pTagVal = tdDataRowDup(pCfg->tagValues);
+ table->type = TSDB_STABLE;
+ table->superUid = pCfg->superUid;
+ table->tagVal = tdDataRowDup(pCfg->tagValues);
} else { // TSDB_NTABLE
- pTable->type = TSDB_NTABLE;
- pTable->stableUid = -1;
- pTable->pSchema = tdDupSchema(pCfg->schema);
+ table->type = TSDB_NTABLE;
+ table->superUid = -1;
+ table->schema = tdDupSchema(pCfg->schema);
}
- pTable->content.pData = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, 0, 8, 0, 0, NULL);
+ table->content.pData = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, 0, 8, 0, 0, NULL);
- if (newSuper) tsdbAddTableToMeta(pMeta, pSTable);
- tsdbAddTableToMeta(pMeta, pTable);
+ if (newSuper) tsdbAddTableToMeta(pMeta, super);
+ tsdbAddTableToMeta(pMeta, table);
return 0;
}
-STsdbMeta *tsdbOpenMeta(char *tsdbDir) {
- // TODO : Open meta file for reading
-
- STsdbMeta *pMeta = (STsdbMeta *)malloc(sizeof(STsdbMeta));
- if (pMeta == NULL) {
- return NULL;
- }
-
- return pMeta;
-}
-
/**
* Check if a table is valid to insert.
* @return NULL for invalid and the pointer to the table if valid
@@ -183,9 +238,9 @@ int32_t tsdbInsertRowToTableImpl(SSkipListNode *pNode, STable *pTable) {
static int tsdbFreeTable(STable *pTable) {
// TODO: finish this function
if (pTable->type == TSDB_STABLE) {
- tdFreeDataRow(pTable->pTagVal);
+ tdFreeDataRow(pTable->tagVal);
} else {
- tdFreeSchema(pTable->pSchema);
+ tdFreeSchema(pTable->schema);
}
// Free content
@@ -205,7 +260,7 @@ static int32_t tsdbCheckTableCfg(STableCfg *pCfg) {
}
STable *tsdbGetTableByUid(STsdbMeta *pMeta, int64_t uid) {
- void *ptr = taosHashGet(pMeta->tableMap, (char *)(&uid), sizeof(uid));
+ void *ptr = taosHashGet(pMeta->map, (char *)(&uid), sizeof(uid));
if (ptr == NULL) return NULL;
@@ -215,12 +270,12 @@ STable *tsdbGetTableByUid(STsdbMeta *pMeta, int64_t uid) {
static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable) {
if (pTable->type == TSDB_SUPER_TABLE) {
// add super table to the linked list
- if (pMeta->stables == NULL) {
- pMeta->stables = pTable;
+ if (pMeta->superList == NULL) {
+ pMeta->superList = pTable;
pTable->next = NULL;
} else {
- STable *pTemp = pMeta->stables;
- pMeta->stables = pTable;
+ STable *pTemp = pMeta->superList;
+ pMeta->superList = pTable;
pTable->next = pTemp;
}
} else {
@@ -244,7 +299,7 @@ static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable) {
static int tsdbAddTableIntoMap(STsdbMeta *pMeta, STable *pTable) {
// TODO: add the table to the map
int64_t uid = pTable->tableId.uid;
- if (taosHashPut(pMeta->tableMap, (char *)(&uid), sizeof(uid), (void *)(&pTable), sizeof(pTable)) < 0) {
+ if (taosHashPut(pMeta->map, (char *)(&uid), sizeof(uid), (void *)(&pTable), sizeof(pTable)) < 0) {
return -1;
}
return 0;
@@ -259,4 +314,9 @@ static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) {
assert(pTable->type == TSDB_STABLE);
// TODO
return 0;
+}
+
+static int tsdbEstimateTableEncodeSize(STable *pTable) {
+ // TODO
+ return 0;
}
\ No newline at end of file
diff --git a/src/vnode/tsdb/src/tsdbMetaFile.c b/src/vnode/tsdb/src/tsdbMetaFile.c
new file mode 100644
index 0000000000000000000000000000000000000000..ee173d7d7169b19d007eb5cc8eeb37644ec13ed7
--- /dev/null
+++ b/src/vnode/tsdb/src/tsdbMetaFile.c
@@ -0,0 +1,225 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+#include
+#include
+
+#include "taosdef.h"
+#include "hash.h"
+#include "tsdbMetaFile.h"
+
+#define TSDB_META_FILE_HEADER_SIZE 512
+
+typedef struct {
+ int32_t offset;
+ int32_t size;
+} SRecordInfo;
+
+static int32_t tsdbGetMetaFileName(char *rootDir, char *fname);
+static int32_t tsdbCheckMetaHeader(int fd);
+static int32_t tsdbWriteMetaHeader(int fd);
+static int tsdbCreateMetaFile(char *fname);
+static int tsdbRestoreFromMetaFile(char *fname, SMetaFile *mfh);
+
+SMetaFile *tsdbInitMetaFile(char *rootDir, int32_t maxTables) {
+ // TODO
+ char fname[128] = "\0";
+ if (tsdbGetMetaFileName(rootDir, fname) < 0) return NULL;
+
+ SMetaFile *mfh = (SMetaFile *)calloc(1, sizeof(SMetaFile));
+ if (mfh == NULL) return NULL;
+
+ // OPEN MAP
+ mfh->map =
+ taosHashInit(maxTables * TSDB_META_HASH_FRACTION, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
+ if (mfh->map == NULL) {
+ free(mfh);
+ return NULL;
+ }
+
+ // OPEN FILE
+ if (access(fname, F_OK) < 0) { // file not exists
+ mfh->fd = tsdbCreateMetaFile(fname);
+ if (mfh->fd < 0) {
+ taosHashCleanup(mfh->map);
+ free(mfh);
+ return NULL;
+ }
+ } else { // file exists, recover from file
+ if (tsdbRestoreFromMetaFile(fname, mfh) < 0) {
+ taosHashCleanup(mfh->map);
+ free(mfh);
+ return NULL;
+ }
+ }
+
+ return mfh;
+}
+
+int32_t tsdbInsertMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t contLen) {
+ if (taosHashGet(mfh->map, (char *)(&uid), sizeof(uid)) != NULL) {
+ return -1;
+ }
+
+ SRecordInfo info;
+ info.offset = mfh->size;
+ info.size = contLen; // TODO: Here is not correct
+
+ mfh->size += (contLen + sizeof(SRecordInfo));
+
+ if (taosHashPut(mfh->map, (char *)(&uid), sizeof(uid), (void *)(&info), sizeof(SRecordInfo)) < 0) {
+ return -1;
+ }
+
+ // TODO: make below a function to implement
+ if (fseek(mfh->fd, info.offset, SEEK_CUR) < 0) {
+ return -1;
+ }
+
+ if (write(mfh->fd, (void *)(&info), sizeof(SRecordInfo)) < 0) {
+ return -1;
+ }
+
+ if (write(mfh->fd, cont, contLen) < 0) {
+ return -1;
+ }
+
+ fsync(mfh->fd);
+
+ mfh->nRecord++;
+
+ return 0;
+}
+
+int32_t tsdbDeleteMetaRecord(SMetaFile *mfh, int64_t uid) {
+ char *ptr = taosHashGet(mfh->map, (char *)(&uid), sizeof(uid));
+ if (ptr == NULL) return -1;
+
+ SRecordInfo info = *(SRecordInfo *)ptr;
+
+ // Remove record from hash table
+ taosHashRemove(mfh->map, (char *)(&uid), sizeof(uid));
+
+ // Remove record from file
+
+ info.offset = -info.offset;
+ if (fseek(mfh->fd, -info.offset, SEEK_CUR) < 0) {
+ return -1;
+ }
+
+ if (write(mfh->fd, (void *)(&info), sizeof(SRecordInfo)) < 0) {
+ return -1;
+ }
+
+ fsync(mfh->fd);
+
+ mfh->nDel++;
+
+ return 0;
+}
+
+int32_t tsdbUpdateMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t contLen) {
+ char *ptr = taosHashGet(mfh->map, (char *)(&uid), sizeof(uid));
+ if (ptr == NULL) return -1;
+
+ SRecordInfo info = *(SRecordInfo *)ptr;
+ // Update the hash table
+ if (taosHashPut(mfh->map, (char *)(&uid), sizeof(uid), (void *)(&info), sizeof(SRecordInfo)) < 0) {
+ return -1;
+ }
+
+ // Update record in file
+ if (info.size >= contLen) { // Just update it in place
+ info.size = contLen;
+
+ } else { // Just append to the end of file
+ info.offset = mfh->size;
+ info.size = contLen;
+
+ mfh->size += contLen;
+ }
+ if (fseek(mfh->fd, -info.offset, SEEK_CUR) < 0) {
+ return -1;
+ }
+
+ if (write(mfh->fd, (void *)(&info), sizeof(SRecordInfo)) < 0) {
+ return -1;
+ }
+
+ fsync(mfh->fd);
+
+ return 0;
+}
+
+void tsdbCloseMetaFile(SMetaFile *mfh) {
+ if (mfh == NULL) return;
+ close(mfh);
+
+ taosHashCleanup(mfh->map);
+}
+
+static int32_t tsdbGetMetaFileName(char *rootDir, char *fname) {
+ if (rootDir == NULL) return -1;
+ sprintf(fname, "%s/%s", rootDir, TSDB_META_FILE_NAME);
+ return 0;
+}
+
+static int32_t tsdbCheckMetaHeader(int fd) {
+ // TODO: write the meta file header check function
+ return 0;
+}
+
+static int32_t tsdbWriteMetaHeader(int fd) {
+ // TODO: write the meta file header to file
+ return 0;
+}
+
+static int tsdbCreateMetaFile(char *fname) {
+ int fd = open(fname, O_RDWR | O_CREAT, 0755);
+ if (fd < 0) return -1;
+
+ if (tsdbWriteMetaHeader(fd) < 0) {
+ close(fd);
+ return NULL;
+ }
+
+ return fd;
+}
+
+static int tsdbCheckMetaFileIntegrety(int fd) {
+ // TODO
+ return 0;
+}
+
+static int tsdbRestoreFromMetaFile(char *fname, SMetaFile *mfh) {
+ int fd = open(fname, O_RDWR);
+ if (fd < 0) return -1;
+
+ if (tsdbCheckMetaFileIntegrety(fd) < 0) {
+ // TODO: decide if to auto-recover the file
+ close(fd);
+ return -1;
+ }
+
+ if (fseek(fd, TSDB_META_FILE_HEADER_SIZE, SEEK_SET) < 0) {
+ // TODO: deal with the error
+ close(fd);
+ return -1;
+ }
+
+ mfh->fd = fd;
+ // TODO: iterate to read the meta file to restore the meta data
+
+ return 0;
+}
\ No newline at end of file
diff --git a/src/vnode/tsdb/tests/tsdbTests.cpp b/src/vnode/tsdb/tests/tsdbTests.cpp
index 737deee3c5a7e1fd732a5b77b80c00db4f966a29..7b09fdfcdebc0828b08e22f30157a413a553b3ab 100644
--- a/src/vnode/tsdb/tests/tsdbTests.cpp
+++ b/src/vnode/tsdb/tests/tsdbTests.cpp
@@ -3,92 +3,63 @@
#include "tsdb.h"
#include "dataformat.h"
-#include "tsdbMeta.h"
-
-TEST(TsdbTest, createTable) {
- STsdbMeta *pMeta = tsdbCreateMeta(100);
- ASSERT_NE(pMeta, nullptr);
-
- STableCfg config;
- config.tableId.tid = 0;
- config.tableId.uid = 98868728187539L;
- config.numOfCols = 5;
- config.schema = tdNewSchema(config.numOfCols);
- for (int i = 0; i < schemaNCols(config.schema); i++) {
- STColumn *pCol = tdNewCol(TSDB_DATA_TYPE_BIGINT, i, 0);
- tdColCpy(schemaColAt(config.schema, i), pCol);
- tdFreeCol(pCol);
- }
- config.tagValues = nullptr;
-
- tsdbCreateTableImpl(pMeta, &config);
-
- STable *pTable = tsdbGetTableByUid(pMeta, config.tableId.uid);
- ASSERT_NE(pTable, nullptr);
-}
TEST(TsdbTest, createRepo) {
- STsdbCfg *pCfg = tsdbCreateDefaultCfg();
+ STsdbCfg config;
- // Create a tsdb repository
- tsdb_repo_t *pRepo = tsdbCreateRepo("/root/mnt/test/vnode0", pCfg, NULL);
+ // 1. Create a tsdb repository
+ tsdbSetDefaultCfg(&config);
+ tsdb_repo_t *pRepo = tsdbCreateRepo("/home/ubuntu/work/ttest/vnode0", &config, NULL);
ASSERT_NE(pRepo, nullptr);
- tsdbFreeCfg(pCfg);
- // create a normal table in this repository
- STableCfg config;
- config.tableId.tid = 0;
- config.tableId.uid = 98868728187539L;
- config.numOfCols = 5;
- config.schema = tdNewSchema(config.numOfCols);
- STColumn *pCol = tdNewCol(TSDB_DATA_TYPE_TIMESTAMP, 0, 0);
- tdColCpy(schemaColAt(config.schema, 0), pCol);
- tdFreeCol(pCol);
- for (int i = 1; i < schemaNCols(config.schema); i++) {
- pCol = tdNewCol(TSDB_DATA_TYPE_BIGINT, i, 0);
- tdColCpy(schemaColAt(config.schema, i), pCol);
- tdFreeCol(pCol);
- }
+ // 2. Create a normal table
+ STableCfg tCfg;
+ ASSERT_EQ(tsdbInitTableCfg(&tCfg, TSDB_SUPER_TABLE, 987607499877672L, 0), -1);
+ ASSERT_EQ(tsdbInitTableCfg(&tCfg, TSDB_NTABLE, 987607499877672L, 0), 0);
- tsdbCreateTable(pRepo, &config);
- // Write some data
+ int nCols = 5;
+ STSchema *schema = tdNewSchema(nCols);
- // int32_t size = sizeof(SSubmitMsg) + sizeof(SSubmitBlock) + tdMaxRowDataBytes(config.schema) * 10 + sizeof(int32_t);
+ for (int i = 0; i < nCols; i++) {
+ if (i == 0) {
+ tdSchemaAppendCol(schema, TSDB_DATA_TYPE_TIMESTAMP, i, -1);
+ } else {
+ tdSchemaAppendCol(schema, TSDB_DATA_TYPE_INT, i, -1);
+ }
+ }
- // tdUpdateSchema(config.schema);
+ tsdbTableSetSchema(&tCfg, schema, true);
- // SSubmitMsg *pMsg = (SSubmitMsg *)malloc(size);
- // pMsg->numOfTables = 1; // TODO: use api
+ tsdbCreateTable(pRepo, &tCfg);
- // SSubmitBlock *pBlock = (SSubmitBlock *)pMsg->data;
- // pBlock->tableId = {.uid = 98868728187539L, .tid = 0};
- // pBlock->sversion = 0;
- // pBlock->len = sizeof(SSubmitBlock);
+ // // 3. Loop to write some simple data
+ int nRows = 10;
+ SSubmitMsg *pMsg = (SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + tdMaxRowBytesFromSchema(schema) * nRows);
- // SDataRows rows = pBlock->data;
- // dataRowsInit(rows);
+ SSubmitBlk *pBlock = pMsg->blocks;
+ pBlock->tableId = {.uid = 987607499877672L, .tid = 0};
+ pBlock->sversion = 0;
+ pBlock->len = 0;
+ int64_t start_time = 1584081000000;
+ for (int i = 0; i < nRows; i++) {
+ int64_t ttime = start_time + 1000 * i;
+ SDataRow row = (SDataRow)(pBlock->data + pBlock->len);
+ tdInitDataRow(row, schema);
- // SDataRow row = tdNewDataRow(tdMaxRowDataBytes(config.schema));
- // int64_t ttime = 1583508800000;
- // for (int i = 0; i < 10; i++) { // loop over rows
- // ttime += (10000 * i);
- // tdDataRowReset(row);
- // for (int j = 0; j < schemaNCols(config.schema); j++) {
- // if (j == 0) { // set time stamp
- // tdAppendColVal(row, (void *)(&ttime), schemaColAt(config.schema, j), 40);
- // } else { // set other fields
- // int32_t val = 10;
- // tdAppendColVal(row, (void *)(&val), schemaColAt(config.schema, j), 40);
- // }
- // }
+ for (int j = 0; j < schemaNCols(schema); j++) {
+ if (j == 0) { // Just for timestamp
+ tdAppendColVal(row, (void *)(&time), schemaColAt(schema, j));
+ } else { // For int
+ int val = 10;
+ tdAppendColVal(row, (void *)(&val), schemaColAt(schema, j));
+ }
- // tdDataRowsAppendRow(rows, row);
- // }
+ }
+ pBlock->len += dataRowLen(row);
- // tsdbInsertData(pRepo, pMsg);
+ }
+ pMsg->length = pMsg->length + sizeof(SSubmitBlk) + pBlock->len;
- // tdFreeDataRow(row);
+ tsdbInsertData(pRepo, pMsg);
+}
- tdFreeSchema(config.schema);
- tsdbDropRepo(pRepo);
-}
\ No newline at end of file