tsdbDiskData.c 17.1 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"

H
Hongze Cheng 已提交
18 19 20
typedef struct SDiskDataBuilder SDiskDataBuilder;
typedef struct SDiskCol         SDiskCol;
typedef struct SDiskData        SDiskData;
H
Hongze Cheng 已提交
21

H
Hongze Cheng 已提交
22 23 24 25
struct SDiskCol {
  int16_t      cid;
  int8_t       type;
  int8_t       flag;
H
Hongze Cheng 已提交
26
  uint8_t      cmprAlg;
H
Hongze Cheng 已提交
27
  int32_t      nVal;
H
Hongze Cheng 已提交
28
  uint8_t     *pBitMap;
H
Hongze Cheng 已提交
29
  int32_t      offset;
H
Hongze Cheng 已提交
30 31 32 33 34
  SCompressor *pOffC;
  SCompressor *pValC;
};

// SDiskCol ================================================
H
Hongze Cheng 已提交
35
static int32_t tDiskColInit(SDiskCol *pDiskCol, int16_t cid, int8_t type, uint8_t cmprAlg) {
H
Hongze Cheng 已提交
36 37 38 39 40
  int32_t code = 0;

  pDiskCol->cid = cid;
  pDiskCol->type = type;
  pDiskCol->flag = 0;
H
Hongze Cheng 已提交
41
  pDiskCol->cmprAlg = cmprAlg;
H
Hongze Cheng 已提交
42
  pDiskCol->nVal = 0;
H
Hongze Cheng 已提交
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
  pDiskCol->offset = 0;

  if (IS_VAR_DATA_TYPE(type)) {
    if (pDiskCol->pOffC == NULL) {
      code = tCompressorCreate(&pDiskCol->pOffC);
      if (code) return code;
    }
    code = tCompressorReset(pDiskCol->pOffC, TSDB_DATA_TYPE_INT, cmprAlg);
    if (code) return code;
  }

  if (pDiskCol->pValC == NULL) {
    code = tCompressorCreate(&pDiskCol->pValC);
    if (code) return code;
  }
  code = tCompressorReset(pDiskCol->pValC, type, cmprAlg);
  if (code) return code;

  return code;
}

static int32_t tDiskColClear(SDiskCol *pDiskCol) {
  int32_t code = 0;
H
Hongze Cheng 已提交
66

H
Hongze Cheng 已提交
67 68 69
  tFree(pDiskCol->pBitMap);
  if (pDiskCol->pOffC) tCompressorDestroy(pDiskCol->pOffC);
  if (pDiskCol->pValC) tCompressorDestroy(pDiskCol->pValC);
H
Hongze Cheng 已提交
70 71 72 73

  return code;
}

H
Hongze Cheng 已提交
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
static int32_t tDiskColToBinary(SDiskCol *pDiskCol, const uint8_t **ppData, int32_t *nData) {
  int32_t code = 0;

  ASSERT(pDiskCol->flag && pDiskCol->flag != HAS_NONE);

  if (pDiskCol->flag == HAS_NULL) {
    return code;
  }

  // bitmap (todo)
  if (pDiskCol->flag != HAS_VALUE) {
  }

  // offset (todo)
  if (IS_VAR_DATA_TYPE(pDiskCol->type)) {
    code = tCompGen(pDiskCol->pOffC, NULL /* todo */, NULL /* todo */);
    if (code) return code;
  }

  // value (todo)
  if (pDiskCol->flag != (HAS_NULL | HAS_NONE)) {
    code = tCompGen(pDiskCol->pValC, NULL /* todo */, NULL /* todo */);
    if (code) return code;
  }

  return code;
}

