tcompression.c 30.3 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

/* README.md   TAOS compression
 *
 * INTEGER Compression Algorithm:
S
Shengliang Guan 已提交
19
 *   To compress integers (including char, short, int32_t, int64_t), the difference
H
hzcheng 已提交
20 21
 *   between two integers is calculated at first. Then the difference is
 *   transformed to positive by zig-zag encoding method
22
 *   (https://gist.github.com/mfuerstenau/ba870a29e16536fdbaba). Then the value is
H
hzcheng 已提交
23 24 25
 *   encoded using simple 8B method. For more information about simple 8B,
 *   refer to https://en.wikipedia.org/wiki/8b/10b_encoding.
 *
26
 *   NOTE : For bigint, only 59 bits can be used, which means data from -(2**59) to (2**59)-1
H
hzcheng 已提交
27 28 29
 *   are allowed.
 *
 * BOOLEAN Compression Algorithm:
30
 *   We provide two methods for compress boolean types. Because boolean types in C
31
 *   code are char bytes with 0 and 1 values only, only one bit can used to discriminate
H
hzcheng 已提交
32
 *   the values.
33
 *   1. The first method is using only 1 bit to represent the boolean value with 1 for
H
hzcheng 已提交
34
 *   true and 0 for false. Then the compression rate is 1/8.
35
 *   2. The second method is using run length encoding (RLE) methods. This method works
H
hzcheng 已提交
36 37 38 39 40 41
 *   better when there are a lot of consecutive true values or false values.
 *
 * STRING Compression Algorithm:
 *   We us LZ4 method to compress the string type.
 *
 * FLOAT Compression Algorithm:
42 43 44 45 46
 *   We use the same method with Akumuli to compress float and double types. The compression
 *   algorithm assumes the float/double values change slightly. So we take the XOR between two
 *   adjacent values. Then compare the number of leading zeros and trailing zeros. If the number
 *   of leading zeros are larger than the trailing zeros, then record the last serveral bytes
 *   of the XORed value with informations. If not, record the first corresponding bytes.
H
hzcheng 已提交
47 48 49
 *
 */

S
Shengliang Guan 已提交
50
#define _DEFAULT_SOURCE
H
Haojun Liao 已提交
51
#include "tcompression.h"
S
Shengliang Guan 已提交
52
#include "lz4.h"
S
log  
Shengliang Guan 已提交
53
#include "tlog.h"
H
hzcheng 已提交
54

S
Shengliang Guan 已提交
55 56 57 58 59 60
#ifdef TD_TSZ
#include "td_sz.h"
#endif

static const int32_t TEST_NUMBER = 1;
#define is_bigendian()     ((*(char *)&TEST_NUMBER) == 0)
H
Hongze Cheng 已提交
61
#define SIMPLE8B_MAX_INT64 ((uint64_t)1152921504606846974LL)
H
hzcheng 已提交
62

S
Shengliang Guan 已提交
63
#define safeInt64Add(a, b)  (((a >= 0) && (b <= INT64_MAX - a)) || ((a < 0) && (b >= INT64_MIN - a)))
H
Hongze Cheng 已提交
64 65
#define ZIGZAG_ENCODE(T, v) ((u##T)((v) >> (sizeof(T) * 8 - 1))) ^ (((u##T)(v)) << 1)  // zigzag encode
#define ZIGZAG_DECODE(T, v) ((v) >> 1) ^ -((T)((v)&1))                                 // zigzag decode
H
hzcheng 已提交
66

T
tickduan 已提交
67
#ifdef TD_TSZ
S
Shengliang Guan 已提交
68
bool lossyFloat = false;
T
tickduan 已提交
69 70 71
bool lossyDouble = false;

// init call
S
Shengliang Guan 已提交
72 73 74
int32_t tsCompressInit() {
  // config
  if (lossyColumns[0] == 0) {
T
tickduan 已提交
75 76 77 78 79
    lossyFloat = false;
    lossyDouble = false;
    return 0;
  }

S
Shengliang Guan 已提交
80
  lossyFloat = strstr(lossyColumns, "float") != NULL;
T
tickduan 已提交
81 82
  lossyDouble = strstr(lossyColumns, "double") != NULL;

S
Shengliang Guan 已提交
83 84
  if (lossyFloat == false && lossyDouble == false) return 0;

T
tickduan 已提交
85
  tdszInit(fPrecision, dPrecision, maxRange, curRange, Compressor);
86 87
  if (lossyFloat) uTrace("lossy compression float  is opened. ");
  if (lossyDouble) uTrace("lossy compression double is opened. ");
T
tickduan 已提交
88 89 90
  return 1;
}
// exit call
S
Shengliang Guan 已提交
91
void tsCompressExit() { tdszExit(); }
T
tickduan 已提交
92

93
#endif
T
tickduan 已提交
94

H
hzcheng 已提交
95 96 97
/*
 * Compress Integer (Simple8B).
 */
