tsdbSttFileRW.c 32.8 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdbSttFileRW.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18 19
typedef struct {
  int64_t   prevFooter;
H
Hongze Cheng 已提交
20 21
  SFDataPtr sttBlkPtr[1];
  SFDataPtr statisBlkPtr[1];
H
Hongze Cheng 已提交
22
  SFDataPtr tombBlkPtr[1];
H
Hongze Cheng 已提交
23 24
  SFDataPtr rsrvd[2];
} SSttFooter;
H
Hongze Cheng 已提交
25

H
Hongze Cheng 已提交
26
// SSttFReader ============================================================
H
Hongze Cheng 已提交
27
struct SSttFileReader {
H
Hongze Cheng 已提交
28
  SSttFileReaderConfig config[1];
H
Hongze Cheng 已提交
29
  TSttSegReaderArray   readerArray[1];
H
Hongze Cheng 已提交
30
  STsdbFD             *fd;
H
Hongze Cheng 已提交
31
  uint8_t             *bufArr[5];
H
Hongze Cheng 已提交
32 33 34 35
};

struct SSttSegReader {
  SSttFileReader *reader;
H
Hongze Cheng 已提交
36
  SSttFooter      footer[1];
H
Hongze Cheng 已提交
37 38 39
  struct {
    bool sttBlkLoaded;
    bool statisBlkLoaded;
H
Hongze Cheng 已提交
40
    bool tombBlkLoaded;
H
Hongze Cheng 已提交
41
  } ctx[1];
H
Hongze Cheng 已提交
42 43
  TSttBlkArray    sttBlkArray[1];
  TStatisBlkArray statisBlkArray[1];
H
Hongze Cheng 已提交
44
  TTombBlkArray   tombBlkArray[1];
H
Hongze Cheng 已提交
45 46
};

H
Hongze Cheng 已提交
47
// SSttFileReader
H
Hongze Cheng 已提交
48
static int32_t tsdbSttSegReaderOpen(SSttFileReader *reader, int64_t offset, SSttSegReader **segReader) {
H
Hongze Cheng 已提交
49 50
  ASSERT(offset >= TSDB_FHDR_SIZE);

H
Hongze Cheng 已提交
51
  int32_t code = 0;
H
Hongze Cheng 已提交
52 53 54
  int32_t lino = 0;

  segReader[0] = taosMemoryCalloc(1, sizeof(*segReader[0]));
H
Hongze Cheng 已提交
55
  if (!segReader[0]) return TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
56 57

  segReader[0]->reader = reader;
H
Hongze Cheng 已提交
58
  code = tsdbReadFile(reader->fd, offset, (uint8_t *)(segReader[0]->footer), sizeof(SSttFooter));
H
Hongze Cheng 已提交
59 60 61 62
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
H
Hongze Cheng 已提交
63
    TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
64 65 66
    taosMemoryFree(segReader[0]);
    segReader[0] = NULL;
  }
H
Hongze Cheng 已提交
67 68 69
  return code;
}

H
Hongze Cheng 已提交
70
static int32_t tsdbSttSegReaderClose(SSttSegReader **reader) {
H
Hongze Cheng 已提交
71
  if (reader[0]) {
H
Hongze Cheng 已提交
72 73
    TARRAY2_DESTROY(reader[0]->tombBlkArray, NULL);
    TARRAY2_DESTROY(reader[0]->statisBlkArray, NULL);
H
Hongze Cheng 已提交
74
    TARRAY2_DESTROY(reader[0]->sttBlkArray, NULL);
H
Hongze Cheng 已提交
75 76
    taosMemoryFree(reader[0]);
    reader[0] = NULL;
H
Hongze Cheng 已提交
77 78
  }
  return 0;
H
Hongze Cheng 已提交
79 80
}