H
Hongze Cheng 已提交
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
static int32_t tDiskColAddValue(SDiskCol *pDiskCol, SColVal *pColVal) {
  int32_t code = 0;

  if (IS_VAR_DATA_TYPE(pColVal->type)) {
    code = tCompress(pDiskCol->pOffC, &pDiskCol->offset, sizeof(int32_t));
    if (code) goto _exit;
    pDiskCol->offset += pColVal->value.nData;
  }
  code = tCompress(pDiskCol->pValC, pColVal->value.pData, pColVal->value.nData /*TODO*/);
  if (code) goto _exit;

_exit:
  return code;
}
static int32_t tDiskColAddVal0(SDiskCol *pDiskCol, SColVal *pColVal) {  // 0
H
Hongze Cheng 已提交
117 118 119 120 121 122 123 124
  int32_t code = 0;

  if (pColVal->isNone) {
    pDiskCol->flag = HAS_NONE;
  } else if (pColVal->isNull) {
    pDiskCol->flag = HAS_NULL;
  } else {
    pDiskCol->flag = HAS_VALUE;
H
Hongze Cheng 已提交
125
    code = tDiskColAddValue(pDiskCol, pColVal);
H
Hongze Cheng 已提交
126 127 128 129 130 131 132
    if (code) goto _exit;
  }
  pDiskCol->nVal++;

_exit:
  return code;
}
H
Hongze Cheng 已提交
133
static int32_t tDiskColAddVal1(SDiskCol *pDiskCol, SColVal *pColVal) {  // HAS_NONE
H
Hongze Cheng 已提交
134 135 136
  int32_t code = 0;

  if (!pColVal->isNone) {
H
Hongze Cheng 已提交
137
    // bit map
H
Hongze Cheng 已提交
138 139 140 141 142 143 144
    int32_t nBit = BIT1_SIZE(pDiskCol->nVal + 1);

    code = tRealloc(&pDiskCol->pBitMap, nBit);
    if (code) goto _exit;

    memset(pDiskCol->pBitMap, 0, nBit);
    SET_BIT1(pDiskCol->pBitMap, pDiskCol->nVal, 1);
H
Hongze Cheng 已提交
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160

    // value
    if (pColVal->isNull) {
      pDiskCol->flag |= HAS_NULL;
    } else {
      pDiskCol->flag |= HAS_VALUE;

      SColVal cv = COL_VAL_VALUE(pColVal->cid, pColVal->type, (SValue){0});
      for (int32_t iVal = 0; iVal < pDiskCol->nVal; iVal++) {
        code = tDiskColAddValue(pDiskCol, &cv);
        if (code) goto _exit;
      }

      code = tDiskColAddValue(pDiskCol, pColVal);
      if (code) goto _exit;
    }
H
Hongze Cheng 已提交
161 162 163 164 165 166
  }
  pDiskCol->nVal++;

_exit:
  return code;
}
H
Hongze Cheng 已提交
167
static int32_t tDiskColAddVal2(SDiskCol *pDiskCol, SColVal *pColVal) {  // HAS_NULL
H
Hongze Cheng 已提交
168 169 170
  int32_t code = 0;

  if (!pColVal->isNull) {
H
Hongze Cheng 已提交
171 172 173 174
    int32_t nBit = BIT1_SIZE(pDiskCol->nVal + 1);
    code = tRealloc(&pDiskCol->pBitMap, nBit);
    if (code) goto _exit;

H
Hongze Cheng 已提交
175 176 177
    if (pColVal->isNone) {
      pDiskCol->flag |= HAS_NONE;

H
Hongze Cheng 已提交
178 179
      memset(pDiskCol->pBitMap, 255, nBit);
      SET_BIT1(pDiskCol->pBitMap, pDiskCol->nVal, 0);
H
Hongze Cheng 已提交
180 181 182
    } else {
      pDiskCol->flag |= HAS_VALUE;

H
Hongze Cheng 已提交
183 184
      memset(pDiskCol->pBitMap, 0, nBit);
      SET_BIT1(pDiskCol->pBitMap, pDiskCol->nVal, 1);
H
Hongze Cheng 已提交
185 186 187 188 189 190 191 192 193 194

      SColVal cv = COL_VAL_VALUE(pColVal->cid, pColVal->type, (SValue){0});
      for (int32_t iVal = 0; iVal < pDiskCol->nVal; iVal++) {
        code = tDiskColAddValue(pDiskCol, &cv);
        if (code) goto _exit;
      }

      code = tDiskColAddValue(pDiskCol, pColVal);
      if (code) goto _exit;
    }
H
Hongze Cheng 已提交
195 196 197 198 199 200
  }
  pDiskCol->nVal++;

_exit:
  return code;
}
H
Hongze Cheng 已提交
201
static int32_t tDiskColAddVal3(SDiskCol *pDiskCol, SColVal *pColVal) {  // HAS_NULL|HAS_NONE
H
Hongze Cheng 已提交
202 203 204
  int32_t code = 0;

  if (pColVal->isNone) {
H
Hongze Cheng 已提交
205 206 207 208
    code = tRealloc(&pDiskCol->pBitMap, BIT1_SIZE(pDiskCol->nVal + 1));
    if (code) goto _exit;

    SET_BIT1(pDiskCol->pBitMap, pDiskCol->nVal, 0);
H
Hongze Cheng 已提交
209
  } else if (pColVal->isNull) {
H
Hongze Cheng 已提交
210 211 212 213
    code = tRealloc(&pDiskCol->pBitMap, BIT1_SIZE(pDiskCol->nVal + 1));
    if (code) goto _exit;

    SET_BIT1(pDiskCol->pBitMap, pDiskCol->nVal, 1);
H
Hongze Cheng 已提交
214
  } else {
H
Hongze Cheng 已提交
215 216
    pDiskCol->flag |= HAS_VALUE;

H
Hongze Cheng 已提交
217 218 219 220 221 222 223 224 225 226 227
    uint8_t *pBitMap = NULL;
    code = tRealloc(&pBitMap, BIT2_SIZE(pDiskCol->nVal + 1));
    if (code) goto _exit;

    for (int32_t iVal = 0; iVal < pDiskCol->nVal; iVal++) {
      SET_BIT2(pBitMap, iVal, GET_BIT1(pDiskCol->pBitMap, iVal));
    }
    SET_BIT2(pBitMap, pDiskCol->nVal, 2);

    tFree(pDiskCol->pBitMap);
    pDiskCol->pBitMap = pBitMap;
H
Hongze Cheng 已提交
228 229 230 231 232 233 234 235 236

    SColVal cv = COL_VAL_VALUE(pColVal->cid, pColVal->type, (SValue){0});
    for (int32_t iVal = 0; iVal < pDiskCol->nVal; iVal++) {
      code = tDiskColAddValue(pDiskCol, &cv);
      if (code) goto _exit;
    }

    code = tDiskColAddValue(pDiskCol, pColVal);
    if (code) goto _exit;
H
Hongze Cheng 已提交
237 238 239
  }
  pDiskCol->nVal++;

H
Hongze Cheng 已提交
240
_exit:
H
Hongze Cheng 已提交
241 242
  return code;
}
H
Hongze Cheng 已提交
243
static int32_t tDiskColAddVal4(SDiskCol *pDiskCol, SColVal *pColVal) {  // HAS_VALUE
H
Hongze Cheng 已提交
244 245 246
  int32_t code = 0;

  if (pColVal->isNone || pColVal->isNull) {
H
Hongze Cheng 已提交
247 248 249 250 251 252
    if (pColVal->isNone) {
      pDiskCol->flag |= HAS_NONE;
    } else {
      pDiskCol->flag |= HAS_NULL;
    }

H
Hongze Cheng 已提交
253 254 255 256 257 258
    int32_t nBit = BIT1_SIZE(pDiskCol->nVal + 1);
    code = tRealloc(&pDiskCol->pBitMap, nBit);
    if (code) goto _exit;

    memset(pDiskCol->pBitMap, 255, nBit);
    SET_BIT1(pDiskCol->pBitMap, pDiskCol->nVal, 0);
H
Hongze Cheng 已提交
259 260 261

    code = tDiskColAddValue(pDiskCol, pColVal);
    if (code) goto _exit;
H
Hongze Cheng 已提交
262
  } else {
H
Hongze Cheng 已提交
263 264
    code = tDiskColAddValue(pDiskCol, pColVal);
    if (code) goto _exit;
H
Hongze Cheng 已提交
265 266 267 268 269 270
  }
  pDiskCol->nVal++;

_exit:
  return code;
}
H
Hongze Cheng 已提交
271
static int32_t tDiskColAddVal5(SDiskCol *pDiskCol, SColVal *pColVal) {  // HAS_VALUE|HAS_NONE
H
Hongze Cheng 已提交
272 273 274
  int32_t code = 0;

  if (pColVal->isNull) {
H
Hongze Cheng 已提交
275 276
    pDiskCol->flag |= HAS_NULL;

H
Hongze Cheng 已提交
277 278 279 280 281 282 283 284 285 286 287
    uint8_t *pBitMap = NULL;
    code = tRealloc(&pBitMap, BIT2_SIZE(pDiskCol->nVal + 1));
    if (code) goto _exit;

    for (int32_t iVal = 0; iVal < pDiskCol->nVal; iVal++) {
      SET_BIT2(pBitMap, iVal, GET_BIT1(pDiskCol->pBitMap, iVal) ? 2 : 0);
    }
    SET_BIT2(pBitMap, pDiskCol->nVal, 1);

    tFree(pDiskCol->pBitMap);
    pDiskCol->pBitMap = pBitMap;
H
Hongze Cheng 已提交
288
  } else {
H
Hongze Cheng 已提交
289 290 291
    code = tRealloc(&pDiskCol->pBitMap, BIT1_SIZE(pDiskCol->nVal + 1));
    if (code) goto _exit;

H
Hongze Cheng 已提交
292
    if (pColVal->isNone) {
H
Hongze Cheng 已提交
293
      SET_BIT1(pDiskCol->pBitMap, pDiskCol->nVal, 0);
H
Hongze Cheng 已提交
294
    } else {
H
Hongze Cheng 已提交
295
      SET_BIT1(pDiskCol->pBitMap, pDiskCol->nVal, 1);
H
Hongze Cheng 已提交
296
    }
H
Hongze Cheng 已提交
297
  }
H
Hongze Cheng 已提交
298 299
  code = tDiskColAddValue(pDiskCol, pColVal);
  if (code) goto _exit;
H
Hongze Cheng 已提交
300 301
  pDiskCol->nVal++;

H
Hongze Cheng 已提交
302
_exit:
H
Hongze Cheng 已提交
303 304
  return code;
}
H
Hongze Cheng 已提交
305
static int32_t tDiskColAddVal6(SDiskCol *pDiskCol, SColVal *pColVal) {  // HAS_VALUE|HAS_NULL
H
Hongze Cheng 已提交
306 307 308
  int32_t code = 0;

  if (pColVal->isNone) {
H
Hongze Cheng 已提交
309
    pDiskCol->flag |= HAS_NONE;
H
Hongze Cheng 已提交
310 311 312 313 314 315 316 317 318 319 320 321

    uint8_t *pBitMap = NULL;
    code = tRealloc(&pBitMap, BIT2_SIZE(pDiskCol->nVal + 1));
    if (code) goto _exit;

    for (int32_t iVal = 0; iVal < pDiskCol->nVal; iVal++) {
      SET_BIT2(pBitMap, iVal, GET_BIT1(pDiskCol->pBitMap, iVal) ? 2 : 1);
    }
    SET_BIT2(pBitMap, pDiskCol->nVal, 0);

    tFree(pDiskCol->pBitMap);
    pDiskCol->pBitMap = pBitMap;
H
Hongze Cheng 已提交
322
  } else {
H
Hongze Cheng 已提交
323 324 325
    code = tRealloc(&pDiskCol->pBitMap, BIT1_SIZE(pDiskCol->nVal + 1));
    if (code) goto _exit;

H
Hongze Cheng 已提交
326
    if (pColVal->isNull) {
H
Hongze Cheng 已提交
327
      SET_BIT1(pDiskCol->pBitMap, pDiskCol->nVal, 0);
H
Hongze Cheng 已提交
328
    } else {
H
Hongze Cheng 已提交
329
      SET_BIT1(pDiskCol->pBitMap, pDiskCol->nVal, 1);
H
Hongze Cheng 已提交
330
    }
H
Hongze Cheng 已提交
331
  }
H
Hongze Cheng 已提交
332 333
  code = tDiskColAddValue(pDiskCol, pColVal);
  if (code) goto _exit;
H
Hongze Cheng 已提交
334 335
  pDiskCol->nVal++;

H
Hongze Cheng 已提交
336
_exit:
H
Hongze Cheng 已提交
337 338
  return code;
}
H
Hongze Cheng 已提交
339
static int32_t tDiskColAddVal7(SDiskCol *pDiskCol, SColVal *pColVal) {  // HAS_VALUE|HAS_NULL|HAS_NONE
H
Hongze Cheng 已提交
340 341
  int32_t code = 0;

H
Hongze Cheng 已提交
342 343 344
  code = tRealloc(&pDiskCol->pBitMap, BIT2_SIZE(pDiskCol->nVal + 1));
  if (code) goto _exit;

H
Hongze Cheng 已提交
345
  if (pColVal->isNone) {
H
Hongze Cheng 已提交
346
    SET_BIT2(pDiskCol->pBitMap, pDiskCol->nVal, 0);
H
Hongze Cheng 已提交
347
  } else if (pColVal->isNull) {
H
Hongze Cheng 已提交
348
    SET_BIT2(pDiskCol->pBitMap, pDiskCol->nVal, 1);
H
Hongze Cheng 已提交
349
  } else {
H
Hongze Cheng 已提交
350
    SET_BIT2(pDiskCol->pBitMap, pDiskCol->nVal, 2);
H
Hongze Cheng 已提交
351
  }
H
Hongze Cheng 已提交
352 353
  code = tDiskColAddValue(pDiskCol, pColVal);
  if (code) goto _exit;
H
Hongze Cheng 已提交
354 355
  pDiskCol->nVal++;

H
Hongze Cheng 已提交
356
_exit:
H
Hongze Cheng 已提交
357 358 359 360 361 362 363 364 365 366 367 368 369
  return code;
}
static int32_t (*tDiskColAddValImpl[])(SDiskCol *pDiskCol, SColVal *pColVal) = {
    tDiskColAddVal0,  // 0
    tDiskColAddVal1,  // HAS_NONE
    tDiskColAddVal2,  // HAS_NULL
    tDiskColAddVal3,  // HAS_NULL|HAS_NONE
    tDiskColAddVal4,  // HAS_VALUE
    tDiskColAddVal5,  // HAS_VALUE|HAS_NONE
    tDiskColAddVal6,  // HAS_VALUE|HAS_NULL
    tDiskColAddVal7,  // HAS_VALUE|HAS_NULL|HAS_NONE
};