S
Shengliang Guan 已提交
98
int32_t tsCompressINTImp(const char *const input, const int32_t nelements, char *const output, const char type) {
H
hzcheng 已提交
99 100
  // Selector value:              0    1   2   3   4   5   6   7   8  9  10  11
  // 12  13  14  15
S
Shengliang Guan 已提交
101 102 103
  char    bit_per_integer[] = {0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 20, 30, 60};
  int32_t selector_to_elems[] = {240, 120, 60, 30, 20, 15, 12, 10, 8, 7, 6, 5, 4, 3, 2, 1};
  char    bit_to_selector[] = {0,  2,  3,  4,  5,  6,  7,  8,  9,  10, 10, 11, 11, 12, 12, 12, 13, 13, 13, 13, 13,
H
Hongze Cheng 已提交
104 105
                               14, 14, 14, 14, 14, 14, 14, 14, 14, 14, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15,
                               15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15};
H
hzcheng 已提交
106 107

  // get the byte limit.
S
Shengliang Guan 已提交
108
  int32_t word_length = 0;
H
hzcheng 已提交
109 110 111 112 113 114 115 116 117 118 119 120 121 122
  switch (type) {
    case TSDB_DATA_TYPE_BIGINT:
      word_length = LONG_BYTES;
      break;
    case TSDB_DATA_TYPE_INT:
      word_length = INT_BYTES;
      break;
    case TSDB_DATA_TYPE_SMALLINT:
      word_length = SHORT_BYTES;
      break;
    case TSDB_DATA_TYPE_TINYINT:
      word_length = CHAR_BYTES;
      break;
    default:
123
      uError("Invalid compress integer type:%d", type);
H
Hongze Cheng 已提交
124
      return -1;
H
hzcheng 已提交
125 126
  }

S
Shengliang Guan 已提交
127 128
  int32_t byte_limit = nelements * word_length + 1;
  int32_t opos = 1;
H
hzcheng 已提交
129 130
  int64_t prev_value = 0;

S
Shengliang Guan 已提交
131
  for (int32_t i = 0; i < nelements;) {
H
hzcheng 已提交
132 133
    char    selector = 0;
    char    bit = 0;
S
Shengliang Guan 已提交
134
    int32_t elems = 0;
H
hzcheng 已提交
135 136
    int64_t prev_value_tmp = prev_value;

S
Shengliang Guan 已提交
137
    for (int32_t j = i; j < nelements; j++) {
H
hzcheng 已提交
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
      // Read data from the input stream and convert it to INT64 type.
      int64_t curr_value = 0;
      switch (type) {
        case TSDB_DATA_TYPE_TINYINT:
          curr_value = (int64_t)(*((int8_t *)input + j));
          break;
        case TSDB_DATA_TYPE_SMALLINT:
          curr_value = (int64_t)(*((int16_t *)input + j));
          break;
        case TSDB_DATA_TYPE_INT:
          curr_value = (int64_t)(*((int32_t *)input + j));
          break;
        case TSDB_DATA_TYPE_BIGINT:
          curr_value = (int64_t)(*((int64_t *)input + j));
          break;
      }
      // Get difference.
Y
yihaoDeng 已提交
155
      if (!safeInt64Add(curr_value, -prev_value_tmp)) goto _copy_and_exit;
H
hzcheng 已提交
156 157 158

      int64_t diff = curr_value - prev_value_tmp;
      // Zigzag encode the value.
H
Hongze Cheng 已提交
159
      uint64_t zigzag_value = ZIGZAG_ENCODE(int64_t, diff);
H
hzcheng 已提交
160 161 162

      if (zigzag_value >= SIMPLE8B_MAX_INT64) goto _copy_and_exit;

S
TD-1037  
Shengliang Guan 已提交
163
      int64_t tmp_bit;
H
hzcheng 已提交
164 165 166 167
      if (zigzag_value == 0) {
        // Take care here, __builtin_clzl give wrong anser for value 0;
        tmp_bit = 0;
      } else {
168
        tmp_bit = (LONG_BYTES * BITS_PER_BYTE) - BUILDIN_CLZL(zigzag_value);
H
hzcheng 已提交
169 170
      }

S
Shengliang Guan 已提交
171 172
      if (elems + 1 <= selector_to_elems[(int32_t)selector] &&
          elems + 1 <= selector_to_elems[(int32_t)(bit_to_selector[(int32_t)tmp_bit])]) {
H
hzcheng 已提交
173
        // If can hold another one.
S
Shengliang Guan 已提交
174
        selector = selector > bit_to_selector[(int32_t)tmp_bit] ? selector : bit_to_selector[(int32_t)tmp_bit];
H
hzcheng 已提交
175
        elems++;
S
Shengliang Guan 已提交
176
        bit = bit_per_integer[(int32_t)selector];
H
hzcheng 已提交
177 178
      } else {
        // if cannot hold another one.
S
Shengliang Guan 已提交
179 180 181
        while (elems < selector_to_elems[(int32_t)selector]) selector++;
        elems = selector_to_elems[(int32_t)selector];
        bit = bit_per_integer[(int32_t)selector];
H
hzcheng 已提交
182 183 184 185 186 187 188
        break;
      }
      prev_value_tmp = curr_value;
    }

    uint64_t buffer = 0;
    buffer |= (uint64_t)selector;
S
Shengliang Guan 已提交
189
    for (int32_t k = 0; k < elems; k++) {
H
hzcheng 已提交
190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205
      int64_t curr_value = 0; /* get current values */
      switch (type) {
        case TSDB_DATA_TYPE_TINYINT:
          curr_value = (int64_t)(*((int8_t *)input + i));
          break;
        case TSDB_DATA_TYPE_SMALLINT:
          curr_value = (int64_t)(*((int16_t *)input + i));
          break;
        case TSDB_DATA_TYPE_INT:
          curr_value = (int64_t)(*((int32_t *)input + i));
          break;
        case TSDB_DATA_TYPE_BIGINT:
          curr_value = (int64_t)(*((int64_t *)input + i));
          break;
      }
      int64_t  diff = curr_value - prev_value;
H
Hongze Cheng 已提交
206
      uint64_t zigzag_value = ZIGZAG_ENCODE(int64_t, diff);
H
hzcheng 已提交
207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228
      buffer |= ((zigzag_value & INT64MASK(bit)) << (bit * k + 4));
      i++;
      prev_value = curr_value;
    }

    // Output the encoded value to the output.
    if (opos + sizeof(buffer) <= byte_limit) {
      memcpy(output + opos, &buffer, sizeof(buffer));
      opos += sizeof(buffer);
    } else {
    _copy_and_exit:
      output[0] = 1;
      memcpy(output + 1, input, byte_limit - 1);
      return byte_limit;
    }
  }

  // set the indicator.
  output[0] = 0;
  return opos;
}