H
Hongze Cheng 已提交
81
int32_t tsdbSttFileReaderOpen(const char *fname, const SSttFileReaderConfig *config, SSttFileReader **reader) {
H
Hongze Cheng 已提交
82
  int32_t code = 0;
H
Hongze Cheng 已提交
83 84 85 86 87
  int32_t lino = 0;

  reader[0] = taosMemoryCalloc(1, sizeof(*reader[0]));
  if (reader[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;

H
Hongze Cheng 已提交
88
  reader[0]->config[0] = config[0];
H
Hongze Cheng 已提交
89 90 91
  if (reader[0]->config->bufArr == NULL) {
    reader[0]->config->bufArr = reader[0]->bufArr;
  }
H
Hongze Cheng 已提交
92 93

  // open file
H
Hongze Cheng 已提交
94 95 96 97 98 99 100 101 102
  if (fname) {
    code = tsdbOpenFile(fname, config->szPage, TD_FILE_READ, &reader[0]->fd);
    TSDB_CHECK_CODE(code, lino, _exit);
  } else {
    char fname1[TSDB_FILENAME_LEN];
    tsdbTFileName(config->tsdb, config->file, fname1);
    code = tsdbOpenFile(fname1, config->szPage, TD_FILE_READ, &reader[0]->fd);
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
103 104

  // open each segment reader
H
Hongze Cheng 已提交
105
  int64_t size = config->file->size;
H
Hongze Cheng 已提交
106
  while (size > 0) {
H
Hongze Cheng 已提交
107
    SSttSegReader *reader1;
H
Hongze Cheng 已提交
108

H
Hongze Cheng 已提交
109
    code = tsdbSttSegReaderOpen(reader[0], size - sizeof(SSttFooter), &reader1);
H
Hongze Cheng 已提交
110 111
    TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
112
    code = TARRAY2_APPEND(reader[0]->readerArray, reader1);
H
Hongze Cheng 已提交
113 114
    TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
115
    size = reader1->footer->prevFooter;
H
Hongze Cheng 已提交
116 117
  }

H
Hongze Cheng 已提交
118
  ASSERT(TARRAY2_SIZE(reader[0]->readerArray) == config->file->stt->nseg);
H
Hongze Cheng 已提交
119 120 121

_exit:
  if (code) {
H
Hongze Cheng 已提交
122
    TSDB_ERROR_LOG(TD_VID(config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
123
    tsdbSttFileReaderClose(reader);
H
Hongze Cheng 已提交
124
  }
H
Hongze Cheng 已提交
125 126 127
  return code;
}

H
Hongze Cheng 已提交
128
int32_t tsdbSttFileReaderClose(SSttFileReader **reader) {
H
Hongze Cheng 已提交
129
  if (reader[0]) {
H
Hongze Cheng 已提交
130 131 132
    for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->bufArr); ++i) {
      tFree(reader[0]->bufArr[i]);
    }
H
Hongze Cheng 已提交
133
    tsdbCloseFile(&reader[0]->fd);
H
Hongze Cheng 已提交
134
    TARRAY2_DESTROY(reader[0]->readerArray, tsdbSttSegReaderClose);
H
Hongze Cheng 已提交
135 136 137
    taosMemoryFree(reader[0]);
    reader[0] = NULL;
  }
H
Hongze Cheng 已提交
138
  return 0;
H
Hongze Cheng 已提交
139 140
}

H
Hongze Cheng 已提交
141
int32_t tsdbSttFileReaderGetSegReader(SSttFileReader *reader, const TSttSegReaderArray **readerArray) {
H
Hongze Cheng 已提交
142
  readerArray[0] = reader->readerArray;
H
Hongze Cheng 已提交
143
  return 0;
H
Hongze Cheng 已提交
144 145
}

H
Hongze Cheng 已提交
146
// SSttFSegReader
H
Hongze Cheng 已提交
147
int32_t tsdbSttFileReadStatisBlk(SSttSegReader *reader, const TStatisBlkArray **statisBlkArray) {
H
Hongze Cheng 已提交
148
  if (!reader->ctx->statisBlkLoaded) {
H
Hongze Cheng 已提交
149
    if (reader->footer->statisBlkPtr->size > 0) {
H
Hongze Cheng 已提交
150
      ASSERT(reader->footer->statisBlkPtr->size % sizeof(SStatisBlk) == 0);
H
Hongze Cheng 已提交
151

H
Hongze Cheng 已提交
152
      int32_t size = reader->footer->statisBlkPtr->size / sizeof(SStatisBlk);
H
Hongze Cheng 已提交
153
      void   *data = taosMemoryMalloc(reader->footer->statisBlkPtr->size);
H
Hongze Cheng 已提交
154 155
      if (!data) return TSDB_CODE_OUT_OF_MEMORY;

H
Hongze Cheng 已提交
156 157
      int32_t code = tsdbReadFile(reader->reader->fd, reader->footer->statisBlkPtr->offset, data,
                                  reader->footer->statisBlkPtr->size);
H
Hongze Cheng 已提交
158 159 160 161
      if (code) {
        taosMemoryFree(data);
        return code;
      }
H
Hongze Cheng 已提交
162

H
Hongze Cheng 已提交
163
      TARRAY2_INIT_EX(reader->statisBlkArray, size, size, data);
H
Hongze Cheng 已提交
164
    } else {
H
Hongze Cheng 已提交
165
      TARRAY2_INIT(reader->statisBlkArray);
H
Hongze Cheng 已提交
166 167
    }

H
Hongze Cheng 已提交
168
    reader->ctx->statisBlkLoaded = true;
H
Hongze Cheng 已提交
169 170
  }

H
Hongze Cheng 已提交
171
  statisBlkArray[0] = reader->statisBlkArray;
H
Hongze Cheng 已提交
172
  return 0;
H
Hongze Cheng 已提交
173 174
}

H
Hongze Cheng 已提交
175 176 177 178
int32_t tsdbSttFileReadTombBlk(SSttSegReader *reader, const TTombBlkArray **tombBlkArray) {
  if (!reader->ctx->tombBlkLoaded) {
    if (reader->footer->tombBlkPtr->size > 0) {
      ASSERT(reader->footer->tombBlkPtr->size % sizeof(STombBlk) == 0);
H
Hongze Cheng 已提交
179

H
Hongze Cheng 已提交
180 181
      int32_t size = reader->footer->tombBlkPtr->size / sizeof(STombBlk);
      void   *data = taosMemoryMalloc(reader->footer->tombBlkPtr->size);
H
Hongze Cheng 已提交
182 183
      if (!data) return TSDB_CODE_OUT_OF_MEMORY;

H
Hongze Cheng 已提交
184
      int32_t code =
H
Hongze Cheng 已提交
185
          tsdbReadFile(reader->reader->fd, reader->footer->tombBlkPtr->offset, data, reader->footer->tombBlkPtr->size);
H
Hongze Cheng 已提交
186 187 188 189
      if (code) {
        taosMemoryFree(data);
        return code;
      }
H
Hongze Cheng 已提交
190

H
Hongze Cheng 已提交
191
      TARRAY2_INIT_EX(reader->tombBlkArray, size, size, data);
H
Hongze Cheng 已提交
192
    } else {
H
Hongze Cheng 已提交
193
      TARRAY2_INIT(reader->tombBlkArray);
H
Hongze Cheng 已提交
194 195
    }

H
Hongze Cheng 已提交
196
    reader->ctx->tombBlkLoaded = true;
H
Hongze Cheng 已提交
197 198
  }

H
Hongze Cheng 已提交
199
  tombBlkArray[0] = reader->tombBlkArray;
H
Hongze Cheng 已提交
200 201 202
  return 0;
}

H
Hongze Cheng 已提交
203
int32_t tsdbSttFileReadSttBlk(SSttSegReader *reader, const TSttBlkArray **sttBlkArray) {
H
Hongze Cheng 已提交
204
  if (!reader->ctx->sttBlkLoaded) {
H
Hongze Cheng 已提交
205 206
    if (reader->footer->sttBlkPtr->size > 0) {
      ASSERT(reader->footer->sttBlkPtr->size % sizeof(SSttBlk) == 0);
H
Hongze Cheng 已提交
207

H
Hongze Cheng 已提交
208 209
      int32_t size = reader->footer->sttBlkPtr->size / sizeof(SSttBlk);
      void   *data = taosMemoryMalloc(reader->footer->sttBlkPtr->size);
H
Hongze Cheng 已提交
210 211
      if (!data) return TSDB_CODE_OUT_OF_MEMORY;

H
Hongze Cheng 已提交
212 213
      int32_t code =
          tsdbReadFile(reader->reader->fd, reader->footer->sttBlkPtr->offset, data, reader->footer->sttBlkPtr->size);
H
Hongze Cheng 已提交
214 215 216 217
      if (code) {
        taosMemoryFree(data);
        return code;
      }
H
Hongze Cheng 已提交
218

H
Hongze Cheng 已提交
219
      TARRAY2_INIT_EX(reader->sttBlkArray, size, size, data);
H
Hongze Cheng 已提交
220
    } else {
H
Hongze Cheng 已提交
221
      TARRAY2_INIT(reader->sttBlkArray);
H
Hongze Cheng 已提交
222 223
    }

H
Hongze Cheng 已提交
224
    reader->ctx->sttBlkLoaded = true;
H
Hongze Cheng 已提交
225 226
  }

H
Hongze Cheng 已提交
227
  sttBlkArray[0] = reader->sttBlkArray;
H
Hongze Cheng 已提交
228
  return 0;
H
Hongze Cheng 已提交
229 230
}

H
Hongze Cheng 已提交
231
int32_t tsdbSttFileReadBlockData(SSttSegReader *reader, const SSttBlk *sttBlk, SBlockData *bData) {
H
Hongze Cheng 已提交
232
  int32_t code = 0;
H
Hongze Cheng 已提交
233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249
  int32_t lino = 0;

  code = tRealloc(&reader->reader->config->bufArr[0], sttBlk->bInfo.szBlock);
  TSDB_CHECK_CODE(code, lino, _exit);

  code =
      tsdbReadFile(reader->reader->fd, sttBlk->bInfo.offset, reader->reader->config->bufArr[0], sttBlk->bInfo.szBlock);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tDecmprBlockData(reader->reader->config->bufArr[0], sttBlk->bInfo.szBlock, bData,
                          &reader->reader->config->bufArr[1]);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->reader->config->tsdb->pVnode), lino, code);
  }
H
Hongze Cheng 已提交
250 251 252
  return code;
}