H
Hongze Cheng 已提交
370 371
// SDiskDataBuilder ================================================
struct SDiskDataBuilder {
H
Hongze Cheng 已提交
372 373
  int64_t      suid;
  int64_t      uid;
H
Hongze Cheng 已提交
374
  int32_t      nRow;
H
Hongze Cheng 已提交
375 376 377 378 379 380
  uint8_t      cmprAlg;
  SCompressor *pUidC;
  SCompressor *pVerC;
  SCompressor *pKeyC;
  int32_t      nDiskCol;
  SArray      *aDiskCol;
H
Hongze Cheng 已提交
381
  uint8_t     *aBuf[2];
H
Hongze Cheng 已提交
382 383
};

H
Hongze Cheng 已提交
384
int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TABLEID *pId, uint8_t cmprAlg) {
H
Hongze Cheng 已提交
385
  int32_t code = 0;
H
Hongze Cheng 已提交
386

H
Hongze Cheng 已提交
387 388 389 390
  pBuilder->suid = pId->suid;
  pBuilder->uid = pId->uid;
  pBuilder->nRow = 0;
  pBuilder->cmprAlg = cmprAlg;
H
Hongze Cheng 已提交
391

H
Hongze Cheng 已提交
392 393
  if (pBuilder->pUidC == NULL) {
    code = tCompressorCreate(&pBuilder->pUidC);
H
Hongze Cheng 已提交
394 395
    if (code) return code;
  }
H
Hongze Cheng 已提交
396
  code = tCompressorReset(pBuilder->pUidC, TSDB_DATA_TYPE_BIGINT, cmprAlg);
H
Hongze Cheng 已提交
397 398
  if (code) return code;

H
Hongze Cheng 已提交
399 400
  if (pBuilder->pVerC == NULL) {
    code = tCompressorCreate(&pBuilder->pVerC);
H
Hongze Cheng 已提交
401 402
    if (code) return code;
  }
H
Hongze Cheng 已提交
403
  code = tCompressorReset(pBuilder->pVerC, TSDB_DATA_TYPE_BIGINT, cmprAlg);
H
Hongze Cheng 已提交
404 405
  if (code) return code;

H
Hongze Cheng 已提交
406 407
  if (pBuilder->pKeyC == NULL) {
    code = tCompressorCreate(&pBuilder->pKeyC);
H
Hongze Cheng 已提交
408 409
    if (code) return code;
  }
H
Hongze Cheng 已提交
410
  code = tCompressorReset(pBuilder->pKeyC, TSDB_DATA_TYPE_TIMESTAMP, cmprAlg);
H
Hongze Cheng 已提交
411 412
  if (code) return code;

H
Hongze Cheng 已提交
413 414 415
  if (pBuilder->aDiskCol == NULL) {
    pBuilder->aDiskCol = taosArrayInit(pTSchema->numOfCols - 1, sizeof(SDiskCol));
    if (pBuilder->aDiskCol == NULL) {
H
Hongze Cheng 已提交
416 417 418 419 420
      code = TSDB_CODE_OUT_OF_MEMORY;
      return code;
    }
  }

H
Hongze Cheng 已提交
421
  pBuilder->nDiskCol = 0;
H
Hongze Cheng 已提交
422 423 424
  for (int32_t iCol = 1; iCol < pTSchema->numOfCols; iCol++) {
    STColumn *pTColumn = &pTSchema->columns[iCol];

H
Hongze Cheng 已提交
425
    if (pBuilder->nDiskCol >= taosArrayGetSize(pBuilder->aDiskCol)) {
H
Hongze Cheng 已提交
426
      SDiskCol dc = (SDiskCol){0};
H
Hongze Cheng 已提交
427
      if (taosArrayPush(pBuilder->aDiskCol, &dc) == NULL) {
H
Hongze Cheng 已提交
428 429 430 431 432
        code = TSDB_CODE_OUT_OF_MEMORY;
        return code;
      }
    }

H
Hongze Cheng 已提交
433
    SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pBuilder->aDiskCol, pBuilder->nDiskCol);
H
Hongze Cheng 已提交
434 435 436 437

    code = tDiskColInit(pDiskCol, pTColumn->colId, pTColumn->type, cmprAlg);
    if (code) return code;

H
Hongze Cheng 已提交
438
    pBuilder->nDiskCol++;
H
Hongze Cheng 已提交
439 440
  }