S
Shengliang Guan 已提交
229 230
int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, char *const output, const char type) {
  int32_t word_length = 0;
H
hzcheng 已提交
231 232 233 234 235 236 237 238 239 240 241 242 243 244
  switch (type) {
    case TSDB_DATA_TYPE_BIGINT:
      word_length = LONG_BYTES;
      break;
    case TSDB_DATA_TYPE_INT:
      word_length = INT_BYTES;
      break;
    case TSDB_DATA_TYPE_SMALLINT:
      word_length = SHORT_BYTES;
      break;
    case TSDB_DATA_TYPE_TINYINT:
      word_length = CHAR_BYTES;
      break;
    default:
245
      uError("Invalid decompress integer type:%d", type);
H
Hongze Cheng 已提交
246
      return -1;
H
hzcheng 已提交
247 248 249 250 251 252 253 254 255 256
  }

  // If not compressed.
  if (input[0] == 1) {
    memcpy(output, input + 1, nelements * word_length);
    return nelements * word_length;
  }

  // Selector value:              0    1   2   3   4   5   6   7   8  9  10  11
  // 12  13  14  15
S
Shengliang Guan 已提交
257 258
  char    bit_per_integer[] = {0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 20, 30, 60};
  int32_t selector_to_elems[] = {240, 120, 60, 30, 20, 15, 12, 10, 8, 7, 6, 5, 4, 3, 2, 1};
H
hzcheng 已提交
259 260

  const char *ip = input + 1;
S
Shengliang Guan 已提交
261 262
  int32_t     count = 0;
  int32_t     _pos = 0;
H
hzcheng 已提交
263 264 265 266 267 268 269 270
  int64_t     prev_value = 0;

  while (1) {
    if (count == nelements) break;

    uint64_t w = 0;
    memcpy(&w, ip, LONG_BYTES);

S
Shengliang Guan 已提交
271 272 273
    char    selector = (char)(w & INT64MASK(4));       // selector = 4
    char    bit = bit_per_integer[(int32_t)selector];  // bit = 3
    int32_t elems = selector_to_elems[(int32_t)selector];
H
hzcheng 已提交
274

S
Shengliang Guan 已提交
275
    for (int32_t i = 0; i < elems; i++) {
H
hzcheng 已提交
276 277 278 279 280 281 282
      uint64_t zigzag_value;

      if (selector == 0 || selector == 1) {
        zigzag_value = 0;
      } else {
        zigzag_value = ((w >> (4 + bit * i)) & INT64MASK(bit));
      }
H
Hongze Cheng 已提交
283
      int64_t diff = ZIGZAG_DECODE(int64_t, zigzag_value);
H
hzcheng 已提交
284 285 286 287 288
      int64_t curr_value = diff + prev_value;
      prev_value = curr_value;

      switch (type) {
        case TSDB_DATA_TYPE_BIGINT:
H
Hongze Cheng 已提交
289
          *((int64_t *)output + _pos) = (int64_t)curr_value;
H
hzcheng 已提交
290 291 292
          _pos++;
          break;
        case TSDB_DATA_TYPE_INT:
H
Hongze Cheng 已提交
293
          *((int32_t *)output + _pos) = (int32_t)curr_value;
H
hzcheng 已提交
294 295 296
          _pos++;
          break;
        case TSDB_DATA_TYPE_SMALLINT:
H
Hongze Cheng 已提交
297
          *((int16_t *)output + _pos) = (int16_t)curr_value;
H
hzcheng 已提交
298 299 300
          _pos++;
          break;
        case TSDB_DATA_TYPE_TINYINT:
H
Hongze Cheng 已提交
301
          *((int8_t *)output + _pos) = (int8_t)curr_value;
H
hzcheng 已提交
302 303 304 305
          _pos++;
          break;
        default:
          perror("Wrong integer types.\n");
H
Hongze Cheng 已提交
306
          return -1;
H
hzcheng 已提交
307 308 309 310 311 312 313 314 315 316 317 318 319
      }
      count++;
      if (count == nelements) break;
    }
    ip += LONG_BYTES;
  }

  return nelements * word_length;
}

/* ----------------------------------------------Bool Compression
 * ---------------------------------------------- */
// TODO: You can also implement it using RLE method.
S
Shengliang Guan 已提交
320 321 322
int32_t tsCompressBoolImp(const char *const input, const int32_t nelements, char *const output) {
  int32_t pos = -1;
  int32_t ele_per_byte = BITS_PER_BYTE / 2;
H
hzcheng 已提交
323

S
Shengliang Guan 已提交
324
  for (int32_t i = 0; i < nelements; i++) {
H
hzcheng 已提交
325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342
    if (i % ele_per_byte == 0) {
      pos++;
      output[pos] = 0;
    }

    uint8_t t = 0;
    if (input[i] == 1) {
      t = (((uint8_t)1) << (2 * (i % ele_per_byte)));
      output[pos] |= t;
    } else if (input[i] == 0) {
      t = ((uint8_t)1 << (2 * (i % ele_per_byte))) - 1;
      /* t = (~((( uint8_t)1) << (7-i%BITS_PER_BYTE))); */
      output[pos] &= t;
    } else if (input[i] == TSDB_DATA_BOOL_NULL) {
      t = ((uint8_t)2 << (2 * (i % ele_per_byte)));
      /* t = (~((( uint8_t)1) << (7-i%BITS_PER_BYTE))); */
      output[pos] |= t;
    } else {
343
      uError("Invalid compress bool value:%d", output[pos]);
H
Hongze Cheng 已提交
344
      return -1;
H
hzcheng 已提交
345 346 347 348 349 350
    }
  }

  return pos + 1;
}

