未验证 提交 856c990f 编写于 作者: H Hongze Cheng 提交者: GitHub

Merge pull request #17102 from taosdata/feat/stream_compression

feat: stream compression
......@@ -162,17 +162,7 @@ struct STSRowBuilder {
struct SValue {
union {
int8_t i8; // TSDB_DATA_TYPE_BOOL||TSDB_DATA_TYPE_TINYINT
uint8_t u8; // TSDB_DATA_TYPE_UTINYINT
int16_t i16; // TSDB_DATA_TYPE_SMALLINT
uint16_t u16; // TSDB_DATA_TYPE_USMALLINT
int32_t i32; // TSDB_DATA_TYPE_INT
uint32_t u32; // TSDB_DATA_TYPE_UINT
int64_t i64; // TSDB_DATA_TYPE_BIGINT
uint64_t u64; // TSDB_DATA_TYPE_UBIGINT
TSKEY ts; // TSDB_DATA_TYPE_TIMESTAMP
float f; // TSDB_DATA_TYPE_FLOAT
double d; // TSDB_DATA_TYPE_DOUBLE
int64_t val;
struct {
uint32_t nData;
uint8_t *pData;
......
......@@ -333,10 +333,10 @@ typedef struct tDataTypeDescriptor {
char *name;
int64_t minValue;
int64_t maxValue;
int32_t (*compFunc)(const char *const input, int32_t inputSize, const int32_t nelements, char *const output,
int32_t outputSize, char algorithm, char *const buffer, int32_t bufferSize);
int32_t (*decompFunc)(const char *const input, int32_t compressedSize, const int32_t nelements, char *const output,
int32_t outputSize, char algorithm, char *const buffer, int32_t bufferSize);
int32_t (*compFunc)(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
int32_t nBuf);
int32_t (*decompFunc)(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
int32_t nBuf);
void (*statisFunc)(int8_t bitmapMode, const void *pBitmap, const void *pData, int32_t numofrow, int64_t *min,
int64_t *max, int64_t *sum, int16_t *minindex, int16_t *maxindex, int16_t *numofnull);
} tDataTypeDescriptor;
......@@ -356,7 +356,6 @@ void operateVal(void *dst, void *s1, void *s2, int32_t optr, int32_t type);
void *getDataMin(int32_t type);
void *getDataMax(int32_t type);
#ifdef __cplusplus
}
#endif
......
......@@ -23,8 +23,8 @@ extern "C" {
#endif
#define ENCODE_LIMIT (((uint8_t)1) << 7)
#define ZIGZAGE(T, v) ((u##T)((v) >> (sizeof(T) * 8 - 1))) ^ (((u##T)(v)) << 1) // zigzag encode
#define ZIGZAGD(T, v) ((v) >> 1) ^ -((T)((v)&1)) // zigzag decode
#define ZIGZAGE(T, v) (((u##T)((v) >> (sizeof(T) * 8 - 1))) ^ (((u##T)(v)) << 1)) // zigzag encode
#define ZIGZAGD(T, v) (((v) >> 1) ^ -((T)((v)&1))) // zigzag decode
/* ------------------------ LEGACY CODES ------------------------ */
#if 1
......@@ -70,7 +70,7 @@ static FORCE_INLINE int32_t taosEncodeFixedBool(void **buf, bool value) {
}
static FORCE_INLINE void *taosDecodeFixedBool(const void *buf, bool *value) {
*value = ( (((int8_t *)buf)[0] == 0) ? false : true );
*value = ((((int8_t *)buf)[0] == 0) ? false : true);
return POINTER_SHIFT(buf, sizeof(int8_t));
}
......
......@@ -51,287 +51,12 @@ extern "C" {
#define HEAD_MODE(x) x % 2
#define HEAD_ALGO(x) x / 2
extern int32_t tsCompressINTImp(const char *const input, const int32_t nelements, char *const output, const char type);
extern int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, char *const output,
const char type);
extern int32_t tsCompressBoolImp(const char *const input, const int32_t nelements, char *const output);
extern int32_t tsDecompressBoolImp(const char *const input, const int32_t nelements, char *const output);
extern int32_t tsCompressStringImp(const char *const input, int32_t inputSize, char *const output, int32_t outputSize);
extern int32_t tsDecompressStringImp(const char *const input, int32_t compressedSize, char *const output,
int32_t outputSize);
extern int32_t tsCompressTimestampImp(const char *const input, const int32_t nelements, char *const output);
extern int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelements, char *const output);
extern int32_t tsCompressDoubleImp(const char *const input, const int32_t nelements, char *const output);
extern int32_t tsDecompressDoubleImp(const char *const input, const int32_t nelements, char *const output);
extern int32_t tsCompressFloatImp(const char *const input, const int32_t nelements, char *const output);
extern int32_t tsDecompressFloatImp(const char *const input, const int32_t nelements, char *const output);
// lossy
extern int32_t tsCompressFloatLossyImp(const char *input, const int32_t nelements, char *const output);
extern int32_t tsDecompressFloatLossyImp(const char *input, int32_t compressedSize, const int32_t nelements,
char *const output);
extern int32_t tsCompressDoubleLossyImp(const char *input, const int32_t nelements, char *const output);
extern int32_t tsDecompressDoubleLossyImp(const char *input, int32_t compressedSize, const int32_t nelements,
char *const output);
#ifdef TD_TSZ
extern bool lossyFloat;
extern bool lossyDouble;
int32_t tsCompressInit();
void tsCompressExit();
#endif
static FORCE_INLINE int32_t tsCompressTinyint(const char *const input, int32_t inputSize, const int32_t nelements,
char *const output, int32_t outputSize, char algorithm,
char *const buffer, int32_t bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_TINYINT);
} else if (algorithm == TWO_STAGE_COMP) {
int32_t len = tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_TINYINT);
return tsCompressStringImp(buffer, len, output, outputSize);
} else {
assert(0);
return -1;
}
}
static FORCE_INLINE int32_t tsDecompressTinyint(const char *const input, int32_t compressedSize,
const int32_t nelements, char *const output, int32_t outputSize,
char algorithm, char *const buffer, int32_t bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_TINYINT);
} else if (algorithm == TWO_STAGE_COMP) {
if (tsDecompressStringImp(input, compressedSize, buffer, bufferSize) < 0) return -1;
return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_TINYINT);
} else {
assert(0);
return -1;
}
}
static FORCE_INLINE int32_t tsCompressSmallint(const char *const input, int32_t inputSize, const int32_t nelements,
char *const output, int32_t outputSize, char algorithm,
char *const buffer, int32_t bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_SMALLINT);
} else if (algorithm == TWO_STAGE_COMP) {
int32_t len = tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_SMALLINT);
return tsCompressStringImp(buffer, len, output, outputSize);
} else {
assert(0);
return -1;
}
}
static FORCE_INLINE int32_t tsDecompressSmallint(const char *const input, int32_t compressedSize,
const int32_t nelements, char *const output, int32_t outputSize,
char algorithm, char *const buffer, int32_t bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_SMALLINT);
} else if (algorithm == TWO_STAGE_COMP) {
if (tsDecompressStringImp(input, compressedSize, buffer, bufferSize) < 0) return -1;
return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_SMALLINT);
} else {
assert(0);
return -1;
}
}
static FORCE_INLINE int32_t tsCompressInt(const char *const input, int32_t inputSize, const int32_t nelements,
char *const output, int32_t outputSize, char algorithm, char *const buffer,
int32_t bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_INT);
} else if (algorithm == TWO_STAGE_COMP) {
int32_t len = tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_INT);
return tsCompressStringImp(buffer, len, output, outputSize);
} else {
assert(0);
return -1;
}
}
static FORCE_INLINE int32_t tsDecompressInt(const char *const input, int32_t compressedSize, const int32_t nelements,
char *const output, int32_t outputSize, char algorithm, char *const buffer,
int32_t bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_INT);
} else if (algorithm == TWO_STAGE_COMP) {
if (tsDecompressStringImp(input, compressedSize, buffer, bufferSize) < 0) return -1;
return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_INT);
} else {
assert(0);
return -1;
}
}
static FORCE_INLINE int32_t tsCompressBigint(const char *const input, int32_t inputSize, const int32_t nelements,
char *const output, int32_t outputSize, char algorithm, char *const buffer,
int32_t bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_BIGINT);
} else if (algorithm == TWO_STAGE_COMP) {
int32_t len = tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_BIGINT);
return tsCompressStringImp(buffer, len, output, outputSize);
} else {
assert(0);
return -1;
}
}
static FORCE_INLINE int32_t tsDecompressBigint(const char *const input, int32_t compressedSize, const int32_t nelements,
char *const output, int32_t outputSize, char algorithm,
char *const buffer, int32_t bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_BIGINT);
} else if (algorithm == TWO_STAGE_COMP) {
if (tsDecompressStringImp(input, compressedSize, buffer, bufferSize) < 0) return -1;
return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_BIGINT);
} else {
assert(0);
return -1;
}
}
static FORCE_INLINE int32_t tsCompressBool(const char *const input, int32_t inputSize, const int32_t nelements,
char *const output, int32_t outputSize, char algorithm, char *const buffer,
int32_t bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsCompressBoolImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) {
int32_t len = tsCompressBoolImp(input, nelements, buffer);
return tsCompressStringImp(buffer, len, output, outputSize);
} else {
assert(0);
return -1;
}
}
static FORCE_INLINE int32_t tsDecompressBool(const char *const input, int32_t compressedSize, const int32_t nelements,
char *const output, int32_t outputSize, char algorithm, char *const buffer,
int32_t bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsDecompressBoolImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) {
if (tsDecompressStringImp(input, compressedSize, buffer, bufferSize) < 0) return -1;
return tsDecompressBoolImp(buffer, nelements, output);
} else {
assert(0);
return -1;
}
}
static FORCE_INLINE int32_t tsCompressString(const char *const input, int32_t inputSize, const int32_t nelements,
char *const output, int32_t outputSize, char algorithm, char *const buffer,
int32_t bufferSize) {
return tsCompressStringImp(input, inputSize, output, outputSize);
}
static FORCE_INLINE int32_t tsDecompressString(const char *const input, int32_t compressedSize, const int32_t nelements,
char *const output, int32_t outputSize, char algorithm,
char *const buffer, int32_t bufferSize) {
return tsDecompressStringImp(input, compressedSize, output, outputSize);
}
static FORCE_INLINE int32_t tsCompressFloat(const char *const input, int32_t inputSize, const int32_t nelements,
char *const output, int32_t outputSize, char algorithm, char *const buffer,
int32_t bufferSize) {
#ifdef TD_TSZ
// lossy mode
if (lossyFloat) {
return tsCompressFloatLossyImp(input, nelements, output);
// lossless mode
} else {
#endif
if (algorithm == ONE_STAGE_COMP) {
return tsCompressFloatImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) {
int32_t len = tsCompressFloatImp(input, nelements, buffer);
return tsCompressStringImp(buffer, len, output, outputSize);
} else {
assert(0);
return -1;
}
#ifdef TD_TSZ
}
#endif
}
static FORCE_INLINE int32_t tsDecompressFloat(const char *const input, int32_t compressedSize, const int32_t nelements,
char *const output, int32_t outputSize, char algorithm,
char *const buffer, int32_t bufferSize) {
#ifdef TD_TSZ
if (HEAD_ALGO(input[0]) == ALGO_SZ_LOSSY) {
// decompress lossy
return tsDecompressFloatLossyImp(input, compressedSize, nelements, output);
} else {
#endif
// decompress lossless
if (algorithm == ONE_STAGE_COMP) {
return tsDecompressFloatImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) {
if (tsDecompressStringImp(input, compressedSize, buffer, bufferSize) < 0) return -1;
return tsDecompressFloatImp(buffer, nelements, output);
} else {
assert(0);
return -1;
}
#ifdef TD_TSZ
}
#endif
}
static FORCE_INLINE int32_t tsCompressDouble(const char *const input, int32_t inputSize, const int32_t nelements,
char *const output, int32_t outputSize, char algorithm, char *const buffer,
int32_t bufferSize) {
#ifdef TD_TSZ
if (lossyDouble) {
// lossy mode
return tsCompressDoubleLossyImp(input, nelements, output);
} else {
#endif
// lossless mode
if (algorithm == ONE_STAGE_COMP) {
return tsCompressDoubleImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) {
int32_t len = tsCompressDoubleImp(input, nelements, buffer);
return tsCompressStringImp(buffer, len, output, outputSize);
} else {
assert(0);
return -1;
}
#ifdef TD_TSZ
}
#endif
}
static FORCE_INLINE int32_t tsDecompressDouble(const char *const input, int32_t compressedSize, const int32_t nelements,
char *const output, int32_t outputSize, char algorithm,
char *const buffer, int32_t bufferSize) {
#ifdef TD_TSZ
if (HEAD_ALGO(input[0]) == ALGO_SZ_LOSSY) {
// decompress lossy
return tsDecompressDoubleLossyImp(input, compressedSize, nelements, output);
} else {
#endif
// decompress lossless
if (algorithm == ONE_STAGE_COMP) {
return tsDecompressDoubleImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) {
if (tsDecompressStringImp(input, compressedSize, buffer, bufferSize) < 0) return -1;
return tsDecompressDoubleImp(buffer, nelements, output);
} else {
assert(0);
return -1;
}
#ifdef TD_TSZ
}
#endif
}
#ifdef TD_TSZ
//
// lossy float double
//
static FORCE_INLINE int32_t tsCompressFloatLossy(const char *const input, int32_t inputSize, const int32_t nelements,
char *const output, int32_t outputSize, char algorithm,
char *const buffer, int32_t bufferSize) {
......@@ -358,33 +83,56 @@ static FORCE_INLINE int32_t tsDecompressDoubleLossy(const char *const input, int
#endif
static FORCE_INLINE int32_t tsCompressTimestamp(const char *const input, int32_t inputSize, const int32_t nelements,
char *const output, int32_t outputSize, char algorithm,
char *const buffer, int32_t bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsCompressTimestampImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) {
int32_t len = tsCompressTimestampImp(input, nelements, buffer);
return tsCompressStringImp(buffer, len, output, outputSize);
} else {
assert(0);
return -1;
}
}
static FORCE_INLINE int32_t tsDecompressTimestamp(const char *const input, int32_t compressedSize,
const int32_t nelements, char *const output, int32_t outputSize,
char algorithm, char *const buffer, int32_t bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsDecompressTimestampImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) {
if (tsDecompressStringImp(input, compressedSize, buffer, bufferSize) < 0) return -1;
return tsDecompressTimestampImp(buffer, nelements, output);
} else {
assert(0);
return -1;
}
}
/*************************************************************************
* REGULAR COMPRESSION
*************************************************************************/
int32_t tsCompressTimestamp(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
int32_t nBuf);
int32_t tsDecompressTimestamp(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg,
void *pBuf, int32_t nBuf);
int32_t tsCompressFloat(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
int32_t nBuf);
int32_t tsDecompressFloat(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
int32_t nBuf);
int32_t tsCompressDouble(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
int32_t nBuf);
int32_t tsDecompressDouble(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
int32_t nBuf);
int32_t tsCompressString(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
int32_t nBuf);
int32_t tsDecompressString(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
int32_t nBuf);
int32_t tsCompressBool(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
int32_t nBuf);
int32_t tsDecompressBool(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
int32_t nBuf);
int32_t tsCompressTinyint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
int32_t nBuf);
int32_t tsDecompressTinyint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
int32_t nBuf);
int32_t tsCompressSmallint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
int32_t nBuf);
int32_t tsDecompressSmallint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg,
void *pBuf, int32_t nBuf);
int32_t tsCompressInt(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
int32_t nBuf);
int32_t tsDecompressInt(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
int32_t nBuf);
int32_t tsCompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
int32_t nBuf);
int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
int32_t nBuf);
/*************************************************************************
* STREAM COMPRESSION
*************************************************************************/
typedef struct SCompressor SCompressor;
int32_t tCompressorCreate(SCompressor **ppCmprsor);
int32_t tCompressorDestroy(SCompressor *pCmprsor);
int32_t tCompressStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg);
int32_t tCompressEnd(SCompressor *pCmprsor, const uint8_t **ppOut, int32_t *nOut, int32_t *nOrigin);
int32_t tCompress(SCompressor *pCmprsor, const void *pData, int64_t nData);
#ifdef __cplusplus
}
......
此差异已折叠。
......@@ -689,7 +689,7 @@ int32_t tdSTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow **ppRow) {
memcpy(varDataVal(varBuf), pColVal->value.pData, pColVal->value.nData);
val = varBuf;
} else {
val = (const void *)&pColVal->value.i64;
val = (const void *)&pColVal->value.val;
}
} else {
pColVal = NULL;
......
......@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#if 0
#include <gtest/gtest.h>
#include <taoserror.h>
......@@ -476,4 +477,5 @@ TEST(testCase, NoneTest) {
taosArrayDestroy(pArray);
taosMemoryFree(pTSchema);
}
#endif
#endif
\ No newline at end of file
......@@ -38,39 +38,43 @@ extern "C" {
goto LABEL; \
}
typedef struct TSDBROW TSDBROW;
typedef struct TABLEID TABLEID;
typedef struct TSDBKEY TSDBKEY;
typedef struct SDelData SDelData;
typedef struct SDelIdx SDelIdx;
typedef struct STbData STbData;
typedef struct SMemTable SMemTable;
typedef struct STbDataIter STbDataIter;
typedef struct SMapData SMapData;
typedef struct SBlockIdx SBlockIdx;
typedef struct SDataBlk SDataBlk;
typedef struct SSttBlk SSttBlk;
typedef struct SDiskDataHdr SDiskDataHdr;
typedef struct SBlockData SBlockData;
typedef struct SDelFile SDelFile;
typedef struct SHeadFile SHeadFile;
typedef struct SDataFile SDataFile;
typedef struct SSttFile SSttFile;
typedef struct SSmaFile SSmaFile;
typedef struct SDFileSet SDFileSet;
typedef struct SDataFWriter SDataFWriter;
typedef struct SDataFReader SDataFReader;
typedef struct SDelFWriter SDelFWriter;
typedef struct SDelFReader SDelFReader;
typedef struct SRowIter SRowIter;
typedef struct STsdbFS STsdbFS;
typedef struct SRowMerger SRowMerger;
typedef struct STsdbReadSnap STsdbReadSnap;
typedef struct SBlockInfo SBlockInfo;
typedef struct SSmaInfo SSmaInfo;
typedef struct SBlockCol SBlockCol;
typedef struct SVersionRange SVersionRange;
typedef struct SLDataIter SLDataIter;
typedef struct TSDBROW TSDBROW;
typedef struct TABLEID TABLEID;
typedef struct TSDBKEY TSDBKEY;
typedef struct SDelData SDelData;
typedef struct SDelIdx SDelIdx;
typedef struct STbData STbData;
typedef struct SMemTable SMemTable;
typedef struct STbDataIter STbDataIter;
typedef struct SMapData SMapData;
typedef struct SBlockIdx SBlockIdx;
typedef struct SDataBlk SDataBlk;
typedef struct SSttBlk SSttBlk;
typedef struct SDiskDataHdr SDiskDataHdr;
typedef struct SBlockData SBlockData;
typedef struct SDelFile SDelFile;
typedef struct SHeadFile SHeadFile;
typedef struct SDataFile SDataFile;
typedef struct SSttFile SSttFile;
typedef struct SSmaFile SSmaFile;
typedef struct SDFileSet SDFileSet;
typedef struct SDataFWriter SDataFWriter;
typedef struct SDataFReader SDataFReader;
typedef struct SDelFWriter SDelFWriter;
typedef struct SDelFReader SDelFReader;
typedef struct SRowIter SRowIter;
typedef struct STsdbFS STsdbFS;
typedef struct SRowMerger SRowMerger;
typedef struct STsdbReadSnap STsdbReadSnap;
typedef struct SBlockInfo SBlockInfo;
typedef struct SSmaInfo SSmaInfo;
typedef struct SBlockCol SBlockCol;
typedef struct SVersionRange SVersionRange;
typedef struct SLDataIter SLDataIter;
typedef struct SDiskCol SDiskCol;
typedef struct SDiskData SDiskData;
typedef struct SDiskDataBuilder SDiskDataBuilder;
typedef struct SBlkInfo SBlkInfo;
#define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F)
#define TSDB_MAX_SUBBLOCKS 8
......@@ -170,7 +174,7 @@ int32_t tCmprBlockData(SBlockData *pBlockData, int8_t cmprAlg, uint8_t **ppOut
int32_t aBufN[]);
int32_t tDecmprBlockData(uint8_t *pIn, int32_t szIn, SBlockData *pBlockData, uint8_t *aBuf[]);
// SDiskDataHdr
int32_t tPutDiskDataHdr(uint8_t *p, void *ph);
int32_t tPutDiskDataHdr(uint8_t *p, const SDiskDataHdr *pHdr);
int32_t tGetDiskDataHdr(uint8_t *p, void *ph);
// SDelIdx
int32_t tPutDelIdx(uint8_t *p, void *ph);
......@@ -267,6 +271,7 @@ int32_t tsdbWriteDataBlk(SDataFWriter *pWriter, SMapData *mDataBlk, SBlockIdx *p
int32_t tsdbWriteSttBlk(SDataFWriter *pWriter, SArray *aSttBlk);
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo,
int8_t cmprAlg, int8_t toLast);
int32_t tsdbWriteDiskData(SDataFWriter *pWriter, const SDiskData *pDiskData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo);
int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo);
// SDataFReader
......@@ -300,7 +305,7 @@ int32_t tsdbMerge(STsdb *pTsdb);
#define TSDB_CACHE_LAST_ROW(c) (((c).cacheLast & 1) > 0)
#define TSDB_CACHE_LAST(c) (((c).cacheLast & 2) > 0)
// tsdbCache
// tsdbCache ==============================================================================================
int32_t tsdbOpenCache(STsdb *pTsdb);
void tsdbCloseCache(STsdb *pTsdb);
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb *pTsdb);
......@@ -318,6 +323,15 @@ size_t tsdbCacheGetCapacity(SVnode *pVnode);
int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pSchema);
// tsdbDiskData ==============================================================================================
int32_t tDiskDataBuilderCreate(SDiskDataBuilder **ppBuilder);
void *tDiskDataBuilderDestroy(SDiskDataBuilder *pBuilder);
int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TABLEID *pId, uint8_t cmprAlg,
uint8_t calcSma);
int32_t tDiskDataBuilderClear(SDiskDataBuilder *pBuilder);
int32_t tDiskDataAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSchema *pTSchema, TABLEID *pId);
int32_t tGnrtDiskData(SDiskDataBuilder *pBuilder, const SDiskData **ppDiskData, const SBlkInfo **ppBlkInfo);
// structs =======================
struct STsdbFS {
SDelFile *pDelFile;
......@@ -438,6 +452,17 @@ struct SSmaInfo {
int32_t size;
};
struct SBlkInfo {
int64_t minUid;
int64_t maxUid;
TSKEY minKey;
TSKEY maxKey;
int64_t minVer;
int64_t maxVer;
TSDBKEY minTKey;
TSDBKEY maxTKey;
};
struct SDataBlk {
TSDBKEY minKey;
TSDBKEY maxKey;
......@@ -661,6 +686,38 @@ typedef struct {
STSchema *pTSchema;
} SSkmInfo;
struct SDiskCol {
SBlockCol bCol;
const uint8_t *pBit;
const uint8_t *pOff;
const uint8_t *pVal;
SColumnDataAgg agg;
};
struct SDiskData {
SDiskDataHdr hdr;
const uint8_t *pUid;
const uint8_t *pVer;
const uint8_t *pKey;
SArray *aDiskCol; // SArray<SDiskCol>
};
struct SDiskDataBuilder {
int64_t suid;
int64_t uid;
int32_t nRow;
uint8_t cmprAlg;
uint8_t calcSma;
SCompressor *pUidC;
SCompressor *pVerC;
SCompressor *pKeyC;
int32_t nBuilder;
SArray *aBuilder; // SArray<SDiskColBuilder>
uint8_t *aBuf[2];
SDiskData dd;
SBlkInfo bi;
};
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
bool destroyLoadInfo, const char *idStr);
......
......@@ -260,7 +260,7 @@ int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
if (keyTs > tTsVal->ts) {
STColumn *pTColumn = &pTSchema->columns[0];
SColVal tColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = keyTs});
SColVal tColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = keyTs});
taosArraySet(pLast, iCol, &(SLastCol){.ts = keyTs, .colVal = tColVal});
}
......@@ -1052,7 +1052,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
lastRowTs = TSDBROW_TS(pRow);
STColumn *pTColumn = &pTSchema->columns[0];
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = lastRowTs});
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = lastRowTs});
if (taosArrayPush(pColArray, pColVal) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
......@@ -1151,7 +1151,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
lastRowTs = rowTs;
STColumn *pTColumn = &pTSchema->columns[0];
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = lastRowTs});
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = lastRowTs});
if (taosArrayPush(pColArray, &(SLastCol){.ts = lastRowTs, .colVal = *pColVal}) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
......
......@@ -128,7 +128,7 @@ _exit:
return code;
}
static int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) {
static int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf, int64_t size) {
int32_t code = 0;
int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
int64_t pgno = OFFSET_PGNO(fOffset, pFD->szPage);
......@@ -522,9 +522,6 @@ static int32_t tsdbWriteBlockSma(SDataFWriter *pWriter, SBlockData *pBlockData,
// write
if (pSmaInfo->size) {
code = tRealloc(&pWriter->aBuf[0], pSmaInfo->size);
if (code) goto _err;
code = tsdbWriteFile(pWriter->pSmaFD, pWriter->fSma.size, pWriter->aBuf[0], pSmaInfo->size);
if (code) goto _err;
......@@ -607,6 +604,132 @@ _err:
return code;
}
int32_t tsdbWriteDiskData(SDataFWriter *pWriter, const SDiskData *pDiskData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo) {
int32_t code = 0;
int32_t lino = 0;
STsdbFD *pFD = NULL;
if (pSmaInfo) {
pFD = pWriter->pDataFD;
pBlkInfo->offset = pWriter->fData.size;
} else {
pFD = pWriter->pSttFD;
pBlkInfo->offset = pWriter->fStt[pWriter->wSet.nSttF - 1].size;
}
pBlkInfo->szBlock = 0;
pBlkInfo->szKey = 0;
// hdr
int32_t n = tPutDiskDataHdr(NULL, &pDiskData->hdr);
code = tRealloc(&pWriter->aBuf[0], n);
TSDB_CHECK_CODE(code, lino, _exit);
tPutDiskDataHdr(pWriter->aBuf[0], &pDiskData->hdr);
code = tsdbWriteFile(pFD, pBlkInfo->offset, pWriter->aBuf[0], n);
TSDB_CHECK_CODE(code, lino, _exit);
pBlkInfo->szKey += n;
pBlkInfo->szBlock += n;
// uid + ver + key
if (pDiskData->pUid) {
code = tsdbWriteFile(pFD, pBlkInfo->offset + pBlkInfo->szBlock, pDiskData->pUid, pDiskData->hdr.szUid);
TSDB_CHECK_CODE(code, lino, _exit);
pBlkInfo->szKey += pDiskData->hdr.szUid;
pBlkInfo->szBlock += pDiskData->hdr.szUid;
}
code = tsdbWriteFile(pFD, pBlkInfo->offset + pBlkInfo->szBlock, pDiskData->pVer, pDiskData->hdr.szVer);
TSDB_CHECK_CODE(code, lino, _exit);
pBlkInfo->szKey += pDiskData->hdr.szVer;
pBlkInfo->szBlock += pDiskData->hdr.szVer;
code = tsdbWriteFile(pFD, pBlkInfo->offset + pBlkInfo->szBlock, pDiskData->pKey, pDiskData->hdr.szKey);
TSDB_CHECK_CODE(code, lino, _exit);
pBlkInfo->szKey += pDiskData->hdr.szKey;
pBlkInfo->szBlock += pDiskData->hdr.szKey;
// aBlockCol
if (pDiskData->hdr.szBlkCol) {
code = tRealloc(&pWriter->aBuf[0], pDiskData->hdr.szBlkCol);
TSDB_CHECK_CODE(code, lino, _exit);
n = 0;
for (int32_t iDiskCol = 0; iDiskCol < taosArrayGetSize(pDiskData->aDiskCol); iDiskCol++) {
SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pDiskData->aDiskCol, iDiskCol);
n += tPutBlockCol(pWriter->aBuf[0] + n, pDiskCol);
}
ASSERT(n == pDiskData->hdr.szBlkCol);
code = tsdbWriteFile(pFD, pBlkInfo->offset + pBlkInfo->szBlock, pWriter->aBuf[0], pDiskData->hdr.szBlkCol);
TSDB_CHECK_CODE(code, lino, _exit);
pBlkInfo->szBlock += pDiskData->hdr.szBlkCol;
}
// aDiskCol
for (int32_t iDiskCol = 0; iDiskCol < taosArrayGetSize(pDiskData->aDiskCol); iDiskCol++) {
SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pDiskData->aDiskCol, iDiskCol);
if (pDiskCol->pBit) {
code = tsdbWriteFile(pFD, pBlkInfo->offset + pBlkInfo->szBlock, pDiskCol->pBit, pDiskCol->bCol.szBitmap);
TSDB_CHECK_CODE(code, lino, _exit);
pBlkInfo->szBlock += pDiskCol->bCol.szBitmap;
}
if (pDiskCol->pOff) {
code = tsdbWriteFile(pFD, pBlkInfo->offset + pBlkInfo->szBlock, pDiskCol->pOff, pDiskCol->bCol.szOffset);
TSDB_CHECK_CODE(code, lino, _exit);
pBlkInfo->szBlock += pDiskCol->bCol.szOffset;
}
if (pDiskCol->pVal) {
code = tsdbWriteFile(pFD, pBlkInfo->offset + pBlkInfo->szBlock, pDiskCol->pVal, pDiskCol->bCol.szValue);
TSDB_CHECK_CODE(code, lino, _exit);
pBlkInfo->szBlock += pDiskCol->bCol.szValue;
}
}
if (pSmaInfo) {
pWriter->fData.size += pBlkInfo->szBlock;
} else {
pWriter->fStt[pWriter->wSet.nSttF - 1].size += pBlkInfo->szBlock;
goto _exit;
}
pSmaInfo->offset = 0;
pSmaInfo->size = 0;
for (int32_t iDiskCol = 0; iDiskCol < taosArrayGetSize(pDiskData->aDiskCol); iDiskCol++) {
SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pDiskData->aDiskCol, iDiskCol);
if (IS_VAR_DATA_TYPE(pDiskCol->bCol.type)) continue;
if (pDiskCol->bCol.flag == HAS_NULL || pDiskCol->bCol.flag == (HAS_NULL | HAS_NONE)) continue;
if (!pDiskCol->bCol.smaOn) continue;
code = tRealloc(&pWriter->aBuf[0], pSmaInfo->size + tPutColumnDataAgg(NULL, &pDiskCol->agg));
TSDB_CHECK_CODE(code, lino, _exit);
pSmaInfo->size += tPutColumnDataAgg(pWriter->aBuf[0] + pSmaInfo->size, &pDiskCol->agg);
}
if (pSmaInfo->size) {
pSmaInfo->offset = pWriter->fSma.size;
code = tsdbWriteFile(pWriter->pSmaFD, pSmaInfo->offset, pWriter->aBuf[0], pSmaInfo->size);
TSDB_CHECK_CODE(code, lino, _exit);
pWriter->fSma.size += pSmaInfo->size;
}
_exit:
if (code) {
tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
int32_t code = 0;
int64_t n;
......@@ -946,8 +1069,8 @@ static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo
ASSERT(hdr.delimiter == TSDB_FILE_DLMT);
ASSERT(pBlockData->suid == hdr.suid);
ASSERT(pBlockData->uid == hdr.uid);
pBlockData->uid = hdr.uid;
pBlockData->nRow = hdr.nRow;
// uid
......
......@@ -649,7 +649,7 @@ int32_t tRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRo
ASSERT(pTColumn->type == TSDB_DATA_TYPE_TIMESTAMP);
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = key.ts});
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = key.ts});
if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
......@@ -690,7 +690,7 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
STColumn *pTColumn;
int32_t iCol, jCol = 1;
ASSERT(((SColVal *)pMerger->pArray->pData)->value.ts == key.ts);
ASSERT(((SColVal *)pMerger->pArray->pData)->value.val == key.ts);
for (iCol = 1; iCol < pMerger->pTSchema->numOfCols && jCol < pTSchema->numOfCols; ++iCol) {
pTColumn = &pMerger->pTSchema->columns[iCol];
......@@ -744,7 +744,7 @@ int32_t tRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
ASSERT(pTColumn->type == TSDB_DATA_TYPE_TIMESTAMP);
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = key.ts});
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = key.ts});
if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
......@@ -770,7 +770,7 @@ int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow) {
TSDBKEY key = TSDBROW_KEY(pRow);
SColVal *pColVal = &(SColVal){0};
ASSERT(((SColVal *)pMerger->pArray->pData)->value.ts == key.ts);
ASSERT(((SColVal *)pMerger->pArray->pData)->value.val == key.ts);
for (int32_t iCol = 1; iCol < pMerger->pTSchema->numOfCols; iCol++) {
tsdbRowGetColVal(pRow, pMerger->pTSchema, iCol, pColVal);
......@@ -1105,9 +1105,8 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
pColVal = tRowIterNext(&rIter);
}
}
_exit:
pBlockData->nRow++;
return code;
_err:
......@@ -1455,9 +1454,8 @@ _exit:
}
// SDiskDataHdr ==============================
int32_t tPutDiskDataHdr(uint8_t *p, void *ph) {
int32_t n = 0;
SDiskDataHdr *pHdr = (SDiskDataHdr *)ph;
int32_t tPutDiskDataHdr(uint8_t *p, const SDiskDataHdr *pHdr) {
int32_t n = 0;
n += tPutU32(p ? p + n : p, pHdr->delimiter);
n += tPutU32v(p ? p + n : p, pHdr->fmtVer);
......@@ -1516,174 +1514,107 @@ int32_t tGetColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg) {
return n;
}
#define SMA_UPDATE(SUM_V, MIN_V, MAX_V, VAL, MINSET, MAXSET) \
do { \
(SUM_V) += (VAL); \
if (!(MINSET)) { \
(MIN_V) = (VAL); \
(MINSET) = 1; \
} else if ((MIN_V) > (VAL)) { \
(MIN_V) = (VAL); \
} \
if (!(MAXSET)) { \
(MAX_V) = (VAL); \
(MAXSET) = 1; \
} else if ((MAX_V) < (VAL)) { \
(MAX_V) = (VAL); \
} \
} while (0)
static FORCE_INLINE void tSmaUpdateBool(SColumnDataAgg *pColAgg, SColVal *pColVal, uint8_t *minSet, uint8_t *maxSet) {
int8_t val = *(int8_t *)&pColVal->value.val ? 1 : 0;
SMA_UPDATE(pColAgg->sum, pColAgg->min, pColAgg->max, val, *minSet, *maxSet);
}
static FORCE_INLINE void tSmaUpdateTinyint(SColumnDataAgg *pColAgg, SColVal *pColVal, uint8_t *minSet,
uint8_t *maxSet) {
int8_t val = *(int8_t *)&pColVal->value.val;
SMA_UPDATE(pColAgg->sum, pColAgg->min, pColAgg->max, val, *minSet, *maxSet);
}
static FORCE_INLINE void tSmaUpdateSmallint(SColumnDataAgg *pColAgg, SColVal *pColVal, uint8_t *minSet,
uint8_t *maxSet) {
int16_t val = *(int16_t *)&pColVal->value.val;
SMA_UPDATE(pColAgg->sum, pColAgg->min, pColAgg->max, val, *minSet, *maxSet);
}
static FORCE_INLINE void tSmaUpdateInt(SColumnDataAgg *pColAgg, SColVal *pColVal, uint8_t *minSet, uint8_t *maxSet) {
int32_t val = *(int32_t *)&pColVal->value.val;
SMA_UPDATE(pColAgg->sum, pColAgg->min, pColAgg->max, val, *minSet, *maxSet);
}
static FORCE_INLINE void tSmaUpdateBigint(SColumnDataAgg *pColAgg, SColVal *pColVal, uint8_t *minSet, uint8_t *maxSet) {
int64_t val = *(int64_t *)&pColVal->value.val;
SMA_UPDATE(pColAgg->sum, pColAgg->min, pColAgg->max, val, *minSet, *maxSet);
}
static FORCE_INLINE void tSmaUpdateFloat(SColumnDataAgg *pColAgg, SColVal *pColVal, uint8_t *minSet, uint8_t *maxSet) {
float val = *(float *)&pColVal->value.val;
SMA_UPDATE(*(double *)&pColAgg->sum, *(double *)&pColAgg->min, *(double *)&pColAgg->max, val, *minSet, *maxSet);
}
static FORCE_INLINE void tSmaUpdateDouble(SColumnDataAgg *pColAgg, SColVal *pColVal, uint8_t *minSet, uint8_t *maxSet) {
double val = *(double *)&pColVal->value.val;
SMA_UPDATE(*(double *)&pColAgg->sum, *(double *)&pColAgg->min, *(double *)&pColAgg->max, val, *minSet, *maxSet);
}
static FORCE_INLINE void tSmaUpdateUTinyint(SColumnDataAgg *pColAgg, SColVal *pColVal, uint8_t *minSet,
uint8_t *maxSet) {
uint8_t val = *(uint8_t *)&pColVal->value.val;
SMA_UPDATE(pColAgg->sum, pColAgg->min, pColAgg->max, val, *minSet, *maxSet);
}
static FORCE_INLINE void tSmaUpdateUSmallint(SColumnDataAgg *pColAgg, SColVal *pColVal, uint8_t *minSet,
uint8_t *maxSet) {
uint16_t val = *(uint16_t *)&pColVal->value.val;
SMA_UPDATE(pColAgg->sum, pColAgg->min, pColAgg->max, val, *minSet, *maxSet);
}
static FORCE_INLINE void tSmaUpdateUInt(SColumnDataAgg *pColAgg, SColVal *pColVal, uint8_t *minSet, uint8_t *maxSet) {
uint32_t val = *(uint32_t *)&pColVal->value.val;
SMA_UPDATE(pColAgg->sum, pColAgg->min, pColAgg->max, val, *minSet, *maxSet);
}
static FORCE_INLINE void tSmaUpdateUBigint(SColumnDataAgg *pColAgg, SColVal *pColVal, uint8_t *minSet,
uint8_t *maxSet) {
uint64_t val = *(uint64_t *)&pColVal->value.val;
SMA_UPDATE(pColAgg->sum, pColAgg->min, pColAgg->max, val, *minSet, *maxSet);
}
void (*tSmaUpdateImpl[])(SColumnDataAgg *pColAgg, SColVal *pColVal, uint8_t *minSet, uint8_t *maxSet) = {
NULL,
tSmaUpdateBool, // TSDB_DATA_TYPE_BOOL
tSmaUpdateTinyint, // TSDB_DATA_TYPE_TINYINT
tSmaUpdateSmallint, // TSDB_DATA_TYPE_SMALLINT
tSmaUpdateInt, // TSDB_DATA_TYPE_INT
tSmaUpdateBigint, // TSDB_DATA_TYPE_BIGINT
tSmaUpdateFloat, // TSDB_DATA_TYPE_FLOAT
tSmaUpdateDouble, // TSDB_DATA_TYPE_DOUBLE
NULL, // TSDB_DATA_TYPE_VARCHAR
tSmaUpdateBigint, // TSDB_DATA_TYPE_TIMESTAMP
NULL, // TSDB_DATA_TYPE_NCHAR
tSmaUpdateUTinyint, // TSDB_DATA_TYPE_UTINYINT
tSmaUpdateUSmallint, // TSDB_DATA_TYPE_USMALLINT
tSmaUpdateUInt, // TSDB_DATA_TYPE_UINT
tSmaUpdateUBigint, // TSDB_DATA_TYPE_UBIGINT
NULL, // TSDB_DATA_TYPE_JSON
NULL, // TSDB_DATA_TYPE_VARBINARY
NULL, // TSDB_DATA_TYPE_DECIMAL
NULL, // TSDB_DATA_TYPE_BLOB
NULL, // TSDB_DATA_TYPE_MEDIUMBLOB
};
void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) {
SColVal colVal;
SColVal *pColVal = &colVal;
memset(pColAgg, 0, sizeof(*pColAgg));
bool minAssigned = false;
bool maxAssigned = false;
*pColAgg = (SColumnDataAgg){.colId = pColData->cid};
uint8_t minSet = 0;
uint8_t maxSet = 0;
SColVal cv;
for (int32_t iVal = 0; iVal < pColData->nVal; iVal++) {
tColDataGetValue(pColData, iVal, pColVal);
tColDataGetValue(pColData, iVal, &cv);
if (!COL_VAL_IS_VALUE(pColVal)) {
pColAgg->numOfNull++;
if (COL_VAL_IS_VALUE(&cv)) {
tSmaUpdateImpl[pColData->type](pColAgg, &cv, &minSet, &maxSet);
} else {
switch (pColData->type) {
case TSDB_DATA_TYPE_NULL:
break;
case TSDB_DATA_TYPE_BOOL:
break;
case TSDB_DATA_TYPE_TINYINT: {
pColAgg->sum += colVal.value.i8;
if (!minAssigned || pColAgg->min > colVal.value.i8) {
pColAgg->min = colVal.value.i8;
minAssigned = true;
}
if (!maxAssigned || pColAgg->max < colVal.value.i8) {
pColAgg->max = colVal.value.i8;
maxAssigned = true;
}
break;
}
case TSDB_DATA_TYPE_SMALLINT: {
pColAgg->sum += colVal.value.i16;
if (!minAssigned || pColAgg->min > colVal.value.i16) {
pColAgg->min = colVal.value.i16;
minAssigned = true;
}
if (!maxAssigned || pColAgg->max < colVal.value.i16) {
pColAgg->max = colVal.value.i16;
maxAssigned = true;
}
break;
}
case TSDB_DATA_TYPE_INT: {
pColAgg->sum += colVal.value.i32;
if (!minAssigned || pColAgg->min > colVal.value.i32) {
pColAgg->min = colVal.value.i32;
minAssigned = true;
}
if (!maxAssigned || pColAgg->max < colVal.value.i32) {
pColAgg->max = colVal.value.i32;
maxAssigned = true;
}
break;
}
case TSDB_DATA_TYPE_BIGINT: {
pColAgg->sum += colVal.value.i64;
if (!minAssigned || pColAgg->min > colVal.value.i64) {
pColAgg->min = colVal.value.i64;
minAssigned = true;
}
if (!maxAssigned || pColAgg->max < colVal.value.i64) {
pColAgg->max = colVal.value.i64;
maxAssigned = true;
}
break;
}
case TSDB_DATA_TYPE_FLOAT: {
*(double *)(&pColAgg->sum) += colVal.value.f;
if (!minAssigned || *(double *)(&pColAgg->min) > colVal.value.f) {
*(double *)(&pColAgg->min) = colVal.value.f;
minAssigned = true;
}
if (!maxAssigned || *(double *)(&pColAgg->max) < colVal.value.f) {
*(double *)(&pColAgg->max) = colVal.value.f;
maxAssigned = true;
}
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
*(double *)(&pColAgg->sum) += colVal.value.d;
if (!minAssigned || *(double *)(&pColAgg->min) > colVal.value.d) {
*(double *)(&pColAgg->min) = colVal.value.d;
minAssigned = true;
}
if (!maxAssigned || *(double *)(&pColAgg->max) < colVal.value.d) {
*(double *)(&pColAgg->max) = colVal.value.d;
maxAssigned = true;
}
break;
}
case TSDB_DATA_TYPE_VARCHAR:
break;
case TSDB_DATA_TYPE_TIMESTAMP: {
if (!minAssigned || pColAgg->min > colVal.value.i64) {
pColAgg->min = colVal.value.i64;
minAssigned = true;
}
if (!maxAssigned || pColAgg->max < colVal.value.i64) {
pColAgg->max = colVal.value.i64;
maxAssigned = true;
}
break;
}
case TSDB_DATA_TYPE_NCHAR:
break;
case TSDB_DATA_TYPE_UTINYINT: {
pColAgg->sum += colVal.value.u8;
if (!minAssigned || pColAgg->min > colVal.value.u8) {
pColAgg->min = colVal.value.u8;
minAssigned = true;
}
if (!maxAssigned || pColAgg->max < colVal.value.u8) {
pColAgg->max = colVal.value.u8;
maxAssigned = true;
}
break;
}
case TSDB_DATA_TYPE_USMALLINT: {
pColAgg->sum += colVal.value.u16;
if (!minAssigned || pColAgg->min > colVal.value.u16) {
pColAgg->min = colVal.value.u16;
minAssigned = true;
}
if (!maxAssigned || pColAgg->max < colVal.value.u16) {
pColAgg->max = colVal.value.u16;
maxAssigned = true;
}
break;
}
case TSDB_DATA_TYPE_UINT: {
pColAgg->sum += colVal.value.u32;
if (!minAssigned || pColAgg->min > colVal.value.u32) {
pColAgg->min = colVal.value.u32;
minAssigned = true;
}
if (!minAssigned || pColAgg->max < colVal.value.u32) {
pColAgg->max = colVal.value.u32;
maxAssigned = true;
}
break;
}
case TSDB_DATA_TYPE_UBIGINT: {
pColAgg->sum += colVal.value.u64;
if (!minAssigned || pColAgg->min > colVal.value.u64) {
pColAgg->min = colVal.value.u64;
minAssigned = true;
}
if (!maxAssigned || pColAgg->max < colVal.value.u64) {
pColAgg->max = colVal.value.u64;
maxAssigned = true;
}
break;
}
case TSDB_DATA_TYPE_JSON:
break;
case TSDB_DATA_TYPE_VARBINARY:
break;
case TSDB_DATA_TYPE_DECIMAL:
break;
case TSDB_DATA_TYPE_BLOB:
break;
case TSDB_DATA_TYPE_MEDIUMBLOB:
break;
default:
ASSERT(0);
}
pColAgg->numOfNull++;
}
}
}
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册