H
Hongze Cheng 已提交
441 442 443
  return code;
}

H
Hongze Cheng 已提交
444
int32_t tDiskDataBuilderDestroy(SDiskDataBuilder *pBuilder) {
H
Hongze Cheng 已提交
445 446
  int32_t code = 0;

H
Hongze Cheng 已提交
447 448 449
  if (pBuilder->pUidC) tCompressorDestroy(pBuilder->pUidC);
  if (pBuilder->pVerC) tCompressorDestroy(pBuilder->pVerC);
  if (pBuilder->pKeyC) tCompressorDestroy(pBuilder->pKeyC);
H
Hongze Cheng 已提交
450

H
Hongze Cheng 已提交
451 452 453
  if (pBuilder->aDiskCol) {
    for (int32_t iDiskCol = 0; iDiskCol < taosArrayGetSize(pBuilder->aDiskCol); iDiskCol++) {
      SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pBuilder->aDiskCol, iDiskCol);
H
Hongze Cheng 已提交
454 455
      tDiskColClear(pDiskCol);
    }
H
Hongze Cheng 已提交
456
    taosArrayDestroy(pBuilder->aDiskCol);
H
Hongze Cheng 已提交
457
  }
H
Hongze Cheng 已提交
458 459
  for (int32_t iBuf = 0; iBuf < sizeof(pBuilder->aBuf) / sizeof(pBuilder->aBuf[0]); iBuf++) {
    tFree(pBuilder->aBuf[iBuf]);
H
Hongze Cheng 已提交
460
  }