S
Shengliang Guan 已提交
351 352 353
int32_t tsDecompressBoolImp(const char *const input, const int32_t nelements, char *const output) {
  int32_t ipos = -1, opos = 0;
  int32_t ele_per_byte = BITS_PER_BYTE / 2;
H
hzcheng 已提交
354

S
Shengliang Guan 已提交
355
  for (int32_t i = 0; i < nelements; i++) {
H
hzcheng 已提交
356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373
    if (i % ele_per_byte == 0) {
      ipos++;
    }

    uint8_t ele = (input[ipos] >> (2 * (i % ele_per_byte))) & INT8MASK(2);
    if (ele == 1) {
      output[opos++] = 1;
    } else if (ele == 2) {
      output[opos++] = TSDB_DATA_BOOL_NULL;
    } else {
      output[opos++] = 0;
    }
  }

  return nelements;
}

/* Run Length Encoding(RLE) Method */
S
Shengliang Guan 已提交
374 375
int32_t tsCompressBoolRLEImp(const char *const input, const int32_t nelements, char *const output) {
  int32_t _pos = 0;
H
hzcheng 已提交
376

S
Shengliang Guan 已提交
377
  for (int32_t i = 0; i < nelements;) {
H
hzcheng 已提交
378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398
    unsigned char counter = 1;
    char          num = input[i];

    for (++i; i < nelements; i++) {
      if (input[i] == num) {
        counter++;
        if (counter == INT8MASK(7)) {
          i++;
          break;
        }
      } else {
        break;
      }
    }

    // Encode the data.
    if (num == 1) {
      output[_pos++] = INT8MASK(1) | (counter << 1);
    } else if (num == 0) {
      output[_pos++] = (counter << 1) | INT8MASK(0);
    } else {
399
      uError("Invalid compress bool value:%d", output[_pos]);
H
Hongze Cheng 已提交
400
      return -1;
H
hzcheng 已提交
401 402 403 404 405 406
    }
  }

  return _pos;
}

S
Shengliang Guan 已提交
407 408
int32_t tsDecompressBoolRLEImp(const char *const input, const int32_t nelements, char *const output) {
  int32_t ipos = 0, opos = 0;
H
hzcheng 已提交
409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426
  while (1) {
    char     encode = input[ipos++];
    unsigned counter = (encode >> 1) & INT8MASK(7);
    char     value = encode & INT8MASK(1);

    memset(output + opos, value, counter);
    opos += counter;
    if (opos >= nelements) {
      return nelements;
    }
  }
}

/* ----------------------------------------------String Compression
 * ---------------------------------------------- */
// Note: the size of the output must be larger than input_size + 1 and
// LZ4_compressBound(size) + 1;
// >= max(input_size, LZ4_compressBound(input_size)) + 1;
S
Shengliang Guan 已提交
427
int32_t tsCompressStringImp(const char *const input, int32_t inputSize, char *const output, int32_t outputSize) {
H
hzcheng 已提交
428
  // Try to compress using LZ4 algorithm.
S
Shengliang Guan 已提交
429
  const int32_t compressed_data_size = LZ4_compress_default(input, output + 1, inputSize, outputSize - 1);
H
hzcheng 已提交
430 431 432 433 434 435 436 437 438 439 440 441 442

  // If cannot compress or after compression, data becomes larger.
  if (compressed_data_size <= 0 || compressed_data_size > inputSize) {
    /* First byte is for indicator */
    output[0] = 0;
    memcpy(output + 1, input, inputSize);
    return inputSize + 1;
  }

  output[0] = 1;
  return compressed_data_size + 1;
}

S
Shengliang Guan 已提交
443
int32_t tsDecompressStringImp(const char *const input, int32_t compressedSize, char *const output, int32_t outputSize) {
H
hzcheng 已提交
444
  // compressedSize is the size of data after compression.
S
Shengliang Guan 已提交
445

H
hzcheng 已提交
446 447
  if (input[0] == 1) {
    /* It is compressed by LZ4 algorithm */
S
Shengliang Guan 已提交
448
    const int32_t decompressed_size = LZ4_decompress_safe(input + 1, output, compressedSize - 1, outputSize);
H
hzcheng 已提交
449
    if (decompressed_size < 0) {
450
      uError("Failed to decompress string with LZ4 algorithm, decompressed size:%d", decompressed_size);
H
Hongze Cheng 已提交
451
      return -1;
H
hzcheng 已提交
452 453 454 455 456 457 458 459
    }

    return decompressed_size;
  } else if (input[0] == 0) {
    /* It is not compressed by LZ4 algorithm */
    memcpy(output, input + 1, compressedSize - 1);
    return compressedSize - 1;
  } else {
460
    uError("Invalid decompress string indicator:%d", input[0]);
H
Hongze Cheng 已提交
461
    return -1;
H
hzcheng 已提交
462 463 464 465 466 467
  }
}

/* --------------------------------------------Timestamp Compression
 * ---------------------------------------------- */