H
Hongze Cheng 已提交
253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371
int32_t tsdbSttFileReadBlockDataByColumn(SSttSegReader *reader, const SSttBlk *sttBlk, SBlockData *bData,
                                         STSchema *pTSchema, int16_t cids[], int32_t ncid) {
  int32_t code = 0;
  int32_t lino = 0;

  TABLEID tbid = {.suid = sttBlk->suid, .uid = 0};
  code = tBlockDataInit(bData, &tbid, pTSchema, cids, ncid);
  TSDB_CHECK_CODE(code, lino, _exit);

  // uid + version + tskey
  code = tRealloc(&reader->reader->config->bufArr[0], sttBlk->bInfo.szKey);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbReadFile(reader->reader->fd, sttBlk->bInfo.offset, reader->reader->config->bufArr[0], sttBlk->bInfo.szKey);
  TSDB_CHECK_CODE(code, lino, _exit);

  // hdr
  SDiskDataHdr hdr[1];
  int32_t      size = 0;

  size += tGetDiskDataHdr(reader->reader->config->bufArr[0] + size, hdr);

  ASSERT(hdr->delimiter == TSDB_FILE_DLMT);

  bData->nRow = hdr->nRow;
  bData->uid = hdr->uid;

  // uid
  if (hdr->uid == 0) {
    ASSERT(hdr->szUid);
    code = tsdbDecmprData(reader->reader->config->bufArr[0] + size, hdr->szUid, TSDB_DATA_TYPE_BIGINT, hdr->cmprAlg,
                          (uint8_t **)&bData->aUid, sizeof(int64_t) * hdr->nRow, &reader->reader->config->bufArr[1]);
    TSDB_CHECK_CODE(code, lino, _exit);
  } else {
    ASSERT(hdr->szUid == 0);
  }
  size += hdr->szUid;

  // version
  code = tsdbDecmprData(reader->reader->config->bufArr[0] + size, hdr->szVer, TSDB_DATA_TYPE_BIGINT, hdr->cmprAlg,
                        (uint8_t **)&bData->aVersion, sizeof(int64_t) * hdr->nRow, &reader->reader->config->bufArr[1]);
  TSDB_CHECK_CODE(code, lino, _exit);
  size += hdr->szVer;

  // ts
  code = tsdbDecmprData(reader->reader->config->bufArr[0] + size, hdr->szKey, TSDB_DATA_TYPE_TIMESTAMP, hdr->cmprAlg,
                        (uint8_t **)&bData->aTSKEY, sizeof(TSKEY) * hdr->nRow, &reader->reader->config->bufArr[1]);
  TSDB_CHECK_CODE(code, lino, _exit);
  size += hdr->szKey;

  ASSERT(size == sttBlk->bInfo.szKey);

  // other columns
  if (bData->nColData > 0) {
    if (hdr->szBlkCol > 0) {
      code = tRealloc(&reader->reader->config->bufArr[0], hdr->szBlkCol);
      TSDB_CHECK_CODE(code, lino, _exit);

      code = tsdbReadFile(reader->reader->fd, sttBlk->bInfo.offset + sttBlk->bInfo.szKey,
                          reader->reader->config->bufArr[0], hdr->szBlkCol);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    SBlockCol  bc[1] = {{.cid = 0}};
    SBlockCol *blockCol = bc;

    size = 0;
    for (int32_t i = 0; i < bData->nColData; i++) {
      SColData *colData = tBlockDataGetColDataByIdx(bData, i);

      while (blockCol && blockCol->cid < colData->cid) {
        if (size < hdr->szBlkCol) {
          size += tGetBlockCol(reader->reader->config->bufArr[0] + size, blockCol);
        } else {
          ASSERT(size == hdr->szBlkCol);
          blockCol = NULL;
        }
      }

      if (blockCol == NULL || blockCol->cid > colData->cid) {
        for (int32_t iRow = 0; iRow < hdr->nRow; iRow++) {
          code = tColDataAppendValue(colData, &COL_VAL_NONE(colData->cid, colData->type));
          TSDB_CHECK_CODE(code, lino, _exit);
        }
      } else {
        ASSERT(blockCol->type == colData->type);
        ASSERT(blockCol->flag && blockCol->flag != HAS_NONE);

        if (blockCol->flag == HAS_NULL) {
          for (int32_t iRow = 0; iRow < hdr->nRow; iRow++) {
            code = tColDataAppendValue(colData, &COL_VAL_NULL(blockCol->cid, blockCol->type));
            TSDB_CHECK_CODE(code, lino, _exit);
          }
        } else {
          int32_t size1 = blockCol->szBitmap + blockCol->szOffset + blockCol->szValue;

          code = tRealloc(&reader->reader->config->bufArr[1], size1);
          TSDB_CHECK_CODE(code, lino, _exit);

          code = tsdbReadFile(reader->reader->fd,
                              sttBlk->bInfo.offset + sttBlk->bInfo.szKey + hdr->szBlkCol + blockCol->offset,
                              reader->reader->config->bufArr[1], size1);
          TSDB_CHECK_CODE(code, lino, _exit);

          code = tsdbDecmprColData(reader->reader->config->bufArr[1], blockCol, hdr->cmprAlg, hdr->nRow, colData,
                                   &reader->reader->config->bufArr[2]);
          TSDB_CHECK_CODE(code, lino, _exit);
        }
      }
    }
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->reader->config->tsdb->pVnode), lino, code);
  }
  return code;
}