H
Hongze Cheng 已提交
461 462 463 464

  return code;
}

H
Hongze Cheng 已提交
465
int32_t tDiskDataBuilderAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSchema *pTSchema, TABLEID *pId) {
H
Hongze Cheng 已提交
466 467
  int32_t code = 0;

H
Hongze Cheng 已提交
468
  ASSERT(pId->suid == pBuilder->suid);
H
Hongze Cheng 已提交
469 470

  // uid
H
Hongze Cheng 已提交
471 472 473
  if (pBuilder->uid && pBuilder->uid != pId->uid) {
    for (int32_t iRow = 0; iRow < pBuilder->nRow; iRow++) {
      code = tCompress(pBuilder->pUidC, &pBuilder->uid, sizeof(int64_t));
H
Hongze Cheng 已提交
474 475
      if (code) goto _exit;
    }
H
Hongze Cheng 已提交
476
    pBuilder->uid = 0;
H
Hongze Cheng 已提交
477
  }
H
Hongze Cheng 已提交
478 479
  if (pBuilder->uid == 0) {
    code = tCompress(pBuilder->pUidC, &pId->uid, sizeof(int64_t));
H
Hongze Cheng 已提交
480 481
    if (code) goto _exit;
  }
H
Hongze Cheng 已提交
482 483 484

  // version
  int64_t version = TSDBROW_VERSION(pRow);
H
Hongze Cheng 已提交
485
  code = tCompress(pBuilder->pVerC, &version, sizeof(int64_t));
H
Hongze Cheng 已提交
486 487 488 489
  if (code) goto _exit;

  // TSKEY
  TSKEY ts = TSDBROW_TS(pRow);
H
Hongze Cheng 已提交
490
  code = tCompress(pBuilder->pKeyC, &ts, sizeof(int64_t));
H
Hongze Cheng 已提交
491 492 493 494 495 496
  if (code) goto _exit;

  SRowIter iter = {0};
  tRowIterInit(&iter, pRow, pTSchema);

  SColVal *pColVal = tRowIterNext(&iter);
H
Hongze Cheng 已提交
497 498
  for (int32_t iDiskCol = 0; iDiskCol < pBuilder->nDiskCol; iDiskCol++) {
    SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pBuilder->aDiskCol, iDiskCol);
H
Hongze Cheng 已提交
499 500 501 502 503 504

    while (pColVal && pColVal->cid < pDiskCol->cid) {
      pColVal = tRowIterNext(&iter);
    }

    if (pColVal == NULL || pColVal->cid > pDiskCol->cid) {
H
Hongze Cheng 已提交
505 506
      SColVal cv = COL_VAL_NONE(pDiskCol->cid, pDiskCol->type);
      code = tDiskColAddValImpl[pDiskCol->flag](pDiskCol, &cv);
H
Hongze Cheng 已提交
507 508 509 510 511 512 513
      if (code) goto _exit;
    } else {
      code = tDiskColAddValImpl[pDiskCol->flag](pDiskCol, pColVal);
      if (code) goto _exit;
      pColVal = tRowIterNext(&iter);
    }
  }