// TODO: Take care here, we assumes little endian encoding.
S
Shengliang Guan 已提交
468 469
int32_t tsCompressTimestampImp(const char *const input, const int32_t nelements, char *const output) {
  int32_t _pos = 1;
H
hzcheng 已提交
470 471 472 473 474 475
  assert(nelements >= 0);

  if (nelements == 0) return 0;

  int64_t *istream = (int64_t *)input;

S
Shengliang Guan 已提交
476 477 478 479
  int64_t prev_value = istream[0];
  if (prev_value >= 0x8000000000000000) {
    uWarn("compression timestamp is over signed long long range. ts = 0x%" PRIx64 " \n", prev_value);
    goto _exit_over;
T
tickduan 已提交
480
  }
H
hzcheng 已提交
481 482 483 484
  int64_t  prev_delta = -prev_value;
  uint8_t  flags = 0, flag1 = 0, flag2 = 0;
  uint64_t dd1 = 0, dd2 = 0;

S
Shengliang Guan 已提交
485
  for (int32_t i = 0; i < nelements; i++) {
H
hzcheng 已提交
486 487 488 489 490 491
    int64_t curr_value = istream[i];
    if (!safeInt64Add(curr_value, -prev_value)) goto _exit_over;
    int64_t curr_delta = curr_value - prev_value;
    if (!safeInt64Add(curr_delta, -prev_delta)) goto _exit_over;
    int64_t delta_of_delta = curr_delta - prev_delta;
    // zigzag encode the value.
H
Hongze Cheng 已提交
492
    uint64_t zigzag_value = ZIGZAG_ENCODE(int64_t, delta_of_delta);
H
hzcheng 已提交
493 494 495 496 497 498
    if (i % 2 == 0) {
      flags = 0;
      dd1 = zigzag_value;
      if (dd1 == 0) {
        flag1 = 0;
      } else {
H
Hongze Cheng 已提交
499
        flag1 = (uint8_t)(LONG_BYTES - BUILDIN_CLZL(dd1) / BITS_PER_BYTE);
H
hzcheng 已提交
500 501 502 503 504 505
      }
    } else {
      dd2 = zigzag_value;
      if (dd2 == 0) {
        flag2 = 0;
      } else {
H
Hongze Cheng 已提交
506
        flag2 = (uint8_t)(LONG_BYTES - BUILDIN_CLZL(dd2) / BITS_PER_BYTE);
H
hzcheng 已提交
507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563
      }
      flags = flag1 | (flag2 << 4);
      // Encode the flag.
      if ((_pos + CHAR_BYTES - 1) >= nelements * LONG_BYTES) goto _exit_over;
      memcpy(output + _pos, &flags, CHAR_BYTES);
      _pos += CHAR_BYTES;
      /* Here, we assume it is little endian encoding method. */
      // Encode dd1
      if (is_bigendian()) {
        if ((_pos + flag1 - 1) >= nelements * LONG_BYTES) goto _exit_over;
        memcpy(output + _pos, (char *)(&dd1) + LONG_BYTES - flag1, flag1);
      } else {
        if ((_pos + flag1 - 1) >= nelements * LONG_BYTES) goto _exit_over;
        memcpy(output + _pos, (char *)(&dd1), flag1);
      }
      _pos += flag1;
      // Encode dd2;
      if (is_bigendian()) {
        if ((_pos + flag2 - 1) >= nelements * LONG_BYTES) goto _exit_over;
        memcpy(output + _pos, (char *)(&dd2) + LONG_BYTES - flag2, flag2);
      } else {
        if ((_pos + flag2 - 1) >= nelements * LONG_BYTES) goto _exit_over;
        memcpy(output + _pos, (char *)(&dd2), flag2);
      }
      _pos += flag2;
    }
    prev_value = curr_value;
    prev_delta = curr_delta;
  }

  if (nelements % 2 == 1) {
    flag2 = 0;
    flags = flag1 | (flag2 << 4);
    // Encode the flag.
    if ((_pos + CHAR_BYTES - 1) >= nelements * LONG_BYTES) goto _exit_over;
    memcpy(output + _pos, &flags, CHAR_BYTES);
    _pos += CHAR_BYTES;
    // Encode dd1;
    if (is_bigendian()) {
      if ((_pos + flag1 - 1) >= nelements * LONG_BYTES) goto _exit_over;
      memcpy(output + _pos, (char *)(&dd1) + LONG_BYTES - flag1, flag1);
    } else {
      if ((_pos + flag1 - 1) >= nelements * LONG_BYTES) goto _exit_over;
      memcpy(output + _pos, (char *)(&dd1), flag1);
    }
    _pos += flag1;
  }

  output[0] = 1;  // Means the string is compressed
  return _pos;

_exit_over:
  output[0] = 0;  // Means the string is not compressed
  memcpy(output + 1, input, nelements * LONG_BYTES);
  return nelements * LONG_BYTES + 1;
}

S
Shengliang Guan 已提交
564
int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelements, char *const output) {
H
hzcheng 已提交
565 566 567 568 569 570 571 572 573
  assert(nelements >= 0);
  if (nelements == 0) return 0;

  if (input[0] == 0) {
    memcpy(output, input + 1, nelements * LONG_BYTES);
    return nelements * LONG_BYTES;
  } else if (input[0] == 1) {  // Decompress
    int64_t *ostream = (int64_t *)output;

S
Shengliang Guan 已提交
574
    int32_t ipos = 1, opos = 0;
H
hzcheng 已提交
575 576 577 578 579 580 581 582 583 584 585 586 587 588
    int8_t  nbytes = 0;
    int64_t prev_value = 0;
    int64_t prev_delta = 0;
    int64_t delta_of_delta = 0;

    while (1) {
      uint8_t flags = input[ipos++];
      // Decode dd1
      uint64_t dd1 = 0;
      nbytes = flags & INT8MASK(4);
      if (nbytes == 0) {
        delta_of_delta = 0;
      } else {
        if (is_bigendian()) {
H
Hui Li 已提交
589
          memcpy(((char *)(&dd1)) + LONG_BYTES - nbytes, input + ipos, nbytes);
H
hzcheng 已提交
590 591 592
        } else {
          memcpy(&dd1, input + ipos, nbytes);
        }
H
Hongze Cheng 已提交
593
        delta_of_delta = ZIGZAG_DECODE(int64_t, dd1);
H
hzcheng 已提交
594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613
      }
      ipos += nbytes;
      if (opos == 0) {
        prev_value = delta_of_delta;
        prev_delta = 0;
        ostream[opos++] = delta_of_delta;
      } else {
        prev_delta = delta_of_delta + prev_delta;
        prev_value = prev_value + prev_delta;
        ostream[opos++] = prev_value;
      }
      if (opos == nelements) return nelements * LONG_BYTES;

      // Decode dd2
      uint64_t dd2 = 0;
      nbytes = (flags >> 4) & INT8MASK(4);
      if (nbytes == 0) {
        delta_of_delta = 0;
      } else {
        if (is_bigendian()) {
H
Hui Li 已提交
614
          memcpy(((char *)(&dd2)) + LONG_BYTES - nbytes, input + ipos, nbytes);
H
hzcheng 已提交
615 616 617 618
        } else {
          memcpy(&dd2, input + ipos, nbytes);
        }
        // zigzag_decoding
H
Hongze Cheng 已提交
619
        delta_of_delta = ZIGZAG_DECODE(int64_t, dd2);
H
hzcheng 已提交
620 621 622 623 624 625 626 627 628 629
      }
      ipos += nbytes;
      prev_delta = delta_of_delta + prev_delta;
      prev_value = prev_value + prev_delta;
      ostream[opos++] = prev_value;
      if (opos == nelements) return nelements * LONG_BYTES;
    }

  } else {
    assert(0);
S
TD-1057  
Shengliang Guan 已提交
630
    return -1;
H
hzcheng 已提交
631 632 633 634
  }
}
/* --------------------------------------------Double Compression
 * ---------------------------------------------- */