H
Hongze Cheng 已提交
372
int32_t tsdbSttFileReadTombBlock(SSttSegReader *reader, const STombBlk *tombBlk, STombBlock *tombBlock) {
H
Hongze Cheng 已提交
373
  int32_t code = 0;
H
Hongze Cheng 已提交
374 375
  int32_t lino = 0;

H
Hongze Cheng 已提交
376
  code = tRealloc(&reader->reader->config->bufArr[0], tombBlk->dp->size);
H
Hongze Cheng 已提交
377 378
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
379
  code = tsdbReadFile(reader->reader->fd, tombBlk->dp->offset, reader->reader->config->bufArr[0], tombBlk->dp->size);
H
Hongze Cheng 已提交
380 381 382
  if (code) TSDB_CHECK_CODE(code, lino, _exit);

  int64_t size = 0;
H
Hongze Cheng 已提交
383 384
  tTombBlockClear(tombBlock);
  for (int32_t i = 0; i < ARRAY_SIZE(tombBlock->dataArr); ++i) {
H
Hongze Cheng 已提交
385
    code = tsdbDecmprData(reader->reader->config->bufArr[0] + size, tombBlk->size[i], TSDB_DATA_TYPE_BIGINT,
H
Hongze Cheng 已提交
386
                          tombBlk->cmprAlg, &reader->reader->config->bufArr[1], sizeof(int64_t) * tombBlk->numRec,
H
Hongze Cheng 已提交
387
                          &reader->reader->config->bufArr[2]);
H
Hongze Cheng 已提交
388 389
    TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
390
    code = TARRAY2_APPEND_BATCH(&tombBlock->dataArr[i], reader->reader->config->bufArr[1], tombBlk->numRec);
H
Hongze Cheng 已提交
391
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
392

H
Hongze Cheng 已提交
393
    size += tombBlk->size[i];
H
Hongze Cheng 已提交
394 395
  }

H
Hongze Cheng 已提交
396
  ASSERT(size == tombBlk->dp->size);
H
Hongze Cheng 已提交
397 398
_exit:
  if (code) {
H
Hongze Cheng 已提交
399
    TSDB_ERROR_LOG(TD_VID(reader->reader->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
400
  }
H
Hongze Cheng 已提交
401 402 403
  return code;
}

H
Hongze Cheng 已提交
404
int32_t tsdbSttFileReadStatisBlock(SSttSegReader *reader, const SStatisBlk *statisBlk, STbStatisBlock *statisBlock) {
H
Hongze Cheng 已提交
405
  int32_t code = 0;
H
Hongze Cheng 已提交
406 407
  int32_t lino = 0;

H
Hongze Cheng 已提交
408
  code = tRealloc(&reader->reader->config->bufArr[0], statisBlk->dp->size);
H
Hongze Cheng 已提交
409 410
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
411 412
  code =
      tsdbReadFile(reader->reader->fd, statisBlk->dp->offset, reader->reader->config->bufArr[0], statisBlk->dp->size);
H
Hongze Cheng 已提交
413
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
414 415

  int64_t size = 0;
H
Hongze Cheng 已提交
416 417
  tStatisBlockClear(statisBlock);
  for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->dataArr); ++i) {
H
Hongze Cheng 已提交
418
    code = tsdbDecmprData(reader->reader->config->bufArr[0] + size, statisBlk->size[i], TSDB_DATA_TYPE_BIGINT,
H
Hongze Cheng 已提交
419
                          statisBlk->cmprAlg, &reader->reader->config->bufArr[1], sizeof(int64_t) * statisBlk->numRec,
H
Hongze Cheng 已提交
420
                          &reader->reader->config->bufArr[2]);
H
Hongze Cheng 已提交
421 422
    TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
423
    code = TARRAY2_APPEND_BATCH(statisBlock->dataArr + i, reader->reader->config->bufArr[1], statisBlk->numRec);
H
Hongze Cheng 已提交
424
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
425

H
Hongze Cheng 已提交
426 427 428 429 430 431 432
    size += statisBlk->size[i];
  }

  ASSERT(size == statisBlk->dp->size);

_exit:
  if (code) {
H
Hongze Cheng 已提交
433
    TSDB_ERROR_LOG(TD_VID(reader->reader->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
434
  }
H
Hongze Cheng 已提交
435 436 437
  return code;
}

H
Hongze Cheng 已提交
438
// SSttFWriter ============================================================
H
Hongze Cheng 已提交
439
struct SSttFileWriter {
H
Hongze Cheng 已提交
440
  SSttFileWriterConfig config[1];
H
Hongze Cheng 已提交
441
  struct {
H
Hongze Cheng 已提交
442 443
    bool    opened;
    TABLEID tbid[1];
H
Hongze Cheng 已提交
444
  } ctx[1];
H
Hongze Cheng 已提交
445
  // file
H
Hongze Cheng 已提交
446 447
  STsdbFD *fd;
  STFile   file[1];
H
Hongze Cheng 已提交
448
  // data
H
Hongze Cheng 已提交
449 450
  SSttFooter      footer[1];
  TTombBlkArray   tombBlkArray[1];
H
Hongze Cheng 已提交
451 452
  TSttBlkArray    sttBlkArray[1];
  TStatisBlkArray statisBlkArray[1];
H
Hongze Cheng 已提交
453 454 455
  STombBlock      tombBlock[1];
  STbStatisBlock  staticBlock[1];
  SBlockData      blockData[1];
H
Hongze Cheng 已提交
456
  // helper data
H
Hongze Cheng 已提交
457 458
  SSkmInfo skmTb[1];
  SSkmInfo skmRow[1];
H
Hongze Cheng 已提交
459
  uint8_t *bufArr[5];
H
Hongze Cheng 已提交
460
};
H
Hongze Cheng 已提交
461

H
Hongze Cheng 已提交
462
static int32_t tsdbSttFileDoWriteBlockData(SSttFileWriter *writer) {
H
Hongze Cheng 已提交
463
  if (writer->blockData->nRow == 0) return 0;
H
Hongze Cheng 已提交
464

H
Hongze Cheng 已提交
465
  int32_t code = 0;
H
Hongze Cheng 已提交
466
  int32_t lino = 0;
H
Hongze Cheng 已提交
467 468

  SSttBlk sttBlk[1] = {{
H
Hongze Cheng 已提交
469 470 471 472 473 474 475 476
      .suid = writer->blockData->suid,
      .minUid = writer->blockData->uid ? writer->blockData->uid : writer->blockData->aUid[0],
      .maxUid = writer->blockData->uid ? writer->blockData->uid : writer->blockData->aUid[writer->blockData->nRow - 1],
      .minKey = writer->blockData->aTSKEY[0],
      .maxKey = writer->blockData->aTSKEY[0],
      .minVer = writer->blockData->aVersion[0],
      .maxVer = writer->blockData->aVersion[0],
      .nRow = writer->blockData->nRow,
H
Hongze Cheng 已提交
477 478
  }};

H
Hongze Cheng 已提交
479 480 481 482 483
  for (int32_t iRow = 1; iRow < writer->blockData->nRow; iRow++) {
    if (sttBlk->minKey > writer->blockData->aTSKEY[iRow]) sttBlk->minKey = writer->blockData->aTSKEY[iRow];
    if (sttBlk->maxKey < writer->blockData->aTSKEY[iRow]) sttBlk->maxKey = writer->blockData->aTSKEY[iRow];
    if (sttBlk->minVer > writer->blockData->aVersion[iRow]) sttBlk->minVer = writer->blockData->aVersion[iRow];
    if (sttBlk->maxVer < writer->blockData->aVersion[iRow]) sttBlk->maxVer = writer->blockData->aVersion[iRow];
H
Hongze Cheng 已提交
484 485
  }

H
Hongze Cheng 已提交
486
  int32_t sizeArr[5] = {0};
H
Hongze Cheng 已提交
487
  code = tCmprBlockData(writer->blockData, writer->config->cmprAlg, NULL, NULL, writer->config->bufArr, sizeArr);
H
Hongze Cheng 已提交
488
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
489

H
Hongze Cheng 已提交
490
  sttBlk->bInfo.offset = writer->file->size;
H
Hongze Cheng 已提交
491 492
  sttBlk->bInfo.szKey = sizeArr[2] + sizeArr[3];
  sttBlk->bInfo.szBlock = sizeArr[0] + sizeArr[1] + sttBlk->bInfo.szKey;
H
Hongze Cheng 已提交
493

H
Hongze Cheng 已提交
494
  for (int32_t i = 3; i >= 0; i--) {
H
Hongze Cheng 已提交
495 496
    if (sizeArr[i]) {
      code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->bufArr[i], sizeArr[i]);
H
Hongze Cheng 已提交
497
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
498
      writer->file->size += sizeArr[i];
H
Hongze Cheng 已提交
499 500 501
    }
  }

H
Hongze Cheng 已提交
502
  code = TARRAY2_APPEND_PTR(writer->sttBlkArray, sttBlk);
H
Hongze Cheng 已提交
503
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
504

H
Hongze Cheng 已提交
505
  tBlockDataClear(writer->blockData);
H
Hongze Cheng 已提交
506

H
Hongze Cheng 已提交
507 508
_exit:
  if (code) {
H
Hongze Cheng 已提交
509
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
510
  }
H
Hongze Cheng 已提交
511 512 513
  return code;
}

H
Hongze Cheng 已提交
514
static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) {
H
Hongze Cheng 已提交
515
  if (STATIS_BLOCK_SIZE(writer->staticBlock) == 0) return 0;
H
Hongze Cheng 已提交
516

H
Hongze Cheng 已提交
517
  int32_t code = 0;
H
Hongze Cheng 已提交
518
  int32_t lino = 0;
H
Hongze Cheng 已提交
519

H
Hongze Cheng 已提交
520
  SStatisBlk statisBlk[1] = {{
H
Hongze Cheng 已提交
521 522 523 524 525
      .dp[0] =
          {
              .offset = writer->file->size,
              .size = 0,
          },
H
Hongze Cheng 已提交
526 527
      .minTbid =
          {
H
Hongze Cheng 已提交
528 529
              .suid = TARRAY2_FIRST(writer->staticBlock->suid),
              .uid = TARRAY2_FIRST(writer->staticBlock->uid),
H
Hongze Cheng 已提交
530 531 532
          },
      .maxTbid =
          {
H
Hongze Cheng 已提交
533 534
              .suid = TARRAY2_LAST(writer->staticBlock->suid),
              .uid = TARRAY2_LAST(writer->staticBlock->uid),
H
Hongze Cheng 已提交
535
          },
H
Hongze Cheng 已提交
536 537 538
      .minVer = TARRAY2_FIRST(writer->staticBlock->minVer),
      .maxVer = TARRAY2_FIRST(writer->staticBlock->maxVer),
      .numRec = STATIS_BLOCK_SIZE(writer->staticBlock),
H
Hongze Cheng 已提交
539
      .cmprAlg = writer->config->cmprAlg,
H
Hongze Cheng 已提交
540 541
  }};

H
Hongze Cheng 已提交
542 543 544 545 546 547 548
  for (int32_t i = 1; i < STATIS_BLOCK_SIZE(writer->staticBlock); i++) {
    if (statisBlk->minVer > TARRAY2_GET(writer->staticBlock->minVer, i)) {
      statisBlk->minVer = TARRAY2_GET(writer->staticBlock->minVer, i);
    }
    if (statisBlk->maxVer < TARRAY2_GET(writer->staticBlock->maxVer, i)) {
      statisBlk->maxVer = TARRAY2_GET(writer->staticBlock->maxVer, i);
    }
H
Hongze Cheng 已提交
549
  }
H
Hongze Cheng 已提交
550

H
Hongze Cheng 已提交
551
  for (int32_t i = 0; i < STATIS_RECORD_NUM_ELEM; i++) {
H
Hongze Cheng 已提交
552 553 554
    code = tsdbCmprData((uint8_t *)TARRAY2_DATA(writer->staticBlock->dataArr + i),
                        TARRAY2_DATA_LEN(&writer->staticBlock->dataArr[i]), TSDB_DATA_TYPE_BIGINT, statisBlk->cmprAlg,
                        &writer->config->bufArr[0], 0, &statisBlk->size[i], &writer->config->bufArr[1]);
H
Hongze Cheng 已提交
555
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
556

H
Hongze Cheng 已提交
557
    code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->bufArr[0], statisBlk->size[i]);
H
Hongze Cheng 已提交
558 559
    TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
560 561
    statisBlk->dp->size += statisBlk->size[i];
    writer->file->size += statisBlk->size[i];
H
Hongze Cheng 已提交
562
  }