H
Hongze Cheng 已提交
514
  pBuilder->nRow++;
H
Hongze Cheng 已提交
515 516 517 518

_exit:
  return code;
}
H
Hongze Cheng 已提交
519

H
Hongze Cheng 已提交
520
int32_t tGnrtDiskData(SDiskDataBuilder *pBuilder, SDiskData *pDiskData) {
H
Hongze Cheng 已提交
521 522
  int32_t code = 0;

H
Hongze Cheng 已提交
523
  ASSERT(pBuilder->nRow);
H
Hongze Cheng 已提交
524 525 526

  SDiskDataHdr hdr = {.delimiter = TSDB_FILE_DLMT,
                      .fmtVer = 0,
H
Hongze Cheng 已提交
527 528
                      .suid = pBuilder->suid,
                      .uid = pBuilder->uid,
H
Hongze Cheng 已提交
529 530 531 532
                      .szUid = 0,
                      .szVer = 0,
                      .szKey = 0,
                      .szBlkCol = 0,
H
Hongze Cheng 已提交
533 534
                      .nRow = pBuilder->nRow,
                      .cmprAlg = pBuilder->cmprAlg};
H
Hongze Cheng 已提交
535 536 537

  // UID
  const uint8_t *pUid = NULL;
H
Hongze Cheng 已提交
538 539
  if (pBuilder->uid == 0) {
    code = tCompGen(pBuilder->pUidC, &pUid, &hdr.szUid);
H
Hongze Cheng 已提交
540 541 542 543 544
    if (code) return code;
  }

  // VERSION
  const uint8_t *pVer = NULL;
H
Hongze Cheng 已提交
545
  code = tCompGen(pBuilder->pVerC, &pVer, &hdr.szVer);
H
Hongze Cheng 已提交
546 547 548 549
  if (code) return code;

  // TSKEY
  const uint8_t *pKey = NULL;
H
Hongze Cheng 已提交
550
  code = tCompGen(pBuilder->pKeyC, &pKey, &hdr.szKey);
H
Hongze Cheng 已提交
551 552 553
  if (code) return code;

  int32_t offset = 0;
H
Hongze Cheng 已提交
554 555
  for (int32_t iDiskCol = 0; iDiskCol < pBuilder->nDiskCol; iDiskCol++) {
    SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pBuilder->aDiskCol, iDiskCol);
H
Hongze Cheng 已提交
556 557 558

    if (pDiskCol->flag == HAS_NONE) continue;

H
Hongze Cheng 已提交
559 560
    code = tDiskColToBinary(pDiskCol, NULL, NULL);
    if (code) return code;
H
Hongze Cheng 已提交
561 562 563 564 565 566 567 568 569 570 571 572 573 574 575

    SBlockCol bCol = {.cid = pDiskCol->cid,
                      .type = pDiskCol->type,
                      // .smaOn = ,
                      .flag = pDiskCol->flag,
                      // .szOrigin =
                      // .szBitmap =
                      // .szOffset =
                      // .szValue =
                      .offset = offset};

    hdr.szBlkCol += tPutBlockCol(NULL, &bCol);
    offset = offset + bCol.szBitmap + bCol.szOffset + bCol.szValue;
  }