S
Shengliang Guan 已提交
635
void encodeDoubleValue(uint64_t diff, uint8_t flag, char *const output, int32_t *const pos) {
H
hzcheng 已提交
636
  uint8_t nbytes = (flag & INT8MASK(3)) + 1;
S
Shengliang Guan 已提交
637
  int32_t nshift = (LONG_BYTES * BITS_PER_BYTE - nbytes * BITS_PER_BYTE) * (flag >> 3);
H
hzcheng 已提交
638 639 640 641 642 643 644 645 646
  diff >>= nshift;

  while (nbytes) {
    output[(*pos)++] = (int8_t)(diff & INT64MASK(8));
    diff >>= BITS_PER_BYTE;
    nbytes--;
  }
}

S
Shengliang Guan 已提交
647 648 649
int32_t tsCompressDoubleImp(const char *const input, const int32_t nelements, char *const output) {
  int32_t byte_limit = nelements * DOUBLE_BYTES + 1;
  int32_t opos = 1;
H
hzcheng 已提交
650 651 652 653 654 655 656 657

  uint64_t prev_value = 0;
  uint64_t prev_diff = 0;
  uint8_t  prev_flag = 0;

  double *istream = (double *)input;

  // Main loop
S
Shengliang Guan 已提交
658
  for (int32_t i = 0; i < nelements; i++) {
H
hzcheng 已提交
659 660 661 662 663 664 665 666 667 668 669
    union {
      double   real;
      uint64_t bits;
    } curr;

    curr.real = istream[i];

    // Here we assume the next value is the same as previous one.
    uint64_t predicted = prev_value;
    uint64_t diff = curr.bits ^ predicted;

S
Shengliang Guan 已提交
670 671
    int32_t leading_zeros = LONG_BYTES * BITS_PER_BYTE;
    int32_t trailing_zeros = leading_zeros;
H
hzcheng 已提交
672 673

    if (diff) {
674 675
      trailing_zeros = BUILDIN_CTZL(diff);
      leading_zeros = BUILDIN_CLZL(diff);
H
hzcheng 已提交
676 677 678 679 680 681
    }

    uint8_t nbytes = 0;
    uint8_t flag;

    if (trailing_zeros > leading_zeros) {
H
Hongze Cheng 已提交
682
      nbytes = (uint8_t)(LONG_BYTES - trailing_zeros / BITS_PER_BYTE);
H
hzcheng 已提交
683 684 685 686

      if (nbytes > 0) nbytes--;
      flag = ((uint8_t)1 << 3) | nbytes;
    } else {
H
Hongze Cheng 已提交
687
      nbytes = (uint8_t)(LONG_BYTES - leading_zeros / BITS_PER_BYTE);
H
hzcheng 已提交
688 689 690 691 692 693 694 695
      if (nbytes > 0) nbytes--;
      flag = nbytes;
    }

    if (i % 2 == 0) {
      prev_diff = diff;
      prev_flag = flag;
    } else {
S
Shengliang Guan 已提交
696 697
      int32_t nbyte1 = (prev_flag & INT8MASK(3)) + 1;
      int32_t nbyte2 = (flag & INT8MASK(3)) + 1;
H
hzcheng 已提交
698 699 700 701 702 703 704 705 706 707 708 709 710 711 712
      if (opos + 1 + nbyte1 + nbyte2 <= byte_limit) {
        uint8_t flags = prev_flag | (flag << 4);
        output[opos++] = flags;
        encodeDoubleValue(prev_diff, prev_flag, output, &opos);
        encodeDoubleValue(diff, flag, output, &opos);
      } else {
        output[0] = 1;
        memcpy(output + 1, input, byte_limit - 1);
        return byte_limit;
      }
    }
    prev_value = curr.bits;
  }

  if (nelements % 2) {
S
Shengliang Guan 已提交
713 714
    int32_t nbyte1 = (prev_flag & INT8MASK(3)) + 1;
    int32_t nbyte2 = 1;
H
hzcheng 已提交
715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730
    if (opos + 1 + nbyte1 + nbyte2 <= byte_limit) {
      uint8_t flags = prev_flag;
      output[opos++] = flags;
      encodeDoubleValue(prev_diff, prev_flag, output, &opos);
      encodeDoubleValue(0ul, 0, output, &opos);
    } else {
      output[0] = 1;
      memcpy(output + 1, input, byte_limit - 1);
      return byte_limit;
    }
  }

  output[0] = 0;
  return opos;
}

S
Shengliang Guan 已提交
731
uint64_t decodeDoubleValue(const char *const input, int32_t *const ipos, uint8_t flag) {
H
hzcheng 已提交
732
  uint64_t diff = 0ul;
S
Shengliang Guan 已提交
733 734
  int32_t  nbytes = (flag & INT8MASK(3)) + 1;
  for (int32_t i = 0; i < nbytes; i++) {
H
hzcheng 已提交
735 736
    diff = diff | ((INT64MASK(8) & input[(*ipos)++]) << BITS_PER_BYTE * i);
  }
S
Shengliang Guan 已提交
737
  int32_t shift_width = (LONG_BYTES * BITS_PER_BYTE - nbytes * BITS_PER_BYTE) * (flag >> 3);
H
hzcheng 已提交
738 739 740 741 742
  diff <<= shift_width;

  return diff;
}