H
Hongze Cheng 已提交
563

H
Hongze Cheng 已提交
564
  code = TARRAY2_APPEND_PTR(writer->statisBlkArray, statisBlk);
H
Hongze Cheng 已提交
565
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
566

H
Hongze Cheng 已提交
567
  tStatisBlockClear(writer->staticBlock);
H
Hongze Cheng 已提交
568

H
Hongze Cheng 已提交
569 570
_exit:
  if (code) {
H
Hongze Cheng 已提交
571
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
572
  }
H
Hongze Cheng 已提交
573 574 575
  return code;
}

H
Hongze Cheng 已提交
576
static int32_t tsdbSttFileDoWriteTombBlock(SSttFileWriter *writer) {
H
Hongze Cheng 已提交
577
  if (TOMB_BLOCK_SIZE(writer->tombBlock) == 0) return 0;
H
Hongze Cheng 已提交
578

H
Hongze Cheng 已提交
579
  int32_t code = 0;
H
Hongze Cheng 已提交
580
  int32_t lino = 0;
H
Hongze Cheng 已提交
581

H
Hongze Cheng 已提交
582
  STombBlk tombBlk[1] = {{
H
Hongze Cheng 已提交
583 584 585 586 587
      .dp[0] =
          {
              .offset = writer->file->size,
              .size = 0,
          },
H
Hongze Cheng 已提交
588
      .minTbid =
H
Hongze Cheng 已提交
589
          {
H
Hongze Cheng 已提交
590 591
              .suid = TARRAY2_FIRST(writer->tombBlock->suid),
              .uid = TARRAY2_FIRST(writer->tombBlock->uid),
H
Hongze Cheng 已提交
592
          },
H
Hongze Cheng 已提交
593
      .maxTbid =
H
Hongze Cheng 已提交
594
          {
H
Hongze Cheng 已提交
595 596
              .suid = TARRAY2_LAST(writer->tombBlock->suid),
              .uid = TARRAY2_LAST(writer->tombBlock->uid),
H
Hongze Cheng 已提交
597
          },
H
Hongze Cheng 已提交
598 599 600
      .minVer = TARRAY2_FIRST(writer->tombBlock->version),
      .maxVer = TARRAY2_FIRST(writer->tombBlock->version),
      .numRec = TOMB_BLOCK_SIZE(writer->tombBlock),
H
Hongze Cheng 已提交
601
      .cmprAlg = writer->config->cmprAlg,
H
Hongze Cheng 已提交
602
  }};
H
Hongze Cheng 已提交
603

H
Hongze Cheng 已提交
604 605 606 607 608 609 610
  for (int32_t i = 1; i < TOMB_BLOCK_SIZE(writer->tombBlock); i++) {
    if (tombBlk->minVer > TARRAY2_GET(writer->tombBlock->version, i)) {
      tombBlk->minVer = TARRAY2_GET(writer->tombBlock->version, i);
    }
    if (tombBlk->maxVer < TARRAY2_GET(writer->tombBlock->version, i)) {
      tombBlk->maxVer = TARRAY2_GET(writer->tombBlock->version, i);
    }
H
Hongze Cheng 已提交
611 612
  }

H
Hongze Cheng 已提交
613 614 615 616
  for (int32_t i = 0; i < ARRAY_SIZE(writer->tombBlock->dataArr); i++) {
    code = tsdbCmprData((uint8_t *)TARRAY2_DATA(&writer->tombBlock->dataArr[i]),
                        TARRAY2_DATA_LEN(&writer->tombBlock->dataArr[i]), TSDB_DATA_TYPE_BIGINT, tombBlk->cmprAlg,
                        &writer->config->bufArr[0], 0, &tombBlk->size[i], &writer->config->bufArr[1]);
H
Hongze Cheng 已提交
617
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
618

H
Hongze Cheng 已提交
619
    code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->bufArr[0], tombBlk->size[i]);
H
Hongze Cheng 已提交
620 621
    TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
622 623
    tombBlk->dp->size += tombBlk->size[i];
    writer->file->size += tombBlk->size[i];
H
Hongze Cheng 已提交
624 625
  }

H
Hongze Cheng 已提交
626
  code = TARRAY2_APPEND_PTR(writer->tombBlkArray, tombBlk);
H
Hongze Cheng 已提交
627
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
628

H
Hongze Cheng 已提交
629
  tTombBlockClear(writer->tombBlock);
H
Hongze Cheng 已提交
630

H
Hongze Cheng 已提交
631 632
_exit:
  if (code) {
H
Hongze Cheng 已提交
633
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
634
  }
H
Hongze Cheng 已提交
635 636 637
  return code;
}