H
Hongze Cheng 已提交
576
#if 0
H
Hongze Cheng 已提交
577
  *nData = tPutDiskDataHdr(NULL, &hdr) + hdr.szUid + hdr.szVer + hdr.szKey + hdr.szBlkCol + offset;
H
Hongze Cheng 已提交
578
  code = tRealloc(&pBuilder->aBuf[0], *nData);
H
Hongze Cheng 已提交
579
  if (code) return code;
H
Hongze Cheng 已提交
580
  *ppData = pBuilder->aBuf[0];
H
Hongze Cheng 已提交
581 582

  int32_t n = 0;
H
Hongze Cheng 已提交
583
  n += tPutDiskDataHdr(pBuilder->aBuf[0] + n, &hdr);
H
Hongze Cheng 已提交
584
  if (hdr.szUid) {
H
Hongze Cheng 已提交
585
    memcpy(pBuilder->aBuf[0] + n, pUid, hdr.szUid);
H
Hongze Cheng 已提交
586 587
    n += hdr.szUid;
  }
H
Hongze Cheng 已提交
588
  memcpy(pBuilder->aBuf[0] + n, pVer, hdr.szVer);
H
Hongze Cheng 已提交
589
  n += hdr.szVer;
H
Hongze Cheng 已提交
590
  memcpy(pBuilder->aBuf[0] + n, pKey, hdr.szKey);