S
Shengliang Guan 已提交
743
int32_t tsDecompressDoubleImp(const char *const input, const int32_t nelements, char *const output) {
H
hzcheng 已提交
744 745 746 747 748 749 750 751 752
  // output stream
  double *ostream = (double *)output;

  if (input[0] == 1) {
    memcpy(output, input + 1, nelements * DOUBLE_BYTES);
    return nelements * DOUBLE_BYTES;
  }

  uint8_t  flags = 0;
S
Shengliang Guan 已提交
753 754
  int32_t  ipos = 1;
  int32_t  opos = 0;
H
hzcheng 已提交
755 756
  uint64_t prev_value = 0;

S
Shengliang Guan 已提交
757
  for (int32_t i = 0; i < nelements; i++) {
H
hzcheng 已提交
758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782
    if (i % 2 == 0) {
      flags = input[ipos++];
    }

    uint8_t flag = flags & INT8MASK(4);
    flags >>= 4;

    uint64_t diff = decodeDoubleValue(input, &ipos, flag);
    union {
      uint64_t bits;
      double   real;
    } curr;

    uint64_t predicted = prev_value;
    curr.bits = predicted ^ diff;
    prev_value = curr.bits;

    ostream[opos++] = curr.real;
  }

  return nelements * DOUBLE_BYTES;
}

/* --------------------------------------------Float Compression
 * ---------------------------------------------- */
S
Shengliang Guan 已提交
783
void encodeFloatValue(uint32_t diff, uint8_t flag, char *const output, int32_t *const pos) {
H
hzcheng 已提交
784
  uint8_t nbytes = (flag & INT8MASK(3)) + 1;
S
Shengliang Guan 已提交
785
  int32_t nshift = (FLOAT_BYTES * BITS_PER_BYTE - nbytes * BITS_PER_BYTE) * (flag >> 3);
H
hzcheng 已提交
786 787 788 789 790 791 792 793 794
  diff >>= nshift;

  while (nbytes) {
    output[(*pos)++] = (int8_t)(diff & INT32MASK(8));
    diff >>= BITS_PER_BYTE;
    nbytes--;
  }
}

S
Shengliang Guan 已提交
795 796 797 798
int32_t tsCompressFloatImp(const char *const input, const int32_t nelements, char *const output) {
  float  *istream = (float *)input;
  int32_t byte_limit = nelements * FLOAT_BYTES + 1;
  int32_t opos = 1;
H
hzcheng 已提交
799 800 801 802 803 804

  uint32_t prev_value = 0;
  uint32_t prev_diff = 0;
  uint8_t  prev_flag = 0;

  // Main loop
S
Shengliang Guan 已提交
805
  for (int32_t i = 0; i < nelements; i++) {
H
hzcheng 已提交
806 807 808 809 810 811 812 813 814 815 816
    union {
      float    real;
      uint32_t bits;
    } curr;

    curr.real = istream[i];

    // Here we assume the next value is the same as previous one.
    uint32_t predicted = prev_value;
    uint32_t diff = curr.bits ^ predicted;

S
Shengliang Guan 已提交
817 818
    int32_t leading_zeros = FLOAT_BYTES * BITS_PER_BYTE;
    int32_t trailing_zeros = leading_zeros;
H
hzcheng 已提交
819 820

    if (diff) {
821 822
      trailing_zeros = BUILDIN_CTZ(diff);
      leading_zeros = BUILDIN_CLZ(diff);
H
hzcheng 已提交
823 824 825 826 827 828
    }

    uint8_t nbytes = 0;
    uint8_t flag;

    if (trailing_zeros > leading_zeros) {
H
Hongze Cheng 已提交
829
      nbytes = (uint8_t)(FLOAT_BYTES - trailing_zeros / BITS_PER_BYTE);
H
hzcheng 已提交
830 831 832 833

      if (nbytes > 0) nbytes--;
      flag = ((uint8_t)1 << 3) | nbytes;
    } else {
H
Hongze Cheng 已提交
834
      nbytes = (uint8_t)(FLOAT_BYTES - leading_zeros / BITS_PER_BYTE);
H
hzcheng 已提交
835 836 837 838 839 840 841 842
      if (nbytes > 0) nbytes--;
      flag = nbytes;
    }

    if (i % 2 == 0) {
      prev_diff = diff;
      prev_flag = flag;
    } else {
S
Shengliang Guan 已提交
843 844
      int32_t nbyte1 = (prev_flag & INT8MASK(3)) + 1;
      int32_t nbyte2 = (flag & INT8MASK(3)) + 1;
H
hzcheng 已提交
845 846 847 848 849 850 851 852 853 854 855 856 857 858 859
      if (opos + 1 + nbyte1 + nbyte2 <= byte_limit) {
        uint8_t flags = prev_flag | (flag << 4);
        output[opos++] = flags;
        encodeFloatValue(prev_diff, prev_flag, output, &opos);
        encodeFloatValue(diff, flag, output, &opos);
      } else {
        output[0] = 1;
        memcpy(output + 1, input, byte_limit - 1);
        return byte_limit;
      }
    }
    prev_value = curr.bits;
  }

  if (nelements % 2) {
S
Shengliang Guan 已提交
860 861
    int32_t nbyte1 = (prev_flag & INT8MASK(3)) + 1;
    int32_t nbyte2 = 1;
H
hzcheng 已提交
862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877
    if (opos + 1 + nbyte1 + nbyte2 <= byte_limit) {
      uint8_t flags = prev_flag;
      output[opos++] = flags;
      encodeFloatValue(prev_diff, prev_flag, output, &opos);
      encodeFloatValue(0, 0, output, &opos);
    } else {
      output[0] = 1;
      memcpy(output + 1, input, byte_limit - 1);
      return byte_limit;
    }
  }

  output[0] = 0;
  return opos;
}

