未验证 提交 4fc364a1 编写于 作者: H Hongze Cheng 提交者: GitHub

Merge pull request #6797 from taosdata/compress_float

Compress float
...@@ -13,3 +13,6 @@ ...@@ -13,3 +13,6 @@
[submodule "deps/jemalloc"] [submodule "deps/jemalloc"]
path = deps/jemalloc path = deps/jemalloc
url = https://github.com/jemalloc/jemalloc url = https://github.com/jemalloc/jemalloc
[submodule "deps/TSZ"]
path = deps/TSZ
url = https://github.com/taosdata/TSZ.git
...@@ -91,3 +91,12 @@ SET(TD_MEMORY_SANITIZER FALSE) ...@@ -91,3 +91,12 @@ SET(TD_MEMORY_SANITIZER FALSE)
IF (${MEMORY_SANITIZER} MATCHES "true") IF (${MEMORY_SANITIZER} MATCHES "true")
SET(TD_MEMORY_SANITIZER TRUE) SET(TD_MEMORY_SANITIZER TRUE)
ENDIF () ENDIF ()
IF (${TSZ_ENABLED} MATCHES "true")
# define add
MESSAGE(STATUS "build with TSZ enabled")
ADD_DEFINITIONS(-DTD_TSZ)
set(VAR_TSZ "TSZ" CACHE INTERNAL "global variant tsz" )
ELSE()
set(VAR_TSZ "" CACHE INTERNAL "global variant empty" )
ENDIF()
...@@ -15,7 +15,6 @@ ADD_SUBDIRECTORY(cJson) ...@@ -15,7 +15,6 @@ ADD_SUBDIRECTORY(cJson)
ADD_SUBDIRECTORY(wepoll) ADD_SUBDIRECTORY(wepoll)
ADD_SUBDIRECTORY(MsvcLibX) ADD_SUBDIRECTORY(MsvcLibX)
ADD_SUBDIRECTORY(rmonotonic) ADD_SUBDIRECTORY(rmonotonic)
ADD_SUBDIRECTORY(lua) ADD_SUBDIRECTORY(lua)
IF (TD_LINUX AND TD_MQTT) IF (TD_LINUX AND TD_MQTT)
...@@ -38,3 +37,7 @@ IF (TD_LINUX_64 AND JEMALLOC_ENABLED) ...@@ -38,3 +37,7 @@ IF (TD_LINUX_64 AND JEMALLOC_ENABLED)
BUILD_COMMAND ${MAKE} BUILD_COMMAND ${MAKE}
) )
ENDIF () ENDIF ()
IF (${TSZ_ENABLED} MATCHES "true")
ADD_SUBDIRECTORY(TSZ)
ENDIF()
\ No newline at end of file
Subproject commit 0ca5b15a8eac40327dd737be52c926fa5675712c
...@@ -11,7 +11,7 @@ IF (TD_LINUX) ...@@ -11,7 +11,7 @@ IF (TD_LINUX)
# set the static lib name # set the static lib name
ADD_LIBRARY(taos_static STATIC ${SRC}) ADD_LIBRARY(taos_static STATIC ${SRC})
TARGET_LINK_LIBRARIES(taos_static common query trpc tutil pthread m rt) TARGET_LINK_LIBRARIES(taos_static common query trpc tutil pthread m rt ${VAR_TSZ})
SET_TARGET_PROPERTIES(taos_static PROPERTIES OUTPUT_NAME "taos_static") SET_TARGET_PROPERTIES(taos_static PROPERTIES OUTPUT_NAME "taos_static")
SET_TARGET_PROPERTIES(taos_static PROPERTIES CLEAN_DIRECT_OUTPUT 1) SET_TARGET_PROPERTIES(taos_static PROPERTIES CLEAN_DIRECT_OUTPUT 1)
......
...@@ -205,6 +205,16 @@ extern int32_t wDebugFlag; ...@@ -205,6 +205,16 @@ extern int32_t wDebugFlag;
extern int32_t cqDebugFlag; extern int32_t cqDebugFlag;
extern int32_t debugFlag; extern int32_t debugFlag;
#ifdef TD_TSZ
// lossy
extern char lossyColumns[];
extern double fPrecision;
extern double dPrecision;
extern uint32_t maxIntervals;
extern uint32_t intervals;
extern char Compressor[];
#endif
typedef struct { typedef struct {
char dir[TSDB_FILENAME_LEN]; char dir[TSDB_FILENAME_LEN];
int level; int level;
......
...@@ -244,6 +244,19 @@ int32_t tsdbDebugFlag = 131; ...@@ -244,6 +244,19 @@ int32_t tsdbDebugFlag = 131;
int32_t cqDebugFlag = 131; int32_t cqDebugFlag = 131;
int32_t fsDebugFlag = 135; int32_t fsDebugFlag = 135;
#ifdef TD_TSZ
//
// lossy compress 6
//
char lossyColumns[32] = ""; // "float|double" means all float and double columns can be lossy compressed. set empty can close lossy compress.
// below option can take effect when tsLossyColumns not empty
double fPrecision = 1E-8; // float column precision
double dPrecision = 1E-16; // double column precision
uint32_t maxIntervals = 500; // max intervals
uint32_t intervals = 100; // intervals
char Compressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPRESSOR
#endif
int32_t (*monStartSystemFp)() = NULL; int32_t (*monStartSystemFp)() = NULL;
void (*monStopSystemFp)() = NULL; void (*monStopSystemFp)() = NULL;
void (*monExecuteSQLFp)(char *sql) = NULL; void (*monExecuteSQLFp)(char *sql) = NULL;
...@@ -1517,6 +1530,62 @@ static void doInitGlobalConfig(void) { ...@@ -1517,6 +1530,62 @@ static void doInitGlobalConfig(void) {
cfg.ptrLength = tListLen(tsTempDir); cfg.ptrLength = tListLen(tsTempDir);
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
#ifdef TD_TSZ
// lossy compress
cfg.option = "lossyColumns";
cfg.ptr = lossyColumns;
cfg.valType = TAOS_CFG_VTYPE_STRING;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG;
cfg.minValue = 0;
cfg.maxValue = 0;
cfg.ptrLength = tListLen(lossyColumns);
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "fPrecision";
cfg.ptr = &fPrecision;
cfg.valType = TAOS_CFG_VTYPE_DOUBLE;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG;
cfg.minValue = MIN_FLOAT;
cfg.maxValue = MAX_FLOAT;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "dPrecision";
cfg.ptr = &dPrecision;
cfg.valType = TAOS_CFG_VTYPE_DOUBLE;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG;
cfg.minValue = MIN_FLOAT;
cfg.maxValue = MAX_FLOAT;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "maxIntervals";
cfg.ptr = &maxIntervals;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG;
cfg.minValue = 0;
cfg.maxValue = 65536;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "intervals";
cfg.ptr = &intervals;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG;
cfg.minValue = 0;
cfg.maxValue = 65536;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
#endif
} }
void taosInitGlobalCfg() { void taosInitGlobalCfg() {
......
...@@ -42,6 +42,7 @@ ...@@ -42,6 +42,7 @@
#include "module.h" #include "module.h"
#include "qScript.h" #include "qScript.h"
#include "mnode.h" #include "mnode.h"
#include "tscompression.h"
#if !defined(_MODULE) || !defined(_TD_LINUX) #if !defined(_MODULE) || !defined(_TD_LINUX)
int32_t moduleStart() { return 0; } int32_t moduleStart() { return 0; }
...@@ -236,6 +237,12 @@ static void dnodeCheckDataDirOpenned(char *dir) { ...@@ -236,6 +237,12 @@ static void dnodeCheckDataDirOpenned(char *dir) {
} }
static int32_t dnodeInitStorage() { static int32_t dnodeInitStorage() {
#ifdef TD_TSZ
// compress module init
tsCompressInit();
#endif
// storage module init
if (tsDiskCfgNum == 1 && dnodeCreateDir(tsDataDir) < 0) { if (tsDiskCfgNum == 1 && dnodeCreateDir(tsDataDir) < 0) {
dError("failed to create dir: %s, reason: %s", tsDataDir, strerror(errno)); dError("failed to create dir: %s, reason: %s", tsDataDir, strerror(errno));
return -1; return -1;
...@@ -311,7 +318,15 @@ static int32_t dnodeInitStorage() { ...@@ -311,7 +318,15 @@ static int32_t dnodeInitStorage() {
return 0; return 0;
} }
static void dnodeCleanupStorage() { tfsDestroy(); } static void dnodeCleanupStorage() {
// storage destroy
tfsDestroy();
#ifdef TD_TSZ
// compress destroy
tsCompressExit();
#endif
}
bool dnodeIsFirstDeploy() { bool dnodeIsFirstDeploy() {
return strcmp(tsFirst, tsLocalEp) == 0; return strcmp(tsFirst, tsLocalEp) == 0;
......
...@@ -4,3 +4,4 @@ PROJECT(TDengine) ...@@ -4,3 +4,4 @@ PROJECT(TDengine)
ADD_SUBDIRECTORY(shell) ADD_SUBDIRECTORY(shell)
ADD_SUBDIRECTORY(taosdemo) ADD_SUBDIRECTORY(taosdemo)
ADD_SUBDIRECTORY(taosdump) ADD_SUBDIRECTORY(taosdump)
ADD_SUBDIRECTORY(taospack)
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/os/inc)
INCLUDE_DIRECTORIES(inc)
IF (TD_LINUX)
AUX_SOURCE_DIRECTORY(. SRC)
ADD_EXECUTABLE(taospack ${SRC})
TARGET_LINK_LIBRARIES(taospack os tutil tsdb ${VAR_TSZ})
ELSEIF (TD_WINDOWS)
AUX_SOURCE_DIRECTORY(. SRC)
ADD_EXECUTABLE(taospack ${SRC})
TARGET_LINK_LIBRARIES(taospack)
ELSEIF (TD_DARWIN)
# MAC
AUX_SOURCE_DIRECTORY(. SRC)
ADD_EXECUTABLE(taospack ${SRC})
TARGET_LINK_LIBRARIES(taospack os tutil tsdb)
ENDIF ()
\ No newline at end of file
此差异已折叠。
...@@ -4,10 +4,14 @@ PROJECT(TDengine) ...@@ -4,10 +4,14 @@ PROJECT(TDengine)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/rpc/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/rpc/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/sync/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/sync/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/rmonotonic/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/rmonotonic/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/TSZ/sz/include)
AUX_SOURCE_DIRECTORY(src SRC) AUX_SOURCE_DIRECTORY(src SRC)
ADD_LIBRARY(tutil ${SRC}) ADD_LIBRARY(tutil ${SRC})
TARGET_LINK_LIBRARIES(tutil pthread os lz4 z rmonotonic)
TARGET_LINK_LIBRARIES(tutil pthread os lz4 z rmonotonic ${VAR_TSZ} )
IF (TD_LINUX) IF (TD_LINUX)
TARGET_LINK_LIBRARIES(tutil m rt) TARGET_LINK_LIBRARIES(tutil m rt)
...@@ -37,4 +41,4 @@ ENDIF() ...@@ -37,4 +41,4 @@ ENDIF()
IF (TD_STORAGE) IF (TD_STORAGE)
TARGET_LINK_LIBRARIES(tutil storage) TARGET_LINK_LIBRARIES(tutil storage)
ENDIF () ENDIF ()
\ No newline at end of file
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
extern "C" { extern "C" {
#endif #endif
#define TSDB_CFG_MAX_NUM 110 #define TSDB_CFG_MAX_NUM 116 // 110 + 6 with lossy option
#define TSDB_CFG_PRINT_LEN 23 #define TSDB_CFG_PRINT_LEN 23
#define TSDB_CFG_OPTION_LEN 24 #define TSDB_CFG_OPTION_LEN 24
#define TSDB_CFG_VALUE_LEN 41 #define TSDB_CFG_VALUE_LEN 41
...@@ -32,6 +32,9 @@ extern "C" { ...@@ -32,6 +32,9 @@ extern "C" {
#define TSDB_CFG_CTYPE_B_OPTION 16U // can be configured by taos_options function #define TSDB_CFG_CTYPE_B_OPTION 16U // can be configured by taos_options function
#define TSDB_CFG_CTYPE_B_NOT_PRINT 32U // such as password #define TSDB_CFG_CTYPE_B_NOT_PRINT 32U // such as password
#define MAX_FLOAT 100000
#define MIN_FLOAT 0
enum { enum {
TAOS_CFG_CSTATUS_NONE, // not configured TAOS_CFG_CSTATUS_NONE, // not configured
TAOS_CFG_CSTATUS_DEFAULT, // use system default value TAOS_CFG_CSTATUS_DEFAULT, // use system default value
...@@ -50,6 +53,7 @@ enum { ...@@ -50,6 +53,7 @@ enum {
TAOS_CFG_VTYPE_IPSTR, TAOS_CFG_VTYPE_IPSTR,
TAOS_CFG_VTYPE_DIRECTORY, TAOS_CFG_VTYPE_DIRECTORY,
TAOS_CFG_VTYPE_DATA_DIRCTORY, TAOS_CFG_VTYPE_DATA_DIRCTORY,
TAOS_CFG_VTYPE_DOUBLE,
}; };
enum { enum {
......
...@@ -34,6 +34,22 @@ extern "C" { ...@@ -34,6 +34,22 @@ extern "C" {
#define ONE_STAGE_COMP 1 #define ONE_STAGE_COMP 1
#define TWO_STAGE_COMP 2 #define TWO_STAGE_COMP 2
//
// compressed data first byte foramt
// ------ 7 bit ---- | ---- 1 bit ----
// algorithm mode
//
// compression data mode save first byte lower 1 bit
#define MODE_NOCOMPRESS 0 // original data
#define MODE_COMPRESS 1 // compatible old compress
// compression algorithm save first byte higher 7 bit
#define ALGO_SZ_LOSSY 1 // SZ compress
#define HEAD_MODE(x) x%2
#define HEAD_ALGO(x) x/2
extern int tsCompressINTImp(const char *const input, const int nelements, char *const output, const char type); extern int tsCompressINTImp(const char *const input, const int nelements, char *const output, const char type);
extern int tsDecompressINTImp(const char *const input, const int nelements, char *const output, const char type); extern int tsDecompressINTImp(const char *const input, const int nelements, char *const output, const char type);
extern int tsCompressBoolImp(const char *const input, const int nelements, char *const output); extern int tsCompressBoolImp(const char *const input, const int nelements, char *const output);
...@@ -46,6 +62,20 @@ extern int tsCompressDoubleImp(const char *const input, const int nelements, cha ...@@ -46,6 +62,20 @@ extern int tsCompressDoubleImp(const char *const input, const int nelements, cha
extern int tsDecompressDoubleImp(const char *const input, const int nelements, char *const output); extern int tsDecompressDoubleImp(const char *const input, const int nelements, char *const output);
extern int tsCompressFloatImp(const char *const input, const int nelements, char *const output); extern int tsCompressFloatImp(const char *const input, const int nelements, char *const output);
extern int tsDecompressFloatImp(const char *const input, const int nelements, char *const output); extern int tsDecompressFloatImp(const char *const input, const int nelements, char *const output);
// lossy
extern int tsCompressFloatLossyImp(const char * input, const int nelements, char *const output);
extern int tsDecompressFloatLossyImp(const char * input, int compressedSize, const int nelements, char *const output);
extern int tsCompressDoubleLossyImp(const char * input, const int nelements, char *const output);
extern int tsDecompressDoubleLossyImp(const char * input, int compressedSize, const int nelements, char *const output);
#ifdef TD_TSZ
extern bool lossyFloat;
extern bool lossyDouble;
// init call
int tsCompressInit();
// exit call
void tsCompressExit();
#endif
static FORCE_INLINE int tsCompressTinyint(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, char algorithm, static FORCE_INLINE int tsCompressTinyint(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, char algorithm,
char *const buffer, int bufferSize) { char *const buffer, int bufferSize) {
...@@ -189,56 +219,123 @@ static FORCE_INLINE int tsDecompressString(const char *const input, int compress ...@@ -189,56 +219,123 @@ static FORCE_INLINE int tsDecompressString(const char *const input, int compress
static FORCE_INLINE int tsCompressFloat(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, static FORCE_INLINE int tsCompressFloat(const char *const input, int inputSize, const int nelements, char *const output, int outputSize,
char algorithm, char *const buffer, int bufferSize) { char algorithm, char *const buffer, int bufferSize) {
if (algorithm == ONE_STAGE_COMP) { #ifdef TD_TSZ
return tsCompressFloatImp(input, nelements, output); // lossy mode
} else if (algorithm == TWO_STAGE_COMP) { if(lossyFloat) {
int len = tsCompressFloatImp(input, nelements, buffer); return tsCompressFloatLossyImp(input, nelements, output);
return tsCompressStringImp(buffer, len, output, outputSize); // lossless mode
} else { } else {
assert(0); #endif
return -1; if (algorithm == ONE_STAGE_COMP) {
return tsCompressFloatImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) {
int len = tsCompressFloatImp(input, nelements, buffer);
return tsCompressStringImp(buffer, len, output, outputSize);
} else {
assert(0);
return -1;
}
#ifdef TD_TSZ
} }
#endif
} }
static FORCE_INLINE int tsDecompressFloat(const char *const input, int compressedSize, const int nelements, char *const output, static FORCE_INLINE int tsDecompressFloat(const char *const input, int compressedSize, const int nelements, char *const output,
int outputSize, char algorithm, char *const buffer, int bufferSize) { int outputSize, char algorithm, char *const buffer, int bufferSize) {
if (algorithm == ONE_STAGE_COMP) { #ifdef TD_TSZ
return tsDecompressFloatImp(input, nelements, output); if(HEAD_ALGO(input[0]) == ALGO_SZ_LOSSY){
} else if (algorithm == TWO_STAGE_COMP) { // decompress lossy
if (tsDecompressStringImp(input, compressedSize, buffer, bufferSize) < 0) return -1; return tsDecompressFloatLossyImp(input, compressedSize, nelements, output);
return tsDecompressFloatImp(buffer, nelements, output);
} else { } else {
assert(0); #endif
return -1; // decompress lossless
if (algorithm == ONE_STAGE_COMP) {
return tsDecompressFloatImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) {
if (tsDecompressStringImp(input, compressedSize, buffer, bufferSize) < 0) return -1;
return tsDecompressFloatImp(buffer, nelements, output);
} else {
assert(0);
return -1;
}
#ifdef TD_TSZ
} }
#endif
} }
static FORCE_INLINE int tsCompressDouble(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, static FORCE_INLINE int tsCompressDouble(const char *const input, int inputSize, const int nelements, char *const output, int outputSize,
char algorithm, char *const buffer, int bufferSize) { char algorithm, char *const buffer, int bufferSize) {
if (algorithm == ONE_STAGE_COMP) { #ifdef TD_TSZ
return tsCompressDoubleImp(input, nelements, output); if(lossyDouble){
} else if (algorithm == TWO_STAGE_COMP) { // lossy mode
int len = tsCompressDoubleImp(input, nelements, buffer); return tsCompressDoubleLossyImp(input, nelements, output);
return tsCompressStringImp(buffer, len, output, outputSize);
} else { } else {
assert(0); #endif
return -1; // lossless mode
if (algorithm == ONE_STAGE_COMP) {
return tsCompressDoubleImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) {
int len = tsCompressDoubleImp(input, nelements, buffer);
return tsCompressStringImp(buffer, len, output, outputSize);
} else {
assert(0);
return -1;
}
#ifdef TD_TSZ
} }
#endif
} }
static FORCE_INLINE int tsDecompressDouble(const char *const input, int compressedSize, const int nelements, char *const output, static FORCE_INLINE int tsDecompressDouble(const char *const input, int compressedSize, const int nelements, char *const output,
int outputSize, char algorithm, char *const buffer, int bufferSize) { int outputSize, char algorithm, char *const buffer, int bufferSize) {
if (algorithm == ONE_STAGE_COMP) { #ifdef TD_TSZ
return tsDecompressDoubleImp(input, nelements, output); if(HEAD_ALGO(input[0]) == ALGO_SZ_LOSSY){
} else if (algorithm == TWO_STAGE_COMP) { // decompress lossy
if (tsDecompressStringImp(input, compressedSize, buffer, bufferSize) < 0) return -1; return tsDecompressDoubleLossyImp(input, compressedSize, nelements, output);
return tsDecompressDoubleImp(buffer, nelements, output);
} else { } else {
assert(0); #endif
return -1; // decompress lossless
if (algorithm == ONE_STAGE_COMP) {
return tsDecompressDoubleImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) {
if (tsDecompressStringImp(input, compressedSize, buffer, bufferSize) < 0) return -1;
return tsDecompressDoubleImp(buffer, nelements, output);
} else {
assert(0);
return -1;
}
#ifdef TD_TSZ
} }
#endif
}
#ifdef TD_TSZ
//
// lossy float double
//
static FORCE_INLINE int tsCompressFloatLossy(const char *const input, int inputSize, const int nelements, char *const output, int outputSize,
char algorithm, char *const buffer, int bufferSize) {
return tsCompressFloatLossyImp(input, nelements, output);
}
static FORCE_INLINE int tsDecompressFloatLossy(const char *const input, int compressedSize, const int nelements, char *const output,
int outputSize, char algorithm, char *const buffer, int bufferSize){
return tsDecompressFloatLossyImp(input, compressedSize, nelements, output);
}
static FORCE_INLINE int tsCompressDoubleLossy(const char *const input, int inputSize, const int nelements, char *const output, int outputSize,
char algorithm, char *const buffer, int bufferSize){
return tsCompressDoubleLossyImp(input, nelements, output);
}
static FORCE_INLINE int tsDecompressDoubleLossy(const char *const input, int compressedSize, const int nelements, char *const output,
int outputSize, char algorithm, char *const buffer, int bufferSize){
return tsDecompressDoubleLossyImp(input, compressedSize, nelements, output);
} }
#endif
static FORCE_INLINE int tsCompressTimestamp(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, static FORCE_INLINE int tsCompressTimestamp(const char *const input, int inputSize, const int nelements, char *const output, int outputSize,
char algorithm, char *const buffer, int bufferSize) { char algorithm, char *const buffer, int bufferSize) {
if (algorithm == ONE_STAGE_COMP) { if (algorithm == ONE_STAGE_COMP) {
......
...@@ -49,9 +49,14 @@ ...@@ -49,9 +49,14 @@
#include "os.h" #include "os.h"
#include "lz4.h" #include "lz4.h"
#ifdef TD_TSZ
#include "td_sz.h"
#endif
#include "taosdef.h" #include "taosdef.h"
#include "tscompression.h" #include "tscompression.h"
#include "tulog.h" #include "tulog.h"
#include "tglobal.h"
static const int TEST_NUMBER = 1; static const int TEST_NUMBER = 1;
#define is_bigendian() ((*(char *)&TEST_NUMBER) == 0) #define is_bigendian() ((*(char *)&TEST_NUMBER) == 0)
...@@ -61,6 +66,39 @@ static const int TEST_NUMBER = 1; ...@@ -61,6 +66,39 @@ static const int TEST_NUMBER = 1;
#define ZIGZAG_ENCODE(T, v) ((u##T)((v) >> (sizeof(T) * 8 - 1))) ^ (((u##T)(v)) << 1) // zigzag encode #define ZIGZAG_ENCODE(T, v) ((u##T)((v) >> (sizeof(T) * 8 - 1))) ^ (((u##T)(v)) << 1) // zigzag encode
#define ZIGZAG_DECODE(T, v) ((v) >> 1) ^ -((T)((v)&1)) // zigzag decode #define ZIGZAG_DECODE(T, v) ((v) >> 1) ^ -((T)((v)&1)) // zigzag decode
#ifdef TD_TSZ
bool lossyFloat = false;
bool lossyDouble = false;
// init call
int tsCompressInit(){
// config
if(lossyColumns[0] == 0){
lossyFloat = false;
lossyDouble = false;
return 0;
}
lossyFloat = strstr(lossyColumns, "float") != NULL;
lossyDouble = strstr(lossyColumns, "double") != NULL;
if(lossyFloat == false && lossyDouble == false)
return 0;
tdszInit(fPrecision, dPrecision, maxIntervals, intervals, Compressor);
if(lossyFloat)
uInfo("lossy compression float is opened. ");
if(lossyDouble)
uInfo("lossy compression double is opened. ");
return 1;
}
// exit call
void tsCompressExit(){
tdszExit();
}
#endif
/* /*
* Compress Integer (Simple8B). * Compress Integer (Simple8B).
*/ */
...@@ -410,6 +448,7 @@ int tsCompressStringImp(const char *const input, int inputSize, char *const outp ...@@ -410,6 +448,7 @@ int tsCompressStringImp(const char *const input, int inputSize, char *const outp
int tsDecompressStringImp(const char *const input, int compressedSize, char *const output, int outputSize) { int tsDecompressStringImp(const char *const input, int compressedSize, char *const output, int outputSize) {
// compressedSize is the size of data after compression. // compressedSize is the size of data after compression.
if (input[0] == 1) { if (input[0] == 1) {
/* It is compressed by LZ4 algorithm */ /* It is compressed by LZ4 algorithm */
const int decompressed_size = LZ4_decompress_safe(input + 1, output, compressedSize - 1, outputSize); const int decompressed_size = LZ4_decompress_safe(input + 1, output, compressedSize - 1, outputSize);
...@@ -886,3 +925,72 @@ int tsDecompressFloatImp(const char *const input, const int nelements, char *con ...@@ -886,3 +925,72 @@ int tsDecompressFloatImp(const char *const input, const int nelements, char *con
return nelements * FLOAT_BYTES; return nelements * FLOAT_BYTES;
} }
#ifdef TD_TSZ
//
// ---------- float double lossy -----------
//
int tsCompressFloatLossyImp(const char * input, const int nelements, char *const output){
// compress with sz
int compressedSize = tdszCompress(SZ_FLOAT, input, nelements, output + 1);
unsigned char algo = ALGO_SZ_LOSSY << 1;
if (compressedSize == 0 || compressedSize >= nelements*sizeof(float)){
// compressed error or large than original
output[0] = MODE_NOCOMPRESS | algo;
memcpy(output + 1, input, nelements * sizeof(float));
compressedSize = 1 + nelements * sizeof(float);
} else {
// compressed successfully
output[0] = MODE_COMPRESS | algo;
compressedSize += 1;
}
return compressedSize;
}
int tsDecompressFloatLossyImp(const char * input, int compressedSize, const int nelements, char *const output){
int decompressedSize = 0;
if( HEAD_MODE(input[0]) == MODE_NOCOMPRESS){
// orginal so memcpy directly
decompressedSize = nelements * sizeof(float);
memcpy(output, input + 1, decompressedSize);
return decompressedSize;
}
// decompressed with sz
return tdszDecompress(SZ_FLOAT, input + 1, compressedSize - 1, nelements, output);
}
int tsCompressDoubleLossyImp(const char * input, const int nelements, char *const output){
// compress with sz
int compressedSize = tdszCompress(SZ_DOUBLE, input, nelements, output + 1);
unsigned char algo = ALGO_SZ_LOSSY << 1;
if (compressedSize == 0 || compressedSize >= nelements*sizeof(double)) {
// compressed error or large than original
output[0] = MODE_NOCOMPRESS | algo;
memcpy(output + 1, input, nelements * sizeof(double));
compressedSize = 1 + nelements * sizeof(double);
} else {
// compressed successfully
output[0] = MODE_COMPRESS | algo;
compressedSize += 1;
}
return compressedSize;
}
int tsDecompressDoubleLossyImp(const char * input, int compressedSize, const int nelements, char *const output){
int decompressedSize = 0;
if( HEAD_MODE(input[0]) == MODE_NOCOMPRESS){
// orginal so memcpy directly
decompressedSize = nelements * sizeof(double);
memcpy(output, input + 1, decompressedSize);
return decompressedSize;
}
// decompressed with sz
return tdszDecompress(SZ_DOUBLE, input + 1, compressedSize - 1, nelements, output);
}
#endif
\ No newline at end of file
...@@ -61,6 +61,24 @@ static void taosReadFloatConfig(SGlobalCfg *cfg, char *input_value) { ...@@ -61,6 +61,24 @@ static void taosReadFloatConfig(SGlobalCfg *cfg, char *input_value) {
} }
} }
static void taosReadDoubleConfig(SGlobalCfg *cfg, char *input_value) {
double value = atof(input_value);
double *option = (double *)cfg->ptr;
if (value < cfg->minValue || value > cfg->maxValue) {
uError("config option:%s, input value:%s, out of range[%f, %f], use default value:%f",
cfg->option, input_value, cfg->minValue, cfg->maxValue, *option);
} else {
if (cfg->cfgStatus <= TAOS_CFG_CSTATUS_FILE) {
*option = value;
cfg->cfgStatus = TAOS_CFG_CSTATUS_FILE;
} else {
uWarn("config option:%s, input value:%s, is configured by %s, use %f", cfg->option, input_value,
tsCfgStatusStr[cfg->cfgStatus], *option);
}
}
}
static void taosReadInt32Config(SGlobalCfg *cfg, char *input_value) { static void taosReadInt32Config(SGlobalCfg *cfg, char *input_value) {
int32_t value = atoi(input_value); int32_t value = atoi(input_value);
int32_t *option = (int32_t *)cfg->ptr; int32_t *option = (int32_t *)cfg->ptr;
...@@ -262,6 +280,9 @@ static void taosReadConfigOption(const char *option, char *value, char *value2, ...@@ -262,6 +280,9 @@ static void taosReadConfigOption(const char *option, char *value, char *value2,
case TAOS_CFG_VTYPE_FLOAT: case TAOS_CFG_VTYPE_FLOAT:
taosReadFloatConfig(cfg, value); taosReadFloatConfig(cfg, value);
break; break;
case TAOS_CFG_VTYPE_DOUBLE:
taosReadDoubleConfig(cfg, value);
break;
case TAOS_CFG_VTYPE_STRING: case TAOS_CFG_VTYPE_STRING:
taosReadStringConfig(cfg, value); taosReadStringConfig(cfg, value);
break; break;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册