H
Hongze Cheng 已提交
638
static int32_t tsdbSttFileDoWriteSttBlk(SSttFileWriter *writer) {
H
Hongze Cheng 已提交
639
  int32_t code = 0;
H
Hongze Cheng 已提交
640 641
  int32_t lino;

H
Hongze Cheng 已提交
642
  writer->footer->sttBlkPtr->size = TARRAY2_DATA_LEN(writer->sttBlkArray);
H
Hongze Cheng 已提交
643
  if (writer->footer->sttBlkPtr->size) {
H
Hongze Cheng 已提交
644
    writer->footer->sttBlkPtr->offset = writer->file->size;
H
Hongze Cheng 已提交
645
    code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)TARRAY2_DATA(writer->sttBlkArray),
H
Hongze Cheng 已提交
646
                         writer->footer->sttBlkPtr->size);
H
Hongze Cheng 已提交
647
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
648
    writer->file->size += writer->footer->sttBlkPtr->size;
H
Hongze Cheng 已提交
649 650 651 652
  }

_exit:
  if (code) {
H
Hongze Cheng 已提交
653
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
654
  }
H
Hongze Cheng 已提交
655 656 657
  return code;
}

H
Hongze Cheng 已提交
658
static int32_t tsdbSttFileDoWriteStatisBlk(SSttFileWriter *writer) {
H
Hongze Cheng 已提交
659
  int32_t code = 0;
H
Hongze Cheng 已提交
660
  int32_t lino;
H
Hongze Cheng 已提交
661

H
Hongze Cheng 已提交
662
  writer->footer->statisBlkPtr->size = TARRAY2_DATA_LEN(writer->statisBlkArray);
H
Hongze Cheng 已提交
663
  if (writer->footer->statisBlkPtr->size) {
H
Hongze Cheng 已提交
664
    writer->footer->statisBlkPtr->offset = writer->file->size;
H
Hongze Cheng 已提交
665
    code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)TARRAY2_DATA(writer->statisBlkArray),
H
Hongze Cheng 已提交
666
                         writer->footer->statisBlkPtr->size);
H
Hongze Cheng 已提交
667
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
668
    writer->file->size += writer->footer->statisBlkPtr->size;
H
Hongze Cheng 已提交
669 670 671 672
  }

_exit:
  if (code) {
H
Hongze Cheng 已提交
673
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
674
  }
H
Hongze Cheng 已提交
675 676 677
  return code;
}

H
Hongze Cheng 已提交
678
static int32_t tsdbSttFileDoWriteTombBlk(SSttFileWriter *writer) {
H
Hongze Cheng 已提交
679
  int32_t code = 0;
H
Hongze Cheng 已提交
680
  int32_t lino = 0;
H
Hongze Cheng 已提交
681

H
Hongze Cheng 已提交
682 683
  writer->footer->tombBlkPtr->size = TARRAY2_DATA_LEN(writer->tombBlkArray);
  if (writer->footer->tombBlkPtr->size) {
H
Hongze Cheng 已提交
684
    writer->footer->tombBlkPtr->offset = writer->file->size;
H
Hongze Cheng 已提交
685 686
    code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)TARRAY2_DATA(writer->tombBlkArray),
                         writer->footer->tombBlkPtr->size);
H
Hongze Cheng 已提交
687
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
688
    writer->file->size += writer->footer->tombBlkPtr->size;
H
Hongze Cheng 已提交
689 690 691 692
  }

_exit:
  if (code) {
H
Hongze Cheng 已提交
693
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
694 695 696 697
  }
  return code;
}

H
Hongze Cheng 已提交
698
static int32_t tsdbSttFileDoWriteFooter(SSttFileWriter *writer) {
H
Hongze Cheng 已提交
699 700
  writer->footer->prevFooter = writer->config->file.size;
  int32_t code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)writer->footer, sizeof(writer->footer));
H
Hongze Cheng 已提交
701
  if (code) return code;
H
Hongze Cheng 已提交
702
  writer->file->size += sizeof(writer->footer);
H
Hongze Cheng 已提交
703
  return 0;
H
Hongze Cheng 已提交
704 705
}

H
Hongze Cheng 已提交
706
static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) {
H
Hongze Cheng 已提交
707
  int32_t code = 0;
H
Hongze Cheng 已提交
708
  int32_t lino = 0;
H
Hongze Cheng 已提交
709

H
Hongze Cheng 已提交
710
  // set
H
Hongze Cheng 已提交
711 712 713 714
  writer->file[0] = writer->config->file;
  writer->file->stt->nseg++;
  if (!writer->config->skmTb) writer->config->skmTb = writer->skmTb;
  if (!writer->config->skmRow) writer->config->skmRow = writer->skmRow;
H
Hongze Cheng 已提交
715
  if (!writer->config->bufArr) writer->config->bufArr = writer->bufArr;
H
Hongze Cheng 已提交
716

H
Hongze Cheng 已提交
717 718 719
  // open file
  int32_t flag;
  char    fname[TSDB_FILENAME_LEN];
H
Hongze Cheng 已提交
720

H
Hongze Cheng 已提交
721
  if (writer->file->size > 0) {
H
Hongze Cheng 已提交
722 723 724
    flag = TD_FILE_READ | TD_FILE_WRITE;
  } else {
    flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
H
Hongze Cheng 已提交
725
  }
H
Hongze Cheng 已提交
726

H
Hongze Cheng 已提交
727
  tsdbTFileName(writer->config->tsdb, writer->file, fname);
H
Hongze Cheng 已提交
728
  code = tsdbOpenFile(fname, writer->config->szPage, flag, &writer->fd);
H
Hongze Cheng 已提交
729 730
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
731
  if (writer->file->size == 0) {
H
Hongze Cheng 已提交
732 733 734
    uint8_t hdr[TSDB_FHDR_SIZE] = {0};
    code = tsdbWriteFile(writer->fd, 0, hdr, sizeof(hdr));
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
735
    writer->file->size += sizeof(hdr);
H
Hongze Cheng 已提交
736
  }
H
Hongze Cheng 已提交
737

H
Hongze Cheng 已提交
738 739
  writer->ctx->opened = true;

H
Hongze Cheng 已提交
740
_exit:
H
Hongze Cheng 已提交
741
  if (code) {
H
Hongze Cheng 已提交
742
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
743
  }
H
Hongze Cheng 已提交
744
  return code;
H
Hongze Cheng 已提交
745 746
}

H
Hongze Cheng 已提交
747
static void tsdbSttFWriterDoClose(SSttFileWriter *writer) {
H
Hongze Cheng 已提交
748
  ASSERT(writer->fd == NULL);
H
Hongze Cheng 已提交
749

H
Hongze Cheng 已提交
750
  for (int32_t i = 0; i < ARRAY_SIZE(writer->bufArr); ++i) {
H
Hongze Cheng 已提交
751
    tFree(writer->bufArr[i]);
H
Hongze Cheng 已提交
752 753 754
  }
  tDestroyTSchema(writer->skmRow->pTSchema);
  tDestroyTSchema(writer->skmTb->pTSchema);
H
Hongze Cheng 已提交
755 756 757
  tTombBlockDestroy(writer->tombBlock);
  tStatisBlockDestroy(writer->staticBlock);
  tBlockDataDestroy(writer->blockData);
H
Hongze Cheng 已提交
758 759 760
  TARRAY2_DESTROY(writer->tombBlkArray, NULL);
  TARRAY2_DESTROY(writer->statisBlkArray, NULL);
  TARRAY2_DESTROY(writer->sttBlkArray, NULL);
H
Hongze Cheng 已提交
761
}
H
Hongze Cheng 已提交
762