S
Shengliang Guan 已提交
878
uint32_t decodeFloatValue(const char *const input, int32_t *const ipos, uint8_t flag) {
H
hzcheng 已提交
879
  uint32_t diff = 0ul;
S
Shengliang Guan 已提交
880 881
  int32_t  nbytes = (flag & INT8MASK(3)) + 1;
  for (int32_t i = 0; i < nbytes; i++) {
H
hzcheng 已提交
882 883
    diff = diff | ((INT32MASK(8) & input[(*ipos)++]) << BITS_PER_BYTE * i);
  }
S
Shengliang Guan 已提交
884
  int32_t shift_width = (FLOAT_BYTES * BITS_PER_BYTE - nbytes * BITS_PER_BYTE) * (flag >> 3);
H
hzcheng 已提交
885 886 887 888 889
  diff <<= shift_width;

  return diff;
}

S
Shengliang Guan 已提交
890
int32_t tsDecompressFloatImp(const char *const input, const int32_t nelements, char *const output) {
H
hzcheng 已提交
891 892 893 894 895 896 897 898
  float *ostream = (float *)output;

  if (input[0] == 1) {
    memcpy(output, input + 1, nelements * FLOAT_BYTES);
    return nelements * FLOAT_BYTES;
  }

  uint8_t  flags = 0;
S
Shengliang Guan 已提交
899 900
  int32_t  ipos = 1;
  int32_t  opos = 0;
H
hzcheng 已提交
901 902
  uint32_t prev_value = 0;

S
Shengliang Guan 已提交
903
  for (int32_t i = 0; i < nelements; i++) {
H
hzcheng 已提交
904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925
    if (i % 2 == 0) {
      flags = input[ipos++];
    }

    uint8_t flag = flags & INT8MASK(4);
    flags >>= 4;

    uint32_t diff = decodeFloatValue(input, &ipos, flag);
    union {
      uint32_t bits;
      float    real;
    } curr;

    uint32_t predicted = prev_value;
    curr.bits = predicted ^ diff;
    prev_value = curr.bits;

    ostream[opos++] = curr.real;
  }

  return nelements * FLOAT_BYTES;
}
T
tickduan 已提交
926

S
Shengliang Guan 已提交
927
#ifdef TD_TSZ
T
tickduan 已提交
928 929 930
//
//   ----------  float double lossy  -----------
//
S
Shengliang Guan 已提交
931 932 933
int32_t tsCompressFloatLossyImp(const char *input, const int32_t nelements, char *const output) {
  // compress with sz
  int32_t       compressedSize = tdszCompress(SZ_FLOAT, input, nelements, output + 1);
T
tickduan 已提交
934
  unsigned char algo = ALGO_SZ_LOSSY << 1;
S
Shengliang Guan 已提交
935
  if (compressedSize == 0 || compressedSize >= nelements * sizeof(float)) {
T
tickduan 已提交
936 937 938 939 940 941 942 943 944 945 946
    // compressed error or large than original
    output[0] = MODE_NOCOMPRESS | algo;
    memcpy(output + 1, input, nelements * sizeof(float));
    compressedSize = 1 + nelements * sizeof(float);
  } else {
    // compressed successfully
    output[0] = MODE_COMPRESS | algo;
    compressedSize += 1;
  }

  return compressedSize;
T
tickduan 已提交
947 948
}

S
Shengliang Guan 已提交
949 950 951 952
int32_t tsDecompressFloatLossyImp(const char *input, int32_t compressedSize, const int32_t nelements,
                                  char *const output) {
  int32_t decompressedSize = 0;
  if (HEAD_MODE(input[0]) == MODE_NOCOMPRESS) {
T
tickduan 已提交
953 954 955 956 957
    // orginal so memcpy directly
    decompressedSize = nelements * sizeof(float);
    memcpy(output, input + 1, decompressedSize);

    return decompressedSize;
S
Shengliang Guan 已提交
958
  }
T
tickduan 已提交
959 960 961

  // decompressed with sz
  return tdszDecompress(SZ_FLOAT, input + 1, compressedSize - 1, nelements, output);
T
tickduan 已提交
962 963
}

S
Shengliang Guan 已提交
964 965 966
int32_t tsCompressDoubleLossyImp(const char *input, const int32_t nelements, char *const output) {
  // compress with sz
  int32_t       compressedSize = tdszCompress(SZ_DOUBLE, input, nelements, output + 1);
T
tickduan 已提交
967
  unsigned char algo = ALGO_SZ_LOSSY << 1;
S
Shengliang Guan 已提交
968
  if (compressedSize == 0 || compressedSize >= nelements * sizeof(double)) {
T
tickduan 已提交
969 970 971 972 973 974 975 976 977 978
    // compressed error or large than original
    output[0] = MODE_NOCOMPRESS | algo;
    memcpy(output + 1, input, nelements * sizeof(double));
    compressedSize = 1 + nelements * sizeof(double);
  } else {
    // compressed successfully
    output[0] = MODE_COMPRESS | algo;
    compressedSize += 1;
  }

S
Shengliang Guan 已提交
979
  return compressedSize;
T
tickduan 已提交
980 981
}

S
Shengliang Guan 已提交
982 983 984 985
int32_t tsDecompressDoubleLossyImp(const char *input, int32_t compressedSize, const int32_t nelements,
                                   char *const output) {
  int32_t decompressedSize = 0;
  if (HEAD_MODE(input[0]) == MODE_NOCOMPRESS) {
T
tickduan 已提交
986 987 988 989 990
    // orginal so memcpy directly
    decompressedSize = nelements * sizeof(double);
    memcpy(output, input + 1, decompressedSize);

    return decompressedSize;
S
Shengliang Guan 已提交
991
  }
T
tickduan 已提交
992 993 994

  // decompressed with sz
  return tdszDecompress(SZ_DOUBLE, input + 1, compressedSize - 1, nelements, output);
995
}
Y
yihaoDeng 已提交
996
#endif