未验证 提交 e242b4fd 编写于 作者: S slguan 提交者: GitHub

Merge pull request #1759 from taosdata/feature/2.0tsdb

Feature/2.0tsdb
...@@ -631,47 +631,44 @@ int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, ...@@ -631,47 +631,44 @@ int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) { static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) {
int32_t firstPartLen = 0; // TODO: optimize this function
int len = 0;
STableMeta* pTableMeta = pTableDataBlock->pTableMeta;
STableMeta* pTableMeta = pTableDataBlock->pTableMeta;
STableComInfo tinfo = tscGetTableInfo(pTableMeta); STableComInfo tinfo = tscGetTableInfo(pTableMeta);
SSchema* pSchema = tscGetTableSchema(pTableMeta); SSchema* pSchema = tscGetTableSchema(pTableMeta);
SSubmitBlk* pBlock = pDataBlock;
memcpy(pDataBlock, pTableDataBlock->pData, sizeof(SSubmitBlk)); memcpy(pDataBlock, pTableDataBlock->pData, sizeof(SSubmitBlk));
pDataBlock += sizeof(SSubmitBlk); pDataBlock += sizeof(SSubmitBlk);
int32_t total = sizeof(int32_t)*2; int32_t flen = 0;
for(int32_t i = 0; i < tinfo.numOfColumns; ++i) { for (int32_t i = 0; i < tinfo.numOfColumns; ++i) {
switch (pSchema[i].type) { flen += TYPE_BYTES[pSchema[i].type];
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_BINARY: {
assert(0); // not support binary yet
firstPartLen += sizeof(int32_t);break;
}
default:
firstPartLen += tDataTypeDesc[pSchema[i].type].nSize;
total += tDataTypeDesc[pSchema[i].type].nSize;
}
} }
char* p = pTableDataBlock->pData + sizeof(SSubmitBlk); char* p = pTableDataBlock->pData + sizeof(SSubmitBlk);
pBlock->len = 0;
SSubmitBlk* pBlock = (SSubmitBlk*) pTableDataBlock->pData; for (int32_t i = 0; i < htons(pBlock->numOfRows); ++i) {
int32_t rows = htons(pBlock->numOfRows); SDataRow trow = (SDataRow)pDataBlock;
dataRowSetLen(trow, TD_DATA_ROW_HEAD_SIZE + flen);
for(int32_t i = 0; i < rows; ++i) {
*(int32_t*) pDataBlock = total; int toffset = 0;
pDataBlock += sizeof(int32_t); for (int32_t j = 0; j < tinfo.numOfColumns; j++) {
tdAppendColVal(trow, p, pSchema[j].type, pSchema[j].bytes, toffset);
*(int32_t*) pDataBlock = firstPartLen; toffset += TYPE_BYTES[pSchema[j].type];
pDataBlock += sizeof(int32_t); p += pSchema[j].bytes;
}
memcpy(pDataBlock, p, pTableDataBlock->rowSize);
// p += pTableDataBlock->rowSize;
p += pTableDataBlock->rowSize; pDataBlock += dataRowLen(trow);
pDataBlock += pTableDataBlock->rowSize; pBlock->len += dataRowLen(trow);
} }
len = pBlock->len;
pBlock->len = htonl(pBlock->len);
return len;
} }
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockList) { int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockList) {
...@@ -734,7 +731,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi ...@@ -734,7 +731,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi
pBlocks->len = htonl(len); pBlocks->len = htonl(len);
// erase the empty space reserved for binary data // erase the empty space reserved for binary data
trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock); len = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock);
dataBuf->size += (len + sizeof(SSubmitBlk)); dataBuf->size += (len + sizeof(SSubmitBlk));
dataBuf->numOfTables += 1; dataBuf->numOfTables += 1;
} }
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include <string.h> #include <string.h>
#include "taosdef.h" #include "taosdef.h"
#include "tutil.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -30,7 +31,7 @@ typedef struct { ...@@ -30,7 +31,7 @@ typedef struct {
int8_t type; // Column type int8_t type; // Column type
int16_t colId; // column ID int16_t colId; // column ID
int32_t bytes; // column bytes int32_t bytes; // column bytes
int32_t offset; // point offset in a row data int32_t offset; // point offset in SDataRow after the header part
} STColumn; } STColumn;
#define colType(col) ((col)->type) #define colType(col) ((col)->type)
...@@ -43,26 +44,25 @@ typedef struct { ...@@ -43,26 +44,25 @@ typedef struct {
#define colSetBytes(col, b) (colBytes(col) = (b)) #define colSetBytes(col, b) (colBytes(col) = (b))
#define colSetOffset(col, o) (colOffset(col) = (o)) #define colSetOffset(col, o) (colOffset(col) = (o))
STColumn *tdNewCol(int8_t type, int16_t colId, int16_t bytes);
void tdFreeCol(STColumn *pCol);
void tdColCpy(STColumn *dst, STColumn *src);
void tdSetCol(STColumn *pCol, int8_t type, int16_t colId, int32_t bytes);
// ----------------- TSDB SCHEMA DEFINITION // ----------------- TSDB SCHEMA DEFINITION
typedef struct { typedef struct {
int totalCols; // Total columns allocated
int numOfCols; // Number of columns appended int numOfCols; // Number of columns appended
int padding; // Total columns allocated int tlen; // maximum length of a SDataRow without the header part
int flen; // First part length in a SDataRow after the header part
STColumn columns[]; STColumn columns[];
} STSchema; } STSchema;
#define schemaNCols(s) ((s)->numOfCols) #define schemaNCols(s) ((s)->numOfCols)
#define schemaTotalCols(s) ((s)->totalCols)
#define schemaTLen(s) ((s)->tlen)
#define schemaFLen(s) ((s)->flen)
#define schemaColAt(s, i) ((s)->columns + i) #define schemaColAt(s, i) ((s)->columns + i)
STSchema *tdNewSchema(int32_t nCols); STSchema *tdNewSchema(int32_t nCols);
int tdSchemaAppendCol(STSchema *pSchema, int8_t type, int16_t colId, int32_t bytes); #define tdFreeSchema(s) tfree((s))
int tdSchemaAddCol(STSchema *pSchema, int8_t type, int16_t colId, int32_t bytes);
STSchema *tdDupSchema(STSchema *pSchema); STSchema *tdDupSchema(STSchema *pSchema);
void tdFreeSchema(STSchema *pSchema);
void tdUpdateSchema(STSchema *pSchema);
int tdGetSchemaEncodeSize(STSchema *pSchema); int tdGetSchemaEncodeSize(STSchema *pSchema);
void * tdEncodeSchema(void *dst, STSchema *pSchema); void * tdEncodeSchema(void *dst, STSchema *pSchema);
STSchema *tdDecodeSchema(void **psrc); STSchema *tdDecodeSchema(void **psrc);
...@@ -70,53 +70,100 @@ STSchema *tdDecodeSchema(void **psrc); ...@@ -70,53 +70,100 @@ STSchema *tdDecodeSchema(void **psrc);
// ----------------- Data row structure // ----------------- Data row structure
/* A data row, the format is like below: /* A data row, the format is like below:
* +----------+---------+---------------------------------+---------------------------------+ * |<------------------------------------- len ---------------------------------->|
* | int32_t | int32_t | | | * |<--Head ->|<--------- flen -------------->| |
* +----------+---------+---------------------------------+---------------------------------+ * +----------+---------------------------------+---------------------------------+
* | len | flen | First part | Second part | * | int32_t | | |
* +----------+---------+---------------------------------+---------------------------------+ * +----------+---------------------------------+---------------------------------+
* plen: first part length * | len | First part | Second part |
* len: the length including sizeof(row) + sizeof(len) * +----------+---------------------------------+---------------------------------+
* row: actual row data encoding
*/ */
typedef void *SDataRow; typedef void *SDataRow;
#define TD_DATA_ROW_HEAD_SIZE sizeof(int32_t)
#define TD_DATA_ROW_HEAD_SIZE (2 * sizeof(int32_t))
#define dataRowLen(r) (*(int32_t *)(r)) #define dataRowLen(r) (*(int32_t *)(r))
#define dataRowFLen(r) (*(int32_t *)((char *)(r) + sizeof(int32_t))) #define dataRowTuple(r) POINTER_DRIFT(r, TD_DATA_ROW_HEAD_SIZE)
#define dataRowTuple(r) ((char *)(r) + TD_DATA_ROW_HEAD_SIZE)
#define dataRowKey(r) (*(TSKEY *)(dataRowTuple(r))) #define dataRowKey(r) (*(TSKEY *)(dataRowTuple(r)))
#define dataRowSetLen(r, l) (dataRowLen(r) = (l)) #define dataRowSetLen(r, l) (dataRowLen(r) = (l))
#define dataRowSetFLen(r, l) (dataRowFLen(r) = (l))
#define dataRowIdx(r, i) ((char *)(r) + i)
#define dataRowCpy(dst, r) memcpy((dst), (r), dataRowLen(r)) #define dataRowCpy(dst, r) memcpy((dst), (r), dataRowLen(r))
#define dataRowAt(r, idx) ((char *)(r) + (idx)) #define dataRowMaxBytesFromSchema(s) (schemaTLen(s) + TD_DATA_ROW_HEAD_SIZE)
void tdInitDataRow(SDataRow row, STSchema *pSchema);
int tdMaxRowBytesFromSchema(STSchema *pSchema);
SDataRow tdNewDataRow(int32_t bytes, STSchema *pSchema);
SDataRow tdNewDataRowFromSchema(STSchema *pSchema); SDataRow tdNewDataRowFromSchema(STSchema *pSchema);
void tdFreeDataRow(SDataRow row); void tdFreeDataRow(SDataRow row);
int tdAppendColVal(SDataRow row, void *value, STColumn *pCol); void tdInitDataRow(SDataRow row, STSchema *pSchema);
void tdDataRowReset(SDataRow row, STSchema *pSchema); int tdAppendColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int32_t offset);
SDataRow tdDataRowDup(SDataRow row); SDataRow tdDataRowDup(SDataRow row);
// NOTE: offset here including the header size
static FORCE_INLINE void *tdGetRowDataOfCol(SDataRow row, int8_t type, int32_t offset) {
switch (type) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
return POINTER_DRIFT(row, *(VarDataOffsetT *)POINTER_DRIFT(row, offset));
break;
default:
return POINTER_DRIFT(row, offset);
break;
}
}
// ----------------- Data column structure // ----------------- Data column structure
typedef struct SDataCol { typedef struct SDataCol {
int8_t type; int8_t type; // column type
int16_t colId; int16_t colId; // column ID
int bytes; int bytes; // column data bytes defined
int len; int offset; // data offset in a SDataRow (including the header size)
int offset; int spaceSize; // Total space size for this column
void * pData; // Original data int len; // column data length
VarDataOffsetT *dataOff; // For binary and nchar data, the offset in the data column
void * pData; // Actual data pointer
} SDataCol; } SDataCol;
static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; }
void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints);
void dataColAppendVal(SDataCol *pCol, void *value, int numOfPoints, int maxPoints);
void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfPoints);
void dataColSetOffset(SDataCol *pCol, int nEle);
bool isNEleNull(SDataCol *pCol, int nEle);
void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints);
// Get the data pointer from a column-wised data
static FORCE_INLINE void *tdGetColDataOfRow(SDataCol *pCol, int row) {
switch (pCol->type) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
return POINTER_DRIFT(pCol->pData, pCol->dataOff[row]);
break;
default:
return POINTER_DRIFT(pCol->pData, TYPE_BYTES[pCol->type] * row);
break;
}
}
static FORCE_INLINE int32_t dataColGetNEleLen(SDataCol *pDataCol, int rows) {
ASSERT(rows > 0);
switch (pDataCol->type) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
return pDataCol->dataOff[rows - 1] + varDataTLen(tdGetColDataOfRow(pDataCol, rows - 1));
break;
default:
return TYPE_BYTES[pDataCol->type] * rows;
}
}
typedef struct { typedef struct {
int maxRowSize; int maxRowSize;
int maxCols; // max number of columns int maxCols; // max number of columns
int maxPoints; // max number of points int maxPoints; // max number of points
int bufSize;
int numOfPoints; int numOfPoints;
int numOfCols; // Total number of cols int numOfCols; // Total number of cols
int sversion; // TODO: set sversion int sversion; // TODO: set sversion
...@@ -125,7 +172,7 @@ typedef struct { ...@@ -125,7 +172,7 @@ typedef struct {
} SDataCols; } SDataCols;
#define keyCol(pCols) (&((pCols)->cols[0])) // Key column #define keyCol(pCols) (&((pCols)->cols[0])) // Key column
#define dataColsKeyAt(pCols, idx) ((int64_t *)(keyCol(pCols)->pData))[(idx)] #define dataColsKeyAt(pCols, idx) ((TSKEY *)(keyCol(pCols)->pData))[(idx)]
#define dataColsKeyFirst(pCols) dataColsKeyAt(pCols, 0) #define dataColsKeyFirst(pCols) dataColsKeyAt(pCols, 0)
#define dataColsKeyLast(pCols) dataColsKeyAt(pCols, (pCols)->numOfPoints - 1) #define dataColsKeyLast(pCols) dataColsKeyAt(pCols, (pCols)->numOfPoints - 1)
......
此差异已折叠。
...@@ -16,33 +16,34 @@ ...@@ -16,33 +16,34 @@
#include "taosdef.h" #include "taosdef.h"
#include "ttokendef.h" #include "ttokendef.h"
#include "tscompression.h"
const int32_t TYPE_BYTES[11] = { const int32_t TYPE_BYTES[11] = {
-1, // TSDB_DATA_TYPE_NULL -1, // TSDB_DATA_TYPE_NULL
sizeof(int8_t), // TSDB_DATA_TYPE_BOOL sizeof(int8_t), // TSDB_DATA_TYPE_BOOL
sizeof(int8_t), // TSDB_DATA_TYPE_TINYINT sizeof(int8_t), // TSDB_DATA_TYPE_TINYINT
sizeof(int16_t), // TSDB_DATA_TYPE_SMALLINT sizeof(int16_t), // TSDB_DATA_TYPE_SMALLINT
sizeof(int32_t), // TSDB_DATA_TYPE_INT sizeof(int32_t), // TSDB_DATA_TYPE_INT
sizeof(int64_t), // TSDB_DATA_TYPE_BIGINT sizeof(int64_t), // TSDB_DATA_TYPE_BIGINT
sizeof(float), // TSDB_DATA_TYPE_FLOAT sizeof(float), // TSDB_DATA_TYPE_FLOAT
sizeof(double), // TSDB_DATA_TYPE_DOUBLE sizeof(double), // TSDB_DATA_TYPE_DOUBLE
sizeof(int32_t), // TSDB_DATA_TYPE_BINARY sizeof(VarDataOffsetT), // TSDB_DATA_TYPE_BINARY
sizeof(TSKEY), // TSDB_DATA_TYPE_TIMESTAMP sizeof(TSKEY), // TSDB_DATA_TYPE_TIMESTAMP
sizeof(int32_t) // TSDB_DATA_TYPE_NCHAR sizeof(VarDataOffsetT) // TSDB_DATA_TYPE_NCHAR
}; };
tDataTypeDescriptor tDataTypeDesc[11] = { tDataTypeDescriptor tDataTypeDesc[11] = {
{TSDB_DATA_TYPE_NULL, 6, 1, "NOTYPE"}, {TSDB_DATA_TYPE_NULL, 6, 1, "NOTYPE", NULL, NULL},
{TSDB_DATA_TYPE_BOOL, 4, CHAR_BYTES, "BOOL"}, {TSDB_DATA_TYPE_BOOL, 4, CHAR_BYTES, "BOOL", tsCompressBool, tsDecompressBool},
{TSDB_DATA_TYPE_TINYINT, 7, CHAR_BYTES, "TINYINT"}, {TSDB_DATA_TYPE_TINYINT, 7, CHAR_BYTES, "TINYINT", tsCompressTinyint, tsDecompressTinyint},
{TSDB_DATA_TYPE_SMALLINT, 8, SHORT_BYTES, "SMALLINT"}, {TSDB_DATA_TYPE_SMALLINT, 8, SHORT_BYTES, "SMALLINT", tsCompressSmallint, tsDecompressSmallint},
{TSDB_DATA_TYPE_INT, 3, INT_BYTES, "INT"}, {TSDB_DATA_TYPE_INT, 3, INT_BYTES, "INT", tsCompressInt, tsDecompressInt},
{TSDB_DATA_TYPE_BIGINT, 6, LONG_BYTES, "BIGINT"}, {TSDB_DATA_TYPE_BIGINT, 6, LONG_BYTES, "BIGINT", tsCompressBigint, tsDecompressBigint},
{TSDB_DATA_TYPE_FLOAT, 5, FLOAT_BYTES, "FLOAT"}, {TSDB_DATA_TYPE_FLOAT, 5, FLOAT_BYTES, "FLOAT", tsCompressFloat, tsDecompressFloat},
{TSDB_DATA_TYPE_DOUBLE, 6, DOUBLE_BYTES, "DOUBLE"}, {TSDB_DATA_TYPE_DOUBLE, 6, DOUBLE_BYTES, "DOUBLE", tsCompressDouble, tsDecompressDouble},
{TSDB_DATA_TYPE_BINARY, 6, 0, "BINARY"}, {TSDB_DATA_TYPE_BINARY, 6, 0, "BINARY", tsCompressString, tsDecompressString},
{TSDB_DATA_TYPE_TIMESTAMP, 9, LONG_BYTES, "TIMESTAMP"}, {TSDB_DATA_TYPE_TIMESTAMP, 9, LONG_BYTES, "TIMESTAMP", tsCompressTimestamp, tsDecompressTimestamp},
{TSDB_DATA_TYPE_NCHAR, 5, 8, "NCHAR"}, {TSDB_DATA_TYPE_NCHAR, 5, 8, "NCHAR", tsCompressString, tsDecompressString},
}; };
char tTokenTypeSwitcher[13] = { char tTokenTypeSwitcher[13] = {
......
...@@ -32,6 +32,13 @@ extern "C" { ...@@ -32,6 +32,13 @@ extern "C" {
#define TSKEY int64_t #define TSKEY int64_t
#endif #endif
// ----------------- For variable data types such as TSDB_DATA_TYPE_BINARY and TSDB_DATA_TYPE_NCHAR
typedef int32_t VarDataOffsetT;
typedef int16_t VarDataLenT;
#define varDataLen(v) ((VarDataLenT *)(v))[0]
#define varDataTLen(v) (sizeof(VarDataLenT) + varDataLen(v))
#define varDataVal(v) ((void *)((char *)v + sizeof(VarDataLenT)))
// this data type is internally used only in 'in' query to hold the values // this data type is internally used only in 'in' query to hold the values
#define TSDB_DATA_TYPE_ARRAY (TSDB_DATA_TYPE_NCHAR + 1) #define TSDB_DATA_TYPE_ARRAY (TSDB_DATA_TYPE_NCHAR + 1)
...@@ -121,6 +128,10 @@ typedef struct tDataTypeDescriptor { ...@@ -121,6 +128,10 @@ typedef struct tDataTypeDescriptor {
int16_t nameLen; int16_t nameLen;
int32_t nSize; int32_t nSize;
char * aName; char * aName;
int (*compFunc)(const char *const input, int inputSize, const int nelements, char *const output, int outputSize,
char algorithm, char *const buffer, int bufferSize);
int (*decompFunc)(const char *const input, int compressedSize, const int nelements, char *const output,
int outputSize, char algorithm, char *const buffer, int bufferSize);
} tDataTypeDescriptor; } tDataTypeDescriptor;
extern tDataTypeDescriptor tDataTypeDesc[11]; extern tDataTypeDescriptor tDataTypeDesc[11];
......
...@@ -582,7 +582,7 @@ void exprSerializeTest1() { ...@@ -582,7 +582,7 @@ void exprSerializeTest1() {
tExprTreeDestroy(&p1, nullptr); tExprTreeDestroy(&p1, nullptr);
tExprTreeDestroy(&p2, nullptr); tExprTreeDestroy(&p2, nullptr);
tbufClose(&bw); // tbufClose(&bw);
} }
void exprSerializeTest2() { void exprSerializeTest2() {
...@@ -627,7 +627,7 @@ void exprSerializeTest2() { ...@@ -627,7 +627,7 @@ void exprSerializeTest2() {
tExprTreeDestroy(&p1, nullptr); tExprTreeDestroy(&p1, nullptr);
tExprTreeDestroy(&p2, nullptr); tExprTreeDestroy(&p2, nullptr);
tbufClose(&bw); // tbufClose(&bw);
} }
} // namespace } // namespace
TEST(testCase, astTest) { TEST(testCase, astTest) {
......
...@@ -153,17 +153,16 @@ typedef struct { ...@@ -153,17 +153,16 @@ typedef struct {
} SCacheMem; } SCacheMem;
typedef struct { typedef struct {
int maxBytes;
int cacheBlockSize; int cacheBlockSize;
int totalCacheBlocks; int totalCacheBlocks;
STsdbCachePool pool; STsdbCachePool pool;
STsdbCacheBlock *curBlock; STsdbCacheBlock *curBlock;
SCacheMem * mem; SCacheMem * mem;
SCacheMem * imem; SCacheMem * imem;
TsdbRepoT * pRepo; TsdbRepoT * pRepo;
} STsdbCache; } STsdbCache;
STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize, TsdbRepoT *pRepo); STsdbCache *tsdbInitCache(int cacheBlockSize, int totalBlocks, TsdbRepoT *pRepo);
void tsdbFreeCache(STsdbCache *pCache); void tsdbFreeCache(STsdbCache *pCache);
void * tsdbAllocFromCache(STsdbCache *pCache, int bytes, TSKEY key); void * tsdbAllocFromCache(STsdbCache *pCache, int bytes, TSKEY key);
...@@ -297,7 +296,7 @@ typedef struct { ...@@ -297,7 +296,7 @@ typedef struct {
// TODO: take pre-calculation into account // TODO: take pre-calculation into account
typedef struct { typedef struct {
int16_t colId; // Column ID int16_t colId; // Column ID
int16_t len; // Column length int16_t len; // Column length // TODO: int16_t is not enough
int32_t type : 8; int32_t type : 8;
int32_t offset : 24; int32_t offset : 24;
} SCompCol; } SCompCol;
...@@ -426,6 +425,8 @@ typedef struct { ...@@ -426,6 +425,8 @@ typedef struct {
SCompData *pCompData; SCompData *pCompData;
SDataCols *pDataCols[2]; SDataCols *pDataCols[2];
void *blockBuffer; // Buffer to hold the whole data block
void *compBuffer; // Buffer for temperary compress/decompress purpose
} SRWHelper; } SRWHelper;
// --------- Helper state // --------- Helper state
...@@ -445,13 +446,11 @@ typedef struct { ...@@ -445,13 +446,11 @@ typedef struct {
int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo); int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo);
int tsdbInitWriteHelper(SRWHelper *pHelper, STsdbRepo *pRepo); int tsdbInitWriteHelper(SRWHelper *pHelper, STsdbRepo *pRepo);
// int tsdbInitHelper(SRWHelper *pHelper, SHelperCfg *pCfg);
void tsdbDestroyHelper(SRWHelper *pHelper); void tsdbDestroyHelper(SRWHelper *pHelper);
void tsdbResetHelper(SRWHelper *pHelper); void tsdbResetHelper(SRWHelper *pHelper);
// --------- For set operations // --------- For set operations
int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup); int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup);
// void tsdbSetHelperTable(SRWHelper *pHelper, SHelperTable *pHelperTable, STSchema *pSchema);
void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo); void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo);
int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError); int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError);
......
...@@ -21,29 +21,25 @@ static int tsdbAllocBlockFromPool(STsdbCache *pCache); ...@@ -21,29 +21,25 @@ static int tsdbAllocBlockFromPool(STsdbCache *pCache);
static void tsdbFreeBlockList(SList *list); static void tsdbFreeBlockList(SList *list);
static void tsdbFreeCacheMem(SCacheMem *mem); static void tsdbFreeCacheMem(SCacheMem *mem);
STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize, TsdbRepoT *pRepo) { STsdbCache *tsdbInitCache(int cacheBlockSize, int totalBlocks, TsdbRepoT *pRepo) {
STsdbCache *pCache = (STsdbCache *)calloc(1, sizeof(STsdbCache)); STsdbCache *pCache = (STsdbCache *)calloc(1, sizeof(STsdbCache));
if (pCache == NULL) return NULL; if (pCache == NULL) return NULL;
if (cacheBlockSize < 0) cacheBlockSize = TSDB_DEFAULT_CACHE_BLOCK_SIZE; if (cacheBlockSize < 0) cacheBlockSize = TSDB_DEFAULT_CACHE_BLOCK_SIZE;
cacheBlockSize *= (1024 * 1024); cacheBlockSize *= (1024 * 1024);
if (maxBytes < 0) maxBytes = cacheBlockSize * TSDB_DEFAULT_TOTAL_BLOCKS; if (totalBlocks <= 1) totalBlocks = TSDB_DEFAULT_TOTAL_BLOCKS;
pCache->maxBytes = maxBytes;
pCache->cacheBlockSize = cacheBlockSize; pCache->cacheBlockSize = cacheBlockSize;
pCache->totalCacheBlocks = totalBlocks;
pCache->pRepo = pRepo; pCache->pRepo = pRepo;
int nBlocks = maxBytes / cacheBlockSize + 1;
if (nBlocks <= 1) nBlocks = 2;
pCache->totalCacheBlocks = nBlocks;
STsdbCachePool *pPool = &(pCache->pool); STsdbCachePool *pPool = &(pCache->pool);
pPool->index = 0; pPool->index = 0;
pPool->memPool = tdListNew(sizeof(STsdbCacheBlock *)); pPool->memPool = tdListNew(sizeof(STsdbCacheBlock *));
if (pPool->memPool == NULL) goto _err; if (pPool->memPool == NULL) goto _err;
for (int i = 0; i < nBlocks; i++) { for (int i = 0; i < totalBlocks; i++) {
STsdbCacheBlock *pBlock = (STsdbCacheBlock *)malloc(sizeof(STsdbCacheBlock) + cacheBlockSize); STsdbCacheBlock *pBlock = (STsdbCacheBlock *)malloc(sizeof(STsdbCacheBlock) + cacheBlockSize);
if (pBlock == NULL) { if (pBlock == NULL) {
goto _err; goto _err;
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
#include "tsdb.h" #include "tsdb.h"
#include "tsdbMain.h" #include "tsdbMain.h"
#include "tscompression.h" #include "tscompression.h"
#include "tchecksum.h"
#define TSDB_DEFAULT_PRECISION TSDB_PRECISION_MILLI // default precision #define TSDB_DEFAULT_PRECISION TSDB_PRECISION_MILLI // default precision
#define IS_VALID_PRECISION(precision) (((precision) >= TSDB_PRECISION_MILLI) && ((precision) <= TSDB_PRECISION_NANO)) #define IS_VALID_PRECISION(precision) (((precision) >= TSDB_PRECISION_MILLI) && ((precision) <= TSDB_PRECISION_NANO))
...@@ -202,7 +203,7 @@ TsdbRepoT *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH) { ...@@ -202,7 +203,7 @@ TsdbRepoT *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH) {
return NULL; return NULL;
} }
pRepo->tsdbCache = tsdbInitCache(-1, -1, (TsdbRepoT *)pRepo); pRepo->tsdbCache = tsdbInitCache(pRepo->config.cacheBlockSize, pRepo->config.totalBlocks, (TsdbRepoT *)pRepo);
if (pRepo->tsdbCache == NULL) { if (pRepo->tsdbCache == NULL) {
tsdbFreeMeta(pRepo->tsdbMeta); tsdbFreeMeta(pRepo->tsdbMeta);
free(pRepo->rootDir); free(pRepo->rootDir);
......
...@@ -242,7 +242,7 @@ int32_t tsdbGetTableTagVal(TsdbRepoT* repo, STableId id, int32_t colId, int16_t* ...@@ -242,7 +242,7 @@ int32_t tsdbGetTableTagVal(TsdbRepoT* repo, STableId id, int32_t colId, int16_t*
assert(pCol != NULL); assert(pCol != NULL);
SDataRow row = (SDataRow)pTable->tagVal; SDataRow row = (SDataRow)pTable->tagVal;
char* d = dataRowAt(row, TD_DATA_ROW_HEAD_SIZE); char* d = dataRowTuple(row);
*val = d; *val = d;
*type = pCol->type; *type = pCol->type;
...@@ -451,9 +451,8 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) { ...@@ -451,9 +451,8 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) {
// Update the pMeta->maxCols and pMeta->maxRowBytes // Update the pMeta->maxCols and pMeta->maxRowBytes
if (pTable->type == TSDB_SUPER_TABLE || pTable->type == TSDB_NORMAL_TABLE) { if (pTable->type == TSDB_SUPER_TABLE || pTable->type == TSDB_NORMAL_TABLE) {
if (schemaNCols(pTable->schema) > pMeta->maxCols) pMeta->maxCols = schemaNCols(pTable->schema); if (schemaNCols(pTable->schema) > pMeta->maxCols) pMeta->maxCols = schemaNCols(pTable->schema);
int bytes = tdMaxRowBytesFromSchema(pTable->schema); int bytes = dataRowMaxBytesFromSchema(pTable->schema);
if (bytes > pMeta->maxRowBytes) pMeta->maxRowBytes = bytes; if (bytes > pMeta->maxRowBytes) pMeta->maxRowBytes = bytes;
tdUpdateSchema(pTable->schema);
} }
return tsdbAddTableIntoMap(pMeta, pTable); return tsdbAddTableIntoMap(pMeta, pTable);
...@@ -524,5 +523,5 @@ static int tsdbEstimateTableEncodeSize(STable *pTable) { ...@@ -524,5 +523,5 @@ static int tsdbEstimateTableEncodeSize(STable *pTable) {
char *getTupleKey(const void * data) { char *getTupleKey(const void * data) {
SDataRow row = (SDataRow)data; SDataRow row = (SDataRow)data;
return dataRowAt(row, TD_DATA_ROW_HEAD_SIZE); return POINTER_DRIFT(row, TD_DATA_ROW_HEAD_SIZE);
} }
\ No newline at end of file
...@@ -131,6 +131,11 @@ static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t ...@@ -131,6 +131,11 @@ static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t
// Init block part // Init block part
if (tsdbInitHelperBlock(pHelper) < 0) goto _err; if (tsdbInitHelperBlock(pHelper) < 0) goto _err;
pHelper->blockBuffer =
tmalloc(sizeof(SCompData) + (sizeof(SCompCol) + sizeof(TSCKSUM) + COMP_OVERFLOW_BYTES) * pHelper->config.maxCols +
pHelper->config.maxRowSize * pHelper->config.maxRowsPerFileBlock + sizeof(TSCKSUM));
if (pHelper->blockBuffer == NULL) goto _err;
return 0; return 0;
_err: _err:
...@@ -149,6 +154,8 @@ int tsdbInitWriteHelper(SRWHelper *pHelper, STsdbRepo *pRepo) { ...@@ -149,6 +154,8 @@ int tsdbInitWriteHelper(SRWHelper *pHelper, STsdbRepo *pRepo) {
void tsdbDestroyHelper(SRWHelper *pHelper) { void tsdbDestroyHelper(SRWHelper *pHelper) {
if (pHelper) { if (pHelper) {
tzfree(pHelper->blockBuffer);
tzfree(pHelper->compBuffer);
tsdbDestroyHelperFile(pHelper); tsdbDestroyHelperFile(pHelper);
tsdbDestroyHelperTable(pHelper); tsdbDestroyHelperTable(pHelper);
tsdbDestroyHelperBlock(pHelper); tsdbDestroyHelperBlock(pHelper);
...@@ -330,7 +337,7 @@ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) { ...@@ -330,7 +337,7 @@ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) {
int blkIdx = (pCompBlock == NULL) ? (pIdx->numOfBlocks - 1) : (pCompBlock - pHelper->pCompInfo->blocks); int blkIdx = (pCompBlock == NULL) ? (pIdx->numOfBlocks - 1) : (pCompBlock - pHelper->pCompInfo->blocks);
if (pCompBlock == NULL) { // No key overlap, must has last block, just merge with the last block if (pCompBlock == NULL) { // No key overlap, must has last block, just merge with the last block
ASSERT(pIdx->hasLast && pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].last); ASSERT(pIdx->hasLast && pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].last);
rowsToWrite = tsdbMergeDataWithBlock(pHelper, blkIdx, pDataCols); rowsToWrite = tsdbMergeDataWithBlock(pHelper, blkIdx, pDataCols);
if (rowsToWrite < 0) goto _err; if (rowsToWrite < 0) goto _err;
} else { // Has key overlap } else { // Has key overlap
...@@ -552,61 +559,97 @@ int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int blkIdx, ...@@ -552,61 +559,97 @@ int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int blkIdx,
return 0; return 0;
} }
static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32_t len, int8_t comp, int numOfPoints,
int maxPoints, char *buffer, int bufferSize) {
// Verify by checksum
if (!taosCheckChecksumWhole((uint8_t *)content, len)) return -1;
// Decode the data
if (comp) {
// // Need to decompress
pDataCol->len = (*(tDataTypeDesc[pDataCol->type].decompFunc))(
content, len - sizeof(TSCKSUM), numOfPoints, pDataCol->pData, pDataCol->spaceSize, comp, buffer, bufferSize);
if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) {
pDataCol->len += (sizeof(int32_t) * maxPoints);
dataColSetOffset(pDataCol, numOfPoints);
}
} else {
// No need to decompress, just memcpy it
switch (pDataCol->type) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
pDataCol->len = sizeof(int32_t) * maxPoints;
memcpy((char *)pDataCol->pData + pDataCol->len, content, len - sizeof(TSCKSUM));
pDataCol->len += (len - sizeof(TSCKSUM));
dataColSetOffset(pDataCol, numOfPoints);
break;
default:
pDataCol->len = len - sizeof(TSCKSUM);
memcpy(pDataCol->pData, content, pDataCol->len);
break;
}
}
return 0;
}
/** /**
* Interface to read the data of a sub-block OR the data of a super-block of which (numOfSubBlocks == 1) * Interface to read the data of a sub-block OR the data of a super-block of which (numOfSubBlocks == 1)
*/ */
static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols) { static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols) {
ASSERT(pCompBlock->numOfSubBlocks <= 1); ASSERT(pCompBlock->numOfSubBlocks <= 1);
SCompData *pCompData = (SCompData *)malloc(pCompBlock->len); ASSERT(tsizeof(pHelper->blockBuffer) >= pCompBlock->len);
if (pCompData == NULL) return -1;
SCompData *pCompData = (SCompData *)pHelper->blockBuffer;
int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd; int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd;
if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) goto _err; if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) goto _err;
if (tread(fd, (void *)pCompData, pCompBlock->len) < pCompBlock->len) goto _err; if (tread(fd, (void *)pCompData, pCompBlock->len) < pCompBlock->len) goto _err;
ASSERT(pCompData->numOfCols == pCompBlock->numOfCols); ASSERT(pCompData->numOfCols == pCompBlock->numOfCols);
// TODO : check the checksum int32_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols + sizeof(TSCKSUM);
size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols + sizeof(TSCKSUM);
if (!taosCheckChecksumWhole((uint8_t *)pCompData, tsize)) goto _err; if (!taosCheckChecksumWhole((uint8_t *)pCompData, tsize)) goto _err;
for (int i = 0; i < pCompData->numOfCols; i++) {
// TODO: check the data checksum
// if (!taosCheckChecksumWhole())
}
ASSERT(pCompBlock->numOfCols == pCompData->numOfCols);
pDataCols->numOfPoints = pCompBlock->numOfPoints; pDataCols->numOfPoints = pCompBlock->numOfPoints;
int ccol = 0, dcol = 0; // Recover the data
while (true) { int ccol = 0;
if (ccol >= pDataCols->numOfCols) { int dcol = 0;
// TODO: Fill rest NULL while (dcol < pDataCols->numOfCols) {
break; SDataCol *pDataCol = &(pDataCols->cols[dcol]);
if (ccol >= pCompData->numOfCols) {
// Set current column as NULL and forward
dataColSetNEleNull(pDataCol, pCompBlock->numOfPoints, pDataCols->maxPoints);
dcol++;
continue;
} }
if (dcol >= pCompData->numOfCols) break;
SCompCol *pCompCol = &(pCompData->cols[ccol]); SCompCol *pCompCol = &(pCompData->cols[ccol]);
SDataCol *pDataCol = &(pDataCols->cols[dcol]);
if (pCompCol->colId == pDataCol->colId) { if (pCompCol->colId == pDataCol->colId) {
// TODO: uncompress if (pCompBlock->algorithm == TWO_STAGE_COMP) {
memcpy(pDataCol->pData, (void *)(((char *)pCompData) + tsize + pCompCol->offset), pCompCol->len); pHelper->compBuffer = trealloc(pHelper->compBuffer, pCompCol->len + COMP_OVERFLOW_BYTES);
ccol++; if (pHelper->compBuffer == NULL) goto _err;
dcol++; }
} else if (pCompCol->colId > pDataCol->colId) { if (tsdbCheckAndDecodeColumnData(pDataCol, (char *)pCompData + tsize + pCompCol->offset, pCompCol->len,
// TODO: Fill NULL pCompBlock->algorithm, pCompBlock->numOfPoints, pDataCols->maxPoints,
pHelper->compBuffer, tsizeof(pHelper->compBuffer)) < 0)
goto _err;
dcol++; dcol++;
} else {
ccol++; ccol++;
} else if (pCompCol->colId < pDataCol->colId) {
ccol++;
} else {
// Set current column as NULL and forward
dataColSetNEleNull(pDataCol, pCompBlock->numOfPoints, pDataCols->maxPoints);
dcol++;
} }
} }
tfree(pCompData);
return 0; return 0;
_err: _err:
tfree(pCompData);
return -1; return -1;
} }
...@@ -634,36 +677,6 @@ _err: ...@@ -634,36 +677,6 @@ _err:
return -1; return -1;
} }
// static int tsdbCheckHelperCfg(SHelperCfg *pCfg) {
// // TODO
// return 0;
// }
// static void tsdbClearHelperFile(SHelperFile *pHFile) {
// pHFile->fid = -1;
// if (pHFile->headF.fd > 0) {
// close(pHFile->headF.fd);
// pHFile->headF.fd = -1;
// }
// if (pHFile->dataF.fd > 0) {
// close(pHFile->dataF.fd);
// pHFile->dataF.fd = -1;
// }
// if (pHFile->lastF.fd > 0) {
// close(pHFile->lastF.fd);
// pHFile->lastF.fd = -1;
// }
// if (pHFile->nHeadF.fd > 0) {
// close(pHFile->nHeadF.fd);
// pHFile->nHeadF.fd = -1;
// }
// if (pHFile->nLastF.fd > 0) {
// close(pHFile->nLastF.fd);
// pHFile->nLastF.fd = -1;
// }
// }
static bool tsdbShouldCreateNewLast(SRWHelper *pHelper) { static bool tsdbShouldCreateNewLast(SRWHelper *pHelper) {
ASSERT(pHelper->files.lastF.fd > 0); ASSERT(pHelper->files.lastF.fd > 0);
struct stat st; struct stat st;
...@@ -677,81 +690,94 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa ...@@ -677,81 +690,94 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfPoints && ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfPoints &&
rowsToWrite <= pHelper->config.maxRowsPerFileBlock); rowsToWrite <= pHelper->config.maxRowsPerFileBlock);
SCompData *pCompData = NULL; SCompData *pCompData = (SCompData *)(pHelper->blockBuffer);
int64_t offset = 0; int64_t offset = 0;
offset = lseek(pFile->fd, 0, SEEK_END); offset = lseek(pFile->fd, 0, SEEK_END);
if (offset < 0) goto _err; if (offset < 0) goto _err;
pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * pDataCols->numOfCols + sizeof(TSCKSUM));
if (pCompData == NULL) goto _err;
int nColsNotAllNull = 0; int nColsNotAllNull = 0;
int32_t toffset = 0;
for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) { for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) {
SDataCol *pDataCol = pDataCols->cols + ncol; SDataCol *pDataCol = pDataCols->cols + ncol;
SCompCol *pCompCol = pCompData->cols + nColsNotAllNull; SCompCol *pCompCol = pCompData->cols + nColsNotAllNull;
if (0) { if (isNEleNull(pDataCol, rowsToWrite)) {
// TODO: all data to commit are NULL // all data to commit are NULL, just ignore it
continue; continue;
} }
// Compress the data here
{
// TODO
}
pCompCol->colId = pDataCol->colId; pCompCol->colId = pDataCol->colId;
pCompCol->type = pDataCol->type; pCompCol->type = pDataCol->type;
pCompCol->len = TYPE_BYTES[pCompCol->type] * rowsToWrite; // TODO: change it
pCompCol->offset = toffset;
nColsNotAllNull++; nColsNotAllNull++;
toffset += pCompCol->len;
} }
ASSERT(nColsNotAllNull > 0 && nColsNotAllNull <= pDataCols->numOfCols); ASSERT(nColsNotAllNull > 0 && nColsNotAllNull <= pDataCols->numOfCols);
pCompData->delimiter = TSDB_FILE_DELIMITER; // Compress the data if neccessary
pCompData->uid = pHelper->tableInfo.uid; int tcol = 0;
pCompData->numOfCols = nColsNotAllNull; int32_t toffset = 0;
int32_t tsize = sizeof(SCompData) + sizeof(SCompCol) * nColsNotAllNull + sizeof(TSCKSUM);
// Write SCompData + SCompCol part int32_t lsize = tsize;
size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * nColsNotAllNull + sizeof(TSCKSUM);
taosCalcChecksumAppend(0, (uint8_t *)pCompData, tsize);
if (twrite(pFile->fd, (void *)pCompData, tsize) < tsize) goto _err;
// Write true data part
int nCompCol = 0;
for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) { for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) {
ASSERT(nCompCol < nColsNotAllNull); if (tcol >= nColsNotAllNull) break;
SDataCol *pDataCol = pDataCols->cols + ncol; SDataCol *pDataCol = pDataCols->cols + ncol;
SCompCol *pCompCol = pCompData->cols + nCompCol; SCompCol *pCompCol = pCompData->cols + tcol;
if (pDataCol->colId == pCompCol->colId) { if (pDataCol->colId != pCompCol->colId) continue;
if (twrite(pFile->fd, (void *)(pDataCol->pData), pCompCol->len) < pCompCol->len) goto _err; void *tptr = (void *)((char *)pCompData + lsize);
tsize += pCompCol->len;
nCompCol++; pCompCol->offset = toffset;
int32_t tlen = dataColGetNEleLen(pDataCol, rowsToWrite);
if (pHelper->config.compress) {
if (pHelper->config.compress == TWO_STAGE_COMP) {
pHelper->compBuffer = trealloc(pHelper->compBuffer, tlen + COMP_OVERFLOW_BYTES);
if (pHelper->compBuffer == NULL) goto _err;
}
pCompCol->len = (*(tDataTypeDesc[pDataCol->type].compFunc))(
(char *)pDataCol->pData, tlen, rowsToWrite, tptr, tsizeof(pHelper->blockBuffer) - lsize,
pHelper->config.compress, pHelper->compBuffer, tsizeof(pHelper->compBuffer));
} else {
pCompCol->len = tlen;
memcpy(tptr, pDataCol->pData, pCompCol->len);
} }
// Add checksum
pCompCol->len += sizeof(TSCKSUM);
taosCalcChecksumAppend(0, (uint8_t *)tptr, pCompCol->len);
toffset += pCompCol->len;
lsize += pCompCol->len;
tcol++;
} }
pCompData->delimiter = TSDB_FILE_DELIMITER;
pCompData->uid = pHelper->tableInfo.uid;
pCompData->numOfCols = nColsNotAllNull;
taosCalcChecksumAppend(0, (uint8_t *)pCompData, tsize);
// Write the whole block to file
if (twrite(pFile->fd, (void *)pCompData, lsize) < lsize) goto _err;
// Update pCompBlock membership vairables
pCompBlock->last = isLast; pCompBlock->last = isLast;
pCompBlock->offset = offset; pCompBlock->offset = offset;
pCompBlock->algorithm = pHelper->config.compress; pCompBlock->algorithm = pHelper->config.compress;
pCompBlock->numOfPoints = rowsToWrite; pCompBlock->numOfPoints = rowsToWrite;
pCompBlock->sversion = pHelper->tableInfo.sversion; pCompBlock->sversion = pHelper->tableInfo.sversion;
pCompBlock->len = (int32_t)tsize; pCompBlock->len = (int32_t)lsize;
pCompBlock->numOfSubBlocks = isSuperBlock ? 1 : 0; pCompBlock->numOfSubBlocks = isSuperBlock ? 1 : 0;
pCompBlock->numOfCols = nColsNotAllNull; pCompBlock->numOfCols = nColsNotAllNull;
pCompBlock->keyFirst = dataColsKeyFirst(pDataCols); pCompBlock->keyFirst = dataColsKeyFirst(pDataCols);
pCompBlock->keyLast = dataColsKeyAt(pDataCols, rowsToWrite - 1); pCompBlock->keyLast = dataColsKeyAt(pDataCols, rowsToWrite - 1);
tfree(pCompData);
return 0; return 0;
_err: _err:
tfree(pCompData);
return -1; return -1;
} }
...@@ -782,7 +808,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa ...@@ -782,7 +808,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
TSKEY keyFirst = dataColsKeyFirst(pDataCols); TSKEY keyFirst = dataColsKeyFirst(pDataCols);
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
ASSERT(blkIdx < pIdx->numOfSuperBlocks); ASSERT(blkIdx < pIdx->numOfBlocks);
// SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx; // SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx;
ASSERT(blockAtIdx(pHelper, blkIdx)->numOfSubBlocks >= 1); ASSERT(blockAtIdx(pHelper, blkIdx)->numOfSubBlocks >= 1);
...@@ -790,7 +816,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa ...@@ -790,7 +816,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
// ASSERT(compareKeyBlock((void *)&keyFirst, (void *)pCompBlock) == 0); // ASSERT(compareKeyBlock((void *)&keyFirst, (void *)pCompBlock) == 0);
if (keyFirst > blockAtIdx(pHelper, blkIdx)->keyLast) { // Merge with the last block by append if (keyFirst > blockAtIdx(pHelper, blkIdx)->keyLast) { // Merge with the last block by append
ASSERT(blockAtIdx(pHelper, blkIdx)->numOfPoints < pHelper->config.minRowsPerFileBlock && blkIdx == pIdx->numOfSuperBlocks-1); ASSERT(blockAtIdx(pHelper, blkIdx)->numOfPoints < pHelper->config.minRowsPerFileBlock && blkIdx == pIdx->numOfBlocks-1);
int defaultRowsToWrite = pHelper->config.maxRowsPerFileBlock * 4 / 5; // TODO: make a interface int defaultRowsToWrite = pHelper->config.maxRowsPerFileBlock * 4 / 5; // TODO: make a interface
rowsWritten = MIN((defaultRowsToWrite - blockAtIdx(pHelper, blkIdx)->numOfPoints), pDataCols->numOfPoints); rowsWritten = MIN((defaultRowsToWrite - blockAtIdx(pHelper, blkIdx)->numOfPoints), pDataCols->numOfPoints);
...@@ -961,7 +987,7 @@ static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize) { ...@@ -961,7 +987,7 @@ static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize) {
static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx) { static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx) {
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
ASSERT(blkIdx >= 0 && blkIdx <= pIdx->numOfSuperBlocks); ASSERT(blkIdx >= 0 && blkIdx <= pIdx->numOfBlocks);
ASSERT(pCompBlock->numOfSubBlocks == 1); ASSERT(pCompBlock->numOfSubBlocks == 1);
// Adjust memory if no more room // Adjust memory if no more room
...@@ -1004,7 +1030,7 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId ...@@ -1004,7 +1030,7 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId
ASSERT(pCompBlock->numOfSubBlocks == 0); ASSERT(pCompBlock->numOfSubBlocks == 0);
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
ASSERT(blkIdx >= 0 && blkIdx < pIdx->numOfSuperBlocks); ASSERT(blkIdx >= 0 && blkIdx < pIdx->numOfBlocks);
SCompBlock *pSCompBlock = pHelper->pCompInfo->blocks + blkIdx; SCompBlock *pSCompBlock = pHelper->pCompInfo->blocks + blkIdx;
ASSERT(pSCompBlock->numOfSubBlocks >= 1 && pSCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS); ASSERT(pSCompBlock->numOfSubBlocks >= 1 && pSCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS);
...@@ -1088,7 +1114,7 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int ...@@ -1088,7 +1114,7 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
ASSERT(blkIdx >= 0 && blkIdx < pIdx->numOfSuperBlocks); ASSERT(blkIdx >= 0 && blkIdx < pIdx->numOfBlocks);
SCompBlock *pSCompBlock = pHelper->pCompInfo->blocks + blkIdx; SCompBlock *pSCompBlock = pHelper->pCompInfo->blocks + blkIdx;
......
...@@ -27,7 +27,7 @@ typedef struct { ...@@ -27,7 +27,7 @@ typedef struct {
static int insertData(SInsertInfo *pInfo) { static int insertData(SInsertInfo *pInfo) {
SSubmitMsg *pMsg = SSubmitMsg *pMsg =
(SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + tdMaxRowBytesFromSchema(pInfo->pSchema) * pInfo->rowsPerSubmit); (SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + dataRowMaxBytesFromSchema(pInfo->pSchema) * pInfo->rowsPerSubmit);
if (pMsg == NULL) return -1; if (pMsg == NULL) return -1;
TSKEY start_time = pInfo->startTime; TSKEY start_time = pInfo->startTime;
...@@ -52,11 +52,12 @@ static int insertData(SInsertInfo *pInfo) { ...@@ -52,11 +52,12 @@ static int insertData(SInsertInfo *pInfo) {
tdInitDataRow(row, pInfo->pSchema); tdInitDataRow(row, pInfo->pSchema);
for (int j = 0; j < schemaNCols(pInfo->pSchema); j++) { for (int j = 0; j < schemaNCols(pInfo->pSchema); j++) {
STColumn *pTCol = schemaColAt(pInfo->pSchema, j);
if (j == 0) { // Just for timestamp if (j == 0) { // Just for timestamp
tdAppendColVal(row, (void *)(&start_time), schemaColAt(pInfo->pSchema, j)); tdAppendColVal(row, (void *)(&start_time), pTCol->type, pTCol->bytes, pTCol->offset);
} else { // For int } else { // For int
int val = 10; int val = 10;
tdAppendColVal(row, (void *)(&val), schemaColAt(pInfo->pSchema, j)); tdAppendColVal(row, (void *)(&val), pTCol->type, pTCol->bytes, pTCol->offset);
} }
} }
pBlock->len += dataRowLen(row); pBlock->len += dataRowLen(row);
...@@ -105,9 +106,9 @@ TEST(TsdbTest, DISABLED_tableEncodeDecode) { ...@@ -105,9 +106,9 @@ TEST(TsdbTest, DISABLED_tableEncodeDecode) {
for (int i = 0; i < nCols; i++) { for (int i = 0; i < nCols; i++) {
if (i == 0) { if (i == 0) {
tdSchemaAppendCol(schema, TSDB_DATA_TYPE_TIMESTAMP, i, -1); tdSchemaAddCol(schema, TSDB_DATA_TYPE_TIMESTAMP, i, -1);
} else { } else {
tdSchemaAppendCol(schema, TSDB_DATA_TYPE_INT, i, -1); tdSchemaAddCol(schema, TSDB_DATA_TYPE_INT, i, -1);
} }
} }
...@@ -149,9 +150,9 @@ TEST(TsdbTest, createRepo) { ...@@ -149,9 +150,9 @@ TEST(TsdbTest, createRepo) {
for (int i = 0; i < nCols; i++) { for (int i = 0; i < nCols; i++) {
if (i == 0) { if (i == 0) {
tdSchemaAppendCol(schema, TSDB_DATA_TYPE_TIMESTAMP, i, -1); tdSchemaAddCol(schema, TSDB_DATA_TYPE_TIMESTAMP, i, -1);
} else { } else {
tdSchemaAppendCol(schema, TSDB_DATA_TYPE_INT, i, -1); tdSchemaAddCol(schema, TSDB_DATA_TYPE_INT, i, -1);
} }
} }
...@@ -244,7 +245,7 @@ TEST(TsdbTest, DISABLED_openRepo) { ...@@ -244,7 +245,7 @@ TEST(TsdbTest, DISABLED_openRepo) {
// tsdbLoadCompCols(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock, (void *)pCompData); // tsdbLoadCompCols(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock, (void *)pCompData);
// STable *pTable = tsdbGetTableByUid(pRepo->tsdbMeta, pCompData->uid); // STable *pTable = tsdbGetTableByUid(pRepo->tsdbMeta, pCompData->uid);
// SDataCols *pDataCols = tdNewDataCols(tdMaxRowBytesFromSchema(tsdbGetTableSchema(pRepo->tsdbMeta, pTable)), 5, 10); // SDataCols *pDataCols = tdNewDataCols(tdMaxRowBytesFromSchema(tsdbGetTableSchema(pRepo->tsdbMeta, pTable)), 5);
// tdInitDataCols(pDataCols, tsdbGetTableSchema(pRepo->tsdbMeta, pTable)); // tdInitDataCols(pDataCols, tsdbGetTableSchema(pRepo->tsdbMeta, pTable));
// tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock, 1, pDataCols, pCompData); // tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock, 1, pDataCols, pCompData);
......
...@@ -11,7 +11,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) ...@@ -11,7 +11,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/lz4/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/lz4/inc)
AUX_SOURCE_DIRECTORY(src SRC) AUX_SOURCE_DIRECTORY(src SRC)
ADD_LIBRARY(tutil ${SRC}) ADD_LIBRARY(tutil ${SRC})
TARGET_LINK_LIBRARIES(tutil pthread os m rt) TARGET_LINK_LIBRARIES(tutil pthread os m rt lz4)
FIND_PATH(ICONV_INCLUDE_EXIST iconv.h /usr/include/ /usr/local/include/) FIND_PATH(ICONV_INCLUDE_EXIST iconv.h /usr/include/ /usr/local/include/)
IF (ICONV_INCLUDE_EXIST) IF (ICONV_INCLUDE_EXIST)
ADD_DEFINITIONS(-DUSE_LIBICONV) ADD_DEFINITIONS(-DUSE_LIBICONV)
...@@ -68,7 +68,7 @@ ELSEIF (TD_WINDOWS_64) ...@@ -68,7 +68,7 @@ ELSEIF (TD_WINDOWS_64)
LIST(APPEND SRC ./src/tutil.c) LIST(APPEND SRC ./src/tutil.c)
LIST(APPEND SRC ./src/version.c) LIST(APPEND SRC ./src/version.c)
ADD_LIBRARY(tutil ${SRC}) ADD_LIBRARY(tutil ${SRC})
TARGET_LINK_LIBRARIES(tutil iconv regex pthread os winmm IPHLPAPI ws2_32) TARGET_LINK_LIBRARIES(tutil iconv regex pthread os winmm IPHLPAPI ws2_32 lz4)
ELSEIF(TD_DARWIN_64) ELSEIF(TD_DARWIN_64)
ADD_DEFINITIONS(-DUSE_LIBICONV) ADD_DEFINITIONS(-DUSE_LIBICONV)
LIST(APPEND SRC ./src/hash.c) LIST(APPEND SRC ./src/hash.c)
...@@ -105,7 +105,7 @@ ELSEIF(TD_DARWIN_64) ...@@ -105,7 +105,7 @@ ELSEIF(TD_DARWIN_64)
LIST(APPEND SRC ./src/version.c) LIST(APPEND SRC ./src/version.c)
LIST(APPEND SRC ./src/hash.c) LIST(APPEND SRC ./src/hash.c)
ADD_LIBRARY(tutil ${SRC}) ADD_LIBRARY(tutil ${SRC})
TARGET_LINK_LIBRARIES(tutil iconv pthread os) TARGET_LINK_LIBRARIES(tutil iconv pthread os lz4)
ENDIF() ENDIF()
# TARGET_LINK_LIBRARIES(tutil mstorage) # TARGET_LINK_LIBRARIES(tutil mstorage)
...@@ -21,7 +21,9 @@ extern "C" { ...@@ -21,7 +21,9 @@ extern "C" {
#endif #endif
#include "taosdef.h" #include "taosdef.h"
#include "tutil.h"
#define COMP_OVERFLOW_BYTES 2
#define BITS_PER_BYTE 8 #define BITS_PER_BYTE 8
// Masks // Masks
#define INT64MASK(_x) ((1ul << _x) - 1) #define INT64MASK(_x) ((1ul << _x) - 1)
...@@ -32,43 +34,220 @@ extern "C" { ...@@ -32,43 +34,220 @@ extern "C" {
#define ONE_STAGE_COMP 1 #define ONE_STAGE_COMP 1
#define TWO_STAGE_COMP 2 #define TWO_STAGE_COMP 2
int tsCompressTinyint(const char* const input, int inputSize, const int nelements, char* const output, int outputSize, char algorithm, extern int tsCompressINTImp(const char *const input, const int nelements, char *const output, const char type);
char* const buffer, int bufferSize); extern int tsDecompressINTImp(const char *const input, const int nelements, char *const output, const char type);
int tsCompressSmallint(const char* const input, int inputSize, const int nelements, char* const output, int outputSize, char algorith, extern int tsCompressBoolImp(const char *const input, const int nelements, char *const output);
char* const buffer, int bufferSize); extern int tsDecompressBoolImp(const char *const input, const int nelements, char *const output);
int tsCompressInt(const char* const input, int inputSize, const int nelements, char* const output, int outputSize, char algorith, extern int tsCompressStringImp(const char *const input, int inputSize, char *const output, int outputSize);
char* const buffer, int bufferSize); extern int tsDecompressStringImp(const char *const input, int compressedSize, char *const output, int outputSize);
int tsCompressBigint(const char* const input, int inputSize, const int nelements, char* const output, int outputSize, char algorith, extern int tsCompressTimestampImp(const char *const input, const int nelements, char *const output);
char* const buffer, int bufferSize); extern int tsDecompressTimestampImp(const char *const input, const int nelements, char *const output);
int tsCompressBool(const char* const input, int inputSize, const int nelements, char* const output, int outputSize, char algorithm, extern int tsCompressDoubleImp(const char *const input, const int nelements, char *const output);
char* const buffer, int bufferSize); extern int tsDecompressDoubleImp(const char *const input, const int nelements, char *const output);
int tsCompressString(const char* const input, int inputSize, const int nelements, char* const output, int outputSize, char algorith, extern int tsCompressFloatImp(const char *const input, const int nelements, char *const output);
char* const buffer, int bufferSize); extern int tsDecompressFloatImp(const char *const input, const int nelements, char *const output);
int tsCompressFloat(const char* const input, int inputSize, const int nelements, char* const output, int outputSize, char algorith,
char* const buffer, int bufferSize); static FORCE_INLINE int tsCompressTinyint(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, char algorithm,
int tsCompressDouble(const char* const input, int inputSize, const int nelements, char* const output, int outputSize, char algorith, char *const buffer, int bufferSize) {
char* const buffer, int bufferSize); if (algorithm == ONE_STAGE_COMP) {
int tsCompressTimestamp(const char* const input, int inputSize, const int nelements, char* const output, int outputSize, char algorith, return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_TINYINT);
char* const buffer, int bufferSize); } else if (algorithm == TWO_STAGE_COMP) {
int len = tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_TINYINT);
int tsDecompressTinyint(const char* const input, int compressedSize, const int nelements, char* const output, return tsCompressStringImp(buffer, len, output, outputSize);
int outputSize, char algorithm, char* const buffer, int bufferSize); } else {
int tsDecompressSmallint(const char* const input, int compressedSize, const int nelements, char* const output, assert(0);
int outputSize, char algorithm, char* const buffer, int bufferSize); }
int tsDecompressInt(const char* const input, int compressedSize, const int nelements, char* const output, int outputSize, }
char algorithm, char* const buffer, int bufferSize);
int tsDecompressBigint(const char* const input, int compressedSize, const int nelements, char* const output, static FORCE_INLINE int tsDecompressTinyint(const char *const input, int compressedSize, const int nelements, char *const output,
int outputSize, char algorithm, char* const buffer, int bufferSize); int outputSize, char algorithm, char *const buffer, int bufferSize) {
int tsDecompressBool(const char* const input, int compressedSize, const int nelements, char* const output, if (algorithm == ONE_STAGE_COMP) {
int outputSize, char algorithm, char* const buffer, int bufferSize); return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_TINYINT);
int tsDecompressString(const char* const input, int compressedSize, const int nelements, char* const output, } else if (algorithm == TWO_STAGE_COMP) {
int outputSize, char algorithm, char* const buffer, int bufferSize); tsDecompressStringImp(input, compressedSize, buffer, bufferSize);
int tsDecompressFloat(const char* const input, int compressedSize, const int nelements, char* const output, return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_TINYINT);
int outputSize, char algorithm, char* const buffer, int bufferSize); } else {
int tsDecompressDouble(const char* const input, int compressedSize, const int nelements, char* const output, assert(0);
int outputSize, char algorith, char* const buffer, int bufferSize); }
int tsDecompressTimestamp(const char* const input, int compressedSize, const int nelements, char* const output, }
int outputSize, char algorithm, char* const buffer, int bufferSize);
static FORCE_INLINE int tsCompressSmallint(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, char algorithm,
char *const buffer, int bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_SMALLINT);
} else if (algorithm == TWO_STAGE_COMP) {
int len = tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_SMALLINT);
return tsCompressStringImp(buffer, len, output, outputSize);
} else {
assert(0);
}
}
static FORCE_INLINE int tsDecompressSmallint(const char *const input, int compressedSize, const int nelements, char *const output,
int outputSize, char algorithm, char *const buffer, int bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_SMALLINT);
} else if (algorithm == TWO_STAGE_COMP) {
tsDecompressStringImp(input, compressedSize, buffer, bufferSize);
return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_SMALLINT);
} else {
assert(0);
}
}
static FORCE_INLINE int tsCompressInt(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, char algorithm,
char *const buffer, int bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_INT);
} else if (algorithm == TWO_STAGE_COMP) {
int len = tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_INT);
return tsCompressStringImp(buffer, len, output, outputSize);
} else {
assert(0);
}
}
static FORCE_INLINE int tsDecompressInt(const char *const input, int compressedSize, const int nelements, char *const output,
int outputSize, char algorithm, char *const buffer, int bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_INT);
} else if (algorithm == TWO_STAGE_COMP) {
tsDecompressStringImp(input, compressedSize, buffer, bufferSize);
return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_INT);
} else {
assert(0);
}
}
static FORCE_INLINE int tsCompressBigint(const char *const input, int inputSize, const int nelements, char *const output, int outputSize,
char algorithm, char *const buffer, int bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_BIGINT);
} else if (algorithm == TWO_STAGE_COMP) {
int len = tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_BIGINT);
return tsCompressStringImp(buffer, len, output, outputSize);
} else {
assert(0);
}
}
static FORCE_INLINE int tsDecompressBigint(const char *const input, int compressedSize, const int nelements, char *const output,
int outputSize, char algorithm, char *const buffer, int bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_BIGINT);
} else if (algorithm == TWO_STAGE_COMP) {
tsDecompressStringImp(input, compressedSize, buffer, bufferSize);
return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_BIGINT);
} else {
assert(0);
}
}
static FORCE_INLINE int tsCompressBool(const char *const input, int inputSize, const int nelements, char *const output, int outputSize,
char algorithm, char *const buffer, int bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsCompressBoolImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) {
int len = tsCompressBoolImp(input, nelements, buffer);
return tsCompressStringImp(buffer, len, output, outputSize);
} else {
assert(0);
}
}
static FORCE_INLINE int tsDecompressBool(const char *const input, int compressedSize, const int nelements, char *const output,
int outputSize, char algorithm, char *const buffer, int bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsDecompressBoolImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) {
tsDecompressStringImp(input, compressedSize, buffer, bufferSize);
return tsDecompressBoolImp(buffer, nelements, output);
} else {
assert(0);
}
}
static FORCE_INLINE int tsCompressString(const char *const input, int inputSize, const int nelements, char *const output, int outputSize,
char algorithm, char *const buffer, int bufferSize) {
return tsCompressStringImp(input, inputSize, output, outputSize);
}
static FORCE_INLINE int tsDecompressString(const char *const input, int compressedSize, const int nelements, char *const output,
int outputSize, char algorithm, char *const buffer, int bufferSize) {
return tsDecompressStringImp(input, compressedSize, output, outputSize);
}
static FORCE_INLINE int tsCompressFloat(const char *const input, int inputSize, const int nelements, char *const output, int outputSize,
char algorithm, char *const buffer, int bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsCompressFloatImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) {
int len = tsCompressFloatImp(input, nelements, buffer);
return tsCompressStringImp(buffer, len, output, outputSize);
} else {
assert(0);
}
}
static FORCE_INLINE int tsDecompressFloat(const char *const input, int compressedSize, const int nelements, char *const output,
int outputSize, char algorithm, char *const buffer, int bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsDecompressFloatImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) {
tsDecompressStringImp(input, compressedSize, buffer, bufferSize);
return tsDecompressFloatImp(buffer, nelements, output);
} else {
assert(0);
}
}
static FORCE_INLINE int tsCompressDouble(const char *const input, int inputSize, const int nelements, char *const output, int outputSize,
char algorithm, char *const buffer, int bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsCompressDoubleImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) {
int len = tsCompressDoubleImp(input, nelements, buffer);
return tsCompressStringImp(buffer, len, output, outputSize);
} else {
assert(0);
}
}
static FORCE_INLINE int tsDecompressDouble(const char *const input, int compressedSize, const int nelements, char *const output,
int outputSize, char algorithm, char *const buffer, int bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsDecompressDoubleImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) {
tsDecompressStringImp(input, compressedSize, buffer, bufferSize);
return tsDecompressDoubleImp(buffer, nelements, output);
} else {
assert(0);
}
}
static FORCE_INLINE int tsCompressTimestamp(const char *const input, int inputSize, const int nelements, char *const output, int outputSize,
char algorithm, char *const buffer, int bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsCompressTimestampImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) {
int len = tsCompressTimestampImp(input, nelements, buffer);
return tsCompressStringImp(buffer, len, output, outputSize);
} else {
assert(0);
}
}
static FORCE_INLINE int tsDecompressTimestamp(const char *const input, int compressedSize, const int nelements, char *const output,
int outputSize, char algorithm, char *const buffer, int bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsDecompressTimestampImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) {
tsDecompressStringImp(input, compressedSize, buffer, bufferSize);
return tsDecompressTimestampImp(buffer, nelements, output);
} else {
assert(0);
}
}
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -44,7 +44,10 @@ extern "C" { ...@@ -44,7 +44,10 @@ extern "C" {
#define tclose(x) taosCloseSocket(x) #define tclose(x) taosCloseSocket(x)
#ifdef ASSERTION // Pointer p drift right by b bytes
#define POINTER_DRIFT(p, b) ((void *)((char *)(p) + (b)))
#ifndef NDEBUG
#define ASSERT(x) assert(x) #define ASSERT(x) assert(x)
#else #else
#define ASSERT(x) #define ASSERT(x)
......
...@@ -56,223 +56,6 @@ const int TEST_NUMBER = 1; ...@@ -56,223 +56,6 @@ const int TEST_NUMBER = 1;
#define is_bigendian() ((*(char *)&TEST_NUMBER) == 0) #define is_bigendian() ((*(char *)&TEST_NUMBER) == 0)
#define SIMPLE8B_MAX_INT64 ((uint64_t)2305843009213693951L) #define SIMPLE8B_MAX_INT64 ((uint64_t)2305843009213693951L)
// Function declarations
int tsCompressINTImp(const char *const input, const int nelements, char *const output, const char type);
int tsDecompressINTImp(const char *const input, const int nelements, char *const output, const char type);
int tsCompressBoolImp(const char *const input, const int nelements, char *const output);
int tsDecompressBoolImp(const char *const input, const int nelements, char *const output);
int tsCompressStringImp(const char *const input, int inputSize, char *const output, int outputSize);
int tsDecompressStringImp(const char *const input, int compressedSize, char *const output, int outputSize);
int tsCompressTimestampImp(const char *const input, const int nelements, char *const output);
int tsDecompressTimestampImp(const char *const input, const int nelements, char *const output);
int tsCompressDoubleImp(const char *const input, const int nelements, char *const output);
int tsDecompressDoubleImp(const char *const input, const int nelements, char *const output);
int tsCompressFloatImp(const char *const input, const int nelements, char *const output);
int tsDecompressFloatImp(const char *const input, const int nelements, char *const output);
/* ----------------------------------------------Compression function used by
* others ---------------------------------------------- */
int tsCompressTinyint(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, char algorithm,
char *const buffer, int bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_TINYINT);
} else if (algorithm == TWO_STAGE_COMP) {
int len = tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_TINYINT);
return tsCompressStringImp(buffer, len, output, outputSize);
} else {
assert(0);
}
}
int tsDecompressTinyint(const char *const input, int compressedSize, const int nelements, char *const output,
int outputSize, char algorithm, char *const buffer, int bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_TINYINT);
} else if (algorithm == TWO_STAGE_COMP) {
tsDecompressStringImp(input, compressedSize, buffer, bufferSize);
return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_TINYINT);
} else {
assert(0);
}
}
int tsCompressSmallint(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, char algorithm,
char *const buffer, int bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_SMALLINT);
} else if (algorithm == TWO_STAGE_COMP) {
int len = tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_SMALLINT);
return tsCompressStringImp(buffer, len, output, outputSize);
} else {
assert(0);
}
}
int tsDecompressSmallint(const char *const input, int compressedSize, const int nelements, char *const output,
int outputSize, char algorithm, char *const buffer, int bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_SMALLINT);
} else if (algorithm == TWO_STAGE_COMP) {
tsDecompressStringImp(input, compressedSize, buffer, bufferSize);
return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_SMALLINT);
} else {
assert(0);
}
}
int tsCompressInt(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, char algorithm,
char *const buffer, int bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_INT);
} else if (algorithm == TWO_STAGE_COMP) {
int len = tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_INT);
return tsCompressStringImp(buffer, len, output, outputSize);
} else {
assert(0);
}
}
int tsDecompressInt(const char *const input, int compressedSize, const int nelements, char *const output,
int outputSize, char algorithm, char *const buffer, int bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_INT);
} else if (algorithm == TWO_STAGE_COMP) {
tsDecompressStringImp(input, compressedSize, buffer, bufferSize);
return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_INT);
} else {
assert(0);
}
}
int tsCompressBigint(const char *const input, int inputSize, const int nelements, char *const output, int outputSize,
char algorithm, char *const buffer, int bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_BIGINT);
} else if (algorithm == TWO_STAGE_COMP) {
int len = tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_BIGINT);
return tsCompressStringImp(buffer, len, output, outputSize);
} else {
assert(0);
}
}
int tsDecompressBigint(const char *const input, int compressedSize, const int nelements, char *const output,
int outputSize, char algorithm, char *const buffer, int bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_BIGINT);
} else if (algorithm == TWO_STAGE_COMP) {
tsDecompressStringImp(input, compressedSize, buffer, bufferSize);
return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_BIGINT);
} else {
assert(0);
}
}
int tsCompressBool(const char *const input, int inputSize, const int nelements, char *const output, int outputSize,
char algorithm, char *const buffer, int bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsCompressBoolImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) {
int len = tsCompressBoolImp(input, nelements, buffer);
return tsCompressStringImp(buffer, len, output, outputSize);
} else {
assert(0);
}
}
int tsDecompressBool(const char *const input, int compressedSize, const int nelements, char *const output,
int outputSize, char algorithm, char *const buffer, int bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsDecompressBoolImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) {
tsDecompressStringImp(input, compressedSize, buffer, bufferSize);
return tsDecompressBoolImp(buffer, nelements, output);
} else {
assert(0);
}
}
int tsCompressString(const char *const input, int inputSize, const int nelements, char *const output, int outputSize,
char algorithm, char *const buffer, int bufferSize) {
return tsCompressStringImp(input, inputSize, output, outputSize);
}
int tsDecompressString(const char *const input, int compressedSize, const int nelements, char *const output,
int outputSize, char algorithm, char *const buffer, int bufferSize) {
return tsDecompressStringImp(input, compressedSize, output, outputSize);
}
int tsCompressFloat(const char *const input, int inputSize, const int nelements, char *const output, int outputSize,
char algorithm, char *const buffer, int bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsCompressFloatImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) {
int len = tsCompressFloatImp(input, nelements, buffer);
return tsCompressStringImp(buffer, len, output, outputSize);
} else {
assert(0);
}
}
int tsDecompressFloat(const char *const input, int compressedSize, const int nelements, char *const output,
int outputSize, char algorithm, char *const buffer, int bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsDecompressFloatImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) {
tsDecompressStringImp(input, compressedSize, buffer, bufferSize);
return tsDecompressFloatImp(buffer, nelements, output);
} else {
assert(0);
}
}
int tsCompressDouble(const char *const input, int inputSize, const int nelements, char *const output, int outputSize,
char algorithm, char *const buffer, int bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsCompressDoubleImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) {
int len = tsCompressDoubleImp(input, nelements, buffer);
return tsCompressStringImp(buffer, len, output, outputSize);
} else {
assert(0);
}
}
int tsDecompressDouble(const char *const input, int compressedSize, const int nelements, char *const output,
int outputSize, char algorithm, char *const buffer, int bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsDecompressDoubleImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) {
tsDecompressStringImp(input, compressedSize, buffer, bufferSize);
return tsDecompressDoubleImp(buffer, nelements, output);
} else {
assert(0);
}
}
int tsCompressTimestamp(const char *const input, int inputSize, const int nelements, char *const output, int outputSize,
char algorithm, char *const buffer, int bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsCompressTimestampImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) {
int len = tsCompressTimestampImp(input, nelements, buffer);
return tsCompressStringImp(buffer, len, output, outputSize);
} else {
assert(0);
}
}
int tsDecompressTimestamp(const char *const input, int compressedSize, const int nelements, char *const output,
int outputSize, char algorithm, char *const buffer, int bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsDecompressTimestampImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) {
tsDecompressStringImp(input, compressedSize, buffer, bufferSize);
return tsDecompressTimestampImp(buffer, nelements, output);
} else {
assert(0);
}
}
bool safeInt64Add(int64_t a, int64_t b) { bool safeInt64Add(int64_t a, int64_t b) {
if ((a > 0 && b > INT64_MAX - a) || (a < 0 && b < INT64_MIN - a)) return false; if ((a > 0 && b > INT64_MAX - a) || (a < 0 && b < INT64_MIN - a)) return false;
return true; return true;
......
...@@ -96,14 +96,16 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { ...@@ -96,14 +96,16 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
} }
STsdbCfg tsdbCfg = {0}; STsdbCfg tsdbCfg = {0};
tsdbCfg.precision = pVnodeCfg->cfg.precision;
tsdbCfg.compression = pVnodeCfg->cfg.compression;;
tsdbCfg.tsdbId = pVnodeCfg->cfg.vgId; tsdbCfg.tsdbId = pVnodeCfg->cfg.vgId;
tsdbCfg.cacheBlockSize = pVnodeCfg->cfg.cacheBlockSize;
tsdbCfg.totalBlocks = pVnodeCfg->cfg.totalBlocks;
tsdbCfg.maxTables = pVnodeCfg->cfg.maxTables; tsdbCfg.maxTables = pVnodeCfg->cfg.maxTables;
tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile; tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile;
tsdbCfg.keep = pVnodeCfg->cfg.daysToKeep;
tsdbCfg.minRowsPerFileBlock = pVnodeCfg->cfg.minRowsPerFileBlock; tsdbCfg.minRowsPerFileBlock = pVnodeCfg->cfg.minRowsPerFileBlock;
tsdbCfg.maxRowsPerFileBlock = pVnodeCfg->cfg.maxRowsPerFileBlock; tsdbCfg.maxRowsPerFileBlock = pVnodeCfg->cfg.maxRowsPerFileBlock;
tsdbCfg.keep = pVnodeCfg->cfg.daysToKeep; tsdbCfg.precision = pVnodeCfg->cfg.precision;
tsdbCfg.compression = pVnodeCfg->cfg.compression;;
char tsdbDir[TSDB_FILENAME_LEN] = {0}; char tsdbDir[TSDB_FILENAME_LEN] = {0};
sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId); sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId);
......
...@@ -123,7 +123,7 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe ...@@ -123,7 +123,7 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe
STSchema *pDestSchema = tdNewSchema(numOfColumns); STSchema *pDestSchema = tdNewSchema(numOfColumns);
for (int i = 0; i < numOfColumns; i++) { for (int i = 0; i < numOfColumns; i++) {
tdSchemaAppendCol(pDestSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes)); tdSchemaAddCol(pDestSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
} }
tsdbTableSetSchema(&tCfg, pDestSchema, false); tsdbTableSetSchema(&tCfg, pDestSchema, false);
tsdbTableSetName(&tCfg, pTable->tableId, false); tsdbTableSetName(&tCfg, pTable->tableId, false);
...@@ -131,7 +131,7 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe ...@@ -131,7 +131,7 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe
if (numOfTags != 0) { if (numOfTags != 0) {
STSchema *pDestTagSchema = tdNewSchema(numOfTags); STSchema *pDestTagSchema = tdNewSchema(numOfTags);
for (int i = numOfColumns; i < totalCols; i++) { for (int i = numOfColumns; i < totalCols; i++) {
tdSchemaAppendCol(pDestTagSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes)); tdSchemaAddCol(pDestTagSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
} }
tsdbTableSetTagSchema(&tCfg, pDestTagSchema, false); tsdbTableSetTagSchema(&tCfg, pDestTagSchema, false);
tsdbTableSetSName(&tCfg, pTable->superTableId, false); tsdbTableSetSName(&tCfg, pTable->superTableId, false);
...@@ -141,7 +141,8 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe ...@@ -141,7 +141,8 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe
SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema); SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema);
for (int i = 0; i < numOfTags; i++) { for (int i = 0; i < numOfTags; i++) {
tdAppendColVal(dataRow, pTagData + accumBytes, pDestTagSchema->columns + i); STColumn *pTCol = schemaColAt(pDestSchema, i);
tdAppendColVal(dataRow, pTagData + accumBytes, pTCol->type, pTCol->bytes, pTCol->offset);
accumBytes += htons(pSchema[i + numOfColumns].bytes); accumBytes += htons(pSchema[i + numOfColumns].bytes);
} }
tsdbTableSetTagValue(&tCfg, dataRow, false); tsdbTableSetTagValue(&tCfg, dataRow, false);
...@@ -188,14 +189,14 @@ static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet ...@@ -188,14 +189,14 @@ static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet
STSchema *pDestSchema = tdNewSchema(numOfColumns); STSchema *pDestSchema = tdNewSchema(numOfColumns);
for (int i = 0; i < numOfColumns; i++) { for (int i = 0; i < numOfColumns; i++) {
tdSchemaAppendCol(pDestSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes)); tdSchemaAddCol(pDestSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
} }
tsdbTableSetSchema(&tCfg, pDestSchema, false); tsdbTableSetSchema(&tCfg, pDestSchema, false);
if (numOfTags != 0) { if (numOfTags != 0) {
STSchema *pDestTagSchema = tdNewSchema(numOfTags); STSchema *pDestTagSchema = tdNewSchema(numOfTags);
for (int i = numOfColumns; i < totalCols; i++) { for (int i = numOfColumns; i < totalCols; i++) {
tdSchemaAppendCol(pDestTagSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes)); tdSchemaAddCol(pDestTagSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
} }
tsdbTableSetTagSchema(&tCfg, pDestTagSchema, false); tsdbTableSetTagSchema(&tCfg, pDestTagSchema, false);
...@@ -204,7 +205,8 @@ static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet ...@@ -204,7 +205,8 @@ static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet
SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema); SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema);
for (int i = 0; i < numOfTags; i++) { for (int i = 0; i < numOfTags; i++) {
tdAppendColVal(dataRow, pTagData + accumBytes, pDestTagSchema->columns + i); STColumn *pTCol = schemaColAt(pDestTagSchema, i);
tdAppendColVal(dataRow, pTagData + accumBytes, pTCol->type, pTCol->bytes, pTCol->offset);
accumBytes += htons(pSchema[i + numOfColumns].bytes); accumBytes += htons(pSchema[i + numOfColumns].bytes);
} }
tsdbTableSetTagValue(&tCfg, dataRow, false); tsdbTableSetTagValue(&tCfg, dataRow, false);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册