H
Hongze Cheng 已提交
763 764 765 766
static int32_t tsdbSttFileDoUpdateHeader(SSttFileWriter *writer) {
  // TODO
  return 0;
}
H
Hongze Cheng 已提交
767

H
Hongze Cheng 已提交
768
static int32_t tsdbSttFWriterCloseCommit(SSttFileWriter *writer, TFileOpArray *opArray) {
H
Hongze Cheng 已提交
769 770
  int32_t lino;
  int32_t code;
H
Hongze Cheng 已提交
771

H
Hongze Cheng 已提交
772
  code = tsdbSttFileDoWriteBlockData(writer);
H
Hongze Cheng 已提交
773
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
774

H
Hongze Cheng 已提交
775
  code = tsdbSttFileDoWriteStatisBlock(writer);
H
Hongze Cheng 已提交
776 777
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
778
  code = tsdbSttFileDoWriteTombBlock(writer);
H
Hongze Cheng 已提交
779
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
780

H
Hongze Cheng 已提交
781 782
  code = tsdbSttFileDoWriteSttBlk(writer);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
783

H
Hongze Cheng 已提交
784 785
  code = tsdbSttFileDoWriteStatisBlk(writer);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
786

H
Hongze Cheng 已提交
787
  code = tsdbSttFileDoWriteTombBlk(writer);
H
Hongze Cheng 已提交
788
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
789

H
Hongze Cheng 已提交
790 791
  code = tsdbSttFileDoWriteFooter(writer);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
792

H
Hongze Cheng 已提交
793
  code = tsdbSttFileDoUpdateHeader(writer);
H
Hongze Cheng 已提交
794 795
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
796
  code = tsdbFsyncFile(writer->fd);
H
Hongze Cheng 已提交
797 798
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
799
  tsdbCloseFile(&writer->fd);
H
Hongze Cheng 已提交
800

H
Hongze Cheng 已提交
801
  ASSERT(writer->config->file.size < writer->file->size);
H
Hongze Cheng 已提交
802 803 804 805 806 807 808 809 810 811 812 813 814 815 816
  STFileOp op;
  if (writer->config->file.size == 0) {
    op = (STFileOp){
        .optype = TSDB_FOP_CREATE,
        .fid = writer->config->file.fid,
        .nf = writer->file[0],
    };
  } else {
    op = (STFileOp){
        .optype = TSDB_FOP_MODIFY,
        .fid = writer->config->file.fid,
        .of = writer->config->file,
        .nf = writer->file[0],
    };
  }
H
Hongze Cheng 已提交
817 818 819

  code = TARRAY2_APPEND(opArray, op);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
820

H
Hongze Cheng 已提交
821 822
_exit:
  if (code) {
H
Hongze Cheng 已提交
823
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
824
  }
H
Hongze Cheng 已提交
825
  return code;
H
Hongze Cheng 已提交
826
}
H
Hongze Cheng 已提交
827

H
Hongze Cheng 已提交
828
static int32_t tsdbSttFWriterCloseAbort(SSttFileWriter *writer) {
H
Hongze Cheng 已提交
829
  if (writer->config->file.size) {  // truncate the file to the original size
H
Hongze Cheng 已提交
830 831
    ASSERT(writer->config->file.size <= writer->file->size);
    if (writer->config->file.size < writer->file->size) {
H
Hongze Cheng 已提交
832
      taosFtruncateFile(writer->fd->pFD, writer->config->file.size);
H
Hongze Cheng 已提交
833 834 835
      tsdbCloseFile(&writer->fd);
    }
  } else {  // remove the file
H
Hongze Cheng 已提交
836 837
    char fname[TSDB_FILENAME_LEN];
    tsdbTFileName(writer->config->tsdb, &writer->config->file, fname);
H
Hongze Cheng 已提交
838 839 840
    tsdbCloseFile(&writer->fd);
    taosRemoveFile(fname);
  }
H
Hongze Cheng 已提交
841 842
  return 0;
}
H
Hongze Cheng 已提交
843