H
Hongze Cheng 已提交
591
  n += hdr.szKey;
H
Hongze Cheng 已提交
592 593 594
  for (int32_t iDiskCol = 0; iDiskCol < pBuilder->nDiskCol; iDiskCol++) {
    SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pBuilder->aDiskCol, iDiskCol);
    n += tPutBlockCol(pBuilder->aBuf[0] + n, NULL /*pDiskCol->bCol (todo) */);
H
Hongze Cheng 已提交
595
  }
H
Hongze Cheng 已提交
596 597
  for (int32_t iDiskCol = 0; iDiskCol < pBuilder->nDiskCol; iDiskCol++) {
    SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pBuilder->aDiskCol, iDiskCol);
H
Hongze Cheng 已提交
598 599 600
    // memcpy(pDiskData->aBuf[0] + n, NULL, );
    // n += 0;
  }
H
Hongze Cheng 已提交
601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620
#endif

  return code;
}

// SDiskData ================================================
struct SDiskData {
  SDiskDataHdr   hdr;
  const uint8_t *pUid;
  const uint8_t *pVer;
  const uint8_t *pKey;
  SArray        *aBlockCol;
  SArray        *aColData;
};

int32_t tDiskDataDestroy(SDiskData *pDiskData) {
  int32_t code = 0;

  if (pDiskData->aBlockCol) taosArrayDestroy(pDiskData->aBlockCol);
  if (pDiskData->aColData) taosArrayDestroy(pDiskData->aColData);
H
Hongze Cheng 已提交
621 622 623

  return code;
}