H
Hongze Cheng 已提交
844
int32_t tsdbSttFileWriterOpen(const SSttFileWriterConfig *config, SSttFileWriter **writer) {
H
Hongze Cheng 已提交
845
  writer[0] = taosMemoryCalloc(1, sizeof(*writer[0]));
H
Hongze Cheng 已提交
846 847
  if (writer[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;

H
Hongze Cheng 已提交
848
  writer[0]->config[0] = config[0];
H
Hongze Cheng 已提交
849
  writer[0]->ctx->opened = false;
H
Hongze Cheng 已提交
850 851 852
  return 0;
}

H
Hongze Cheng 已提交
853
int32_t tsdbSttFileWriterClose(SSttFileWriter **writer, int8_t abort, TFileOpArray *opArray) {
H
Hongze Cheng 已提交
854
  int32_t code = 0;
H
Hongze Cheng 已提交
855
  int32_t lino = 0;
H
Hongze Cheng 已提交
856

H
Hongze Cheng 已提交
857
  if (writer[0]->ctx->opened) {
H
Hongze Cheng 已提交
858 859
    if (abort) {
      code = tsdbSttFWriterCloseAbort(writer[0]);
H
Hongze Cheng 已提交
860
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
861
    } else {
H
Hongze Cheng 已提交
862
      code = tsdbSttFWriterCloseCommit(writer[0], opArray);
H
Hongze Cheng 已提交
863
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
864
    }
H
Hongze Cheng 已提交
865
    tsdbSttFWriterDoClose(writer[0]);
H
Hongze Cheng 已提交
866
  }
H
Hongze Cheng 已提交
867 868
  taosMemoryFree(writer[0]);
  writer[0] = NULL;
H
Hongze Cheng 已提交
869 870 871

_exit:
  if (code) {
H
Hongze Cheng 已提交
872
    TSDB_ERROR_LOG(TD_VID(writer[0]->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
873
  }
H
Hongze Cheng 已提交
874
  return code;
H
Hongze Cheng 已提交
875 876
}

H
Hongze Cheng 已提交
877
int32_t tsdbSttFileWriteRow(SSttFileWriter *writer, SRowInfo *row) {
H
Hongze Cheng 已提交
878
  int32_t code = 0;
H
Hongze Cheng 已提交
879
  int32_t lino = 0;
H
Hongze Cheng 已提交
880

H
Hongze Cheng 已提交
881
  if (!writer->ctx->opened) {
H
Hongze Cheng 已提交
882 883 884
    code = tsdbSttFWriterDoOpen(writer);
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
885

H
Hongze Cheng 已提交
886
  if (!TABLE_SAME_SCHEMA(row->suid, row->uid, writer->ctx->tbid->suid, writer->ctx->tbid->uid)) {
H
Hongze Cheng 已提交
887
    code = tsdbSttFileDoWriteBlockData(writer);
H
Hongze Cheng 已提交
888
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
889

H
Hongze Cheng 已提交
890 891 892 893
    code = tsdbUpdateSkmTb(writer->config->tsdb, (TABLEID *)row, writer->config->skmTb);
    TSDB_CHECK_CODE(code, lino, _exit);

    TABLEID id = {.suid = row->suid, .uid = row->suid ? 0 : row->uid};
H
Hongze Cheng 已提交
894
    code = tBlockDataInit(writer->blockData, &id, writer->config->skmTb->pTSchema, NULL, 0);
H
Hongze Cheng 已提交
895 896 897
    TSDB_CHECK_CODE(code, lino, _exit);
  }

H
Hongze Cheng 已提交
898 899 900 901 902 903 904 905 906
  TSDBKEY key[1];
  if (row->row.type == TSDBROW_ROW_FMT) {
    key->ts = row->row.pTSRow->ts;
    key->version = row->row.version;
  } else {
    key->ts = row->row.pBlockData->aTSKEY[row->row.iRow];
    key->version = row->row.pBlockData->aVersion[row->row.iRow];
  }

H
Hongze Cheng 已提交
907 908 909 910
  if (writer->ctx->tbid->uid != row->uid) {
    writer->ctx->tbid->suid = row->suid;
    writer->ctx->tbid->uid = row->uid;

H
Hongze Cheng 已提交
911
    if (STATIS_BLOCK_SIZE(writer->staticBlock) >= writer->config->maxRow) {
H
Hongze Cheng 已提交
912
      code = tsdbSttFileDoWriteStatisBlock(writer);
H
Hongze Cheng 已提交
913
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
914 915
    }

H
Hongze Cheng 已提交
916
    STbStatisRecord record = {
H
Hongze Cheng 已提交
917 918
        .suid = row->suid,
        .uid = row->uid,
H
Hongze Cheng 已提交
919 920 921 922
        .firstKey = key->ts,
        .lastKey = key->ts,
        .minVer = key->version,
        .maxVer = key->version,
H
Hongze Cheng 已提交
923
        .count = 1,
H
Hongze Cheng 已提交
924
    };
H
Hongze Cheng 已提交
925
    code = tStatisBlockPut(writer->staticBlock, &record);
H
Hongze Cheng 已提交
926
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
927
  } else {
H
Hongze Cheng 已提交
928
    ASSERT(key->ts >= TARRAY2_LAST(writer->staticBlock->lastKey));
H
Hongze Cheng 已提交
929

H
Hongze Cheng 已提交
930 931 932 933 934 935 936 937 938
    if (TARRAY2_LAST(writer->staticBlock->minVer) > key->version) {
      TARRAY2_LAST(writer->staticBlock->minVer) = key->version;
    }
    if (TARRAY2_LAST(writer->staticBlock->maxVer) < key->version) {
      TARRAY2_LAST(writer->staticBlock->maxVer) = key->version;
    }
    if (key->ts > TARRAY2_LAST(writer->staticBlock->lastKey)) {
      TARRAY2_LAST(writer->staticBlock->count)++;
      TARRAY2_LAST(writer->staticBlock->lastKey) = key->ts;
H
Hongze Cheng 已提交
939
    }
H
Hongze Cheng 已提交
940 941
  }

H
Hongze Cheng 已提交
942
  if (row->row.type == TSDBROW_ROW_FMT) {
H
Hongze Cheng 已提交
943 944
    code = tsdbUpdateSkmRow(writer->config->tsdb, writer->ctx->tbid,  //
                            TSDBROW_SVERSION(&row->row), writer->config->skmRow);
H
Hongze Cheng 已提交
945
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
946 947
  }

H
Hongze Cheng 已提交
948
  // row to col conversion
H
Hongze Cheng 已提交
949 950 951 952 953 954
  if (key->version <= writer->config->compactVersion                               //
      && writer->blockData->nRow > 0                                               //
      && writer->blockData->aTSKEY[writer->blockData->nRow - 1] == key->ts         //
      && (writer->blockData->uid                                                   //
              ? writer->blockData->uid                                             //
              : writer->blockData->aUid[writer->blockData->nRow - 1]) == row->uid  //
H
Hongze Cheng 已提交
955
  ) {
H
Hongze Cheng 已提交
956
    code = tBlockDataUpdateRow(writer->blockData, &row->row, writer->config->skmRow->pTSchema);
H
Hongze Cheng 已提交
957
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
958
  } else {
H
Hongze Cheng 已提交
959
    if (writer->blockData->nRow >= writer->config->maxRow) {
H
Hongze Cheng 已提交
960
      code = tsdbSttFileDoWriteBlockData(writer);
H
Hongze Cheng 已提交
961 962
      TSDB_CHECK_CODE(code, lino, _exit);
    }
H
Hongze Cheng 已提交
963

H
Hongze Cheng 已提交
964
    code = tBlockDataAppendRow(writer->blockData, &row->row, writer->config->skmRow->pTSchema, row->uid);
H
Hongze Cheng 已提交
965
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
966 967 968 969
  }

_exit:
  if (code) {
H
Hongze Cheng 已提交
970
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
971
  }
H
Hongze Cheng 已提交
972
  return code;
H
Hongze Cheng 已提交
973 974
}

H
Hongze Cheng 已提交
975
int32_t tsdbSttFileWriteBlockData(SSttFileWriter *writer, SBlockData *bdata) {
H
Hongze Cheng 已提交
976 977 978
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
979 980
  SRowInfo row[1];
  row->suid = bdata->suid;
H
Hongze Cheng 已提交
981
  for (int32_t i = 0; i < bdata->nRow; i++) {
H
Hongze Cheng 已提交
982 983
    row->uid = bdata->uid ? bdata->uid : bdata->aUid[i];
    row->row = tsdbRowFromBlockData(bdata, i);
H
Hongze Cheng 已提交
984

H
Hongze Cheng 已提交
985
    code = tsdbSttFileWriteRow(writer, row);
H
Hongze Cheng 已提交
986 987 988 989 990
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
H
Hongze Cheng 已提交
991
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
992
  }
H
Hongze Cheng 已提交
993
  return code;
H
Hongze Cheng 已提交
994 995
}

H
Hongze Cheng 已提交
996
int32_t tsdbSttFileWriteTombRecord(SSttFileWriter *writer, const STombRecord *record) {
H
Hongze Cheng 已提交
997
  int32_t code;
H
Hongze Cheng 已提交
998 999
  int32_t lino;

H
Hongze Cheng 已提交
1000
  if (!writer->ctx->opened) {
H
Hongze Cheng 已提交
1001 1002
    code = tsdbSttFWriterDoOpen(writer);
    return code;
H
Hongze Cheng 已提交
1003
  } else {
H
Hongze Cheng 已提交
1004
    if (writer->blockData->nRow > 0) {
H
Hongze Cheng 已提交
1005
      code = tsdbSttFileDoWriteBlockData(writer);
H
Hongze Cheng 已提交
1006 1007
      TSDB_CHECK_CODE(code, lino, _exit);
    }
H
Hongze Cheng 已提交
1008

H
Hongze Cheng 已提交
1009
    if (STATIS_BLOCK_SIZE(writer->staticBlock) > 0) {
H
Hongze Cheng 已提交
1010 1011 1012
      code = tsdbSttFileDoWriteStatisBlock(writer);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
H
Hongze Cheng 已提交
1013
  }
H
Hongze Cheng 已提交
1014

H
Hongze Cheng 已提交
1015
  code = tTombBlockPut(writer->tombBlock, record);
H
Hongze Cheng 已提交
1016
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1017

H
Hongze Cheng 已提交
1018
  if (TOMB_BLOCK_SIZE(writer->tombBlock) >= writer->config->maxRow) {
H
Hongze Cheng 已提交
1019
    code = tsdbSttFileDoWriteTombBlock(writer);
H
Hongze Cheng 已提交
1020
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1021 1022 1023 1024
  }

_exit:
  if (code) {
H
Hongze Cheng 已提交
1025
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
1026 1027
  }
  return code;
H
Hongze Cheng 已提交
1028 1029 1030
}

bool tsdbSttFileWriterIsOpened(SSttFileWriter *writer) { return writer->ctx->opened; }