tsdbSttFileRW.c 32.0 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "inc/tsdbSttFileRW.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18 19
typedef struct {
  int64_t   prevFooter;
H
Hongze Cheng 已提交
20 21
  SFDataPtr sttBlkPtr[1];
  SFDataPtr statisBlkPtr[1];
H
Hongze Cheng 已提交
22
  SFDataPtr tombBlkPtr[1];
H
Hongze Cheng 已提交
23 24
  SFDataPtr rsrvd[2];
} SSttFooter;
H
Hongze Cheng 已提交
25

H
Hongze Cheng 已提交
26
// SSttFReader ============================================================
H
Hongze Cheng 已提交
27
struct SSttFileReader {
H
Hongze Cheng 已提交
28
  SSttFileReaderConfig config[1];
H
Hongze Cheng 已提交
29
  TSttSegReaderArray   readerArray[1];
H
Hongze Cheng 已提交
30
  STsdbFD             *fd;
H
Hongze Cheng 已提交
31
  uint8_t             *bufArr[5];
H
Hongze Cheng 已提交
32 33 34 35
};

struct SSttSegReader {
  SSttFileReader *reader;
H
Hongze Cheng 已提交
36
  SSttFooter      footer[1];
H
Hongze Cheng 已提交
37 38 39
  struct {
    bool sttBlkLoaded;
    bool statisBlkLoaded;
H
Hongze Cheng 已提交
40
    bool tombBlkLoaded;
H
Hongze Cheng 已提交
41
  } ctx[1];
H
Hongze Cheng 已提交
42 43
  TSttBlkArray    sttBlkArray[1];
  TStatisBlkArray statisBlkArray[1];
H
Hongze Cheng 已提交
44
  TTombBlkArray   tombBlkArray[1];
H
Hongze Cheng 已提交
45 46
};

H
Hongze Cheng 已提交
47
// SSttFileReader
H
Hongze Cheng 已提交
48
static int32_t tsdbSttSegReaderOpen(SSttFileReader *reader, int64_t offset, SSttSegReader **segReader) {
H
Hongze Cheng 已提交
49 50
  ASSERT(offset >= TSDB_FHDR_SIZE);

H
Hongze Cheng 已提交
51
  int32_t code = 0;
H
Hongze Cheng 已提交
52 53 54
  int32_t lino = 0;

  segReader[0] = taosMemoryCalloc(1, sizeof(*segReader[0]));
H
Hongze Cheng 已提交
55
  if (!segReader[0]) return TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
56 57

  segReader[0]->reader = reader;
H
Hongze Cheng 已提交
58
  code = tsdbReadFile(reader->fd, offset, (uint8_t *)(segReader[0]->footer), sizeof(SSttFooter));
H
Hongze Cheng 已提交
59 60 61 62
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
H
Hongze Cheng 已提交
63
    TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
64 65 66
    taosMemoryFree(segReader[0]);
    segReader[0] = NULL;
  }
H
Hongze Cheng 已提交
67 68 69
  return code;
}

H
Hongze Cheng 已提交
70
static int32_t tsdbSttSegReaderClose(SSttSegReader **reader) {
H
Hongze Cheng 已提交
71
  if (reader[0]) {
H
Hongze Cheng 已提交
72 73
    TARRAY2_DESTROY(reader[0]->tombBlkArray, NULL);
    TARRAY2_DESTROY(reader[0]->statisBlkArray, NULL);
H
Hongze Cheng 已提交
74
    TARRAY2_DESTROY(reader[0]->sttBlkArray, NULL);
H
Hongze Cheng 已提交
75 76
    taosMemoryFree(reader[0]);
    reader[0] = NULL;
H
Hongze Cheng 已提交
77 78
  }
  return 0;
H
Hongze Cheng 已提交
79 80
}

H
Hongze Cheng 已提交
81
int32_t tsdbSttFileReaderOpen(const char *fname, const SSttFileReaderConfig *config, SSttFileReader **reader) {
H
Hongze Cheng 已提交
82
  int32_t code = 0;
H
Hongze Cheng 已提交
83 84 85 86 87
  int32_t lino = 0;

  reader[0] = taosMemoryCalloc(1, sizeof(*reader[0]));
  if (reader[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;

H
Hongze Cheng 已提交
88
  reader[0]->config[0] = config[0];
H
Hongze Cheng 已提交
89 90 91
  if (reader[0]->config->bufArr == NULL) {
    reader[0]->config->bufArr = reader[0]->bufArr;
  }
H
Hongze Cheng 已提交
92 93

  // open file
H
Hongze Cheng 已提交
94 95 96 97 98 99 100 101 102
  if (fname) {
    code = tsdbOpenFile(fname, config->szPage, TD_FILE_READ, &reader[0]->fd);
    TSDB_CHECK_CODE(code, lino, _exit);
  } else {
    char fname1[TSDB_FILENAME_LEN];
    tsdbTFileName(config->tsdb, config->file, fname1);
    code = tsdbOpenFile(fname1, config->szPage, TD_FILE_READ, &reader[0]->fd);
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
103 104

  // open each segment reader
H
Hongze Cheng 已提交
105
  int64_t size = config->file->size;
H
Hongze Cheng 已提交
106
  while (size > 0) {
H
Hongze Cheng 已提交
107
    SSttSegReader *reader1;
H
Hongze Cheng 已提交
108

H
Hongze Cheng 已提交
109
    code = tsdbSttSegReaderOpen(reader[0], size - sizeof(SSttFooter), &reader1);
H
Hongze Cheng 已提交
110 111
    TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
112
    code = TARRAY2_APPEND(reader[0]->readerArray, reader1);
H
Hongze Cheng 已提交
113 114
    TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
115
    size = reader1->footer->prevFooter;
H
Hongze Cheng 已提交
116 117
  }

H
Hongze Cheng 已提交
118
  ASSERT(TARRAY2_SIZE(reader[0]->readerArray) == config->file->stt->nseg);
H
Hongze Cheng 已提交
119 120 121

_exit:
  if (code) {
H
Hongze Cheng 已提交
122
    TSDB_ERROR_LOG(TD_VID(config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
123
    tsdbSttFileReaderClose(reader);
H
Hongze Cheng 已提交
124
  }
H
Hongze Cheng 已提交
125 126 127
  return code;
}

H
Hongze Cheng 已提交
128
int32_t tsdbSttFileReaderClose(SSttFileReader **reader) {
H
Hongze Cheng 已提交
129
  if (reader[0]) {
H
Hongze Cheng 已提交
130 131 132
    for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->bufArr); ++i) {
      tFree(reader[0]->bufArr[i]);
    }
H
Hongze Cheng 已提交
133
    tsdbCloseFile(&reader[0]->fd);
H
Hongze Cheng 已提交
134
    TARRAY2_DESTROY(reader[0]->readerArray, tsdbSttSegReaderClose);
H
Hongze Cheng 已提交
135 136 137
    taosMemoryFree(reader[0]);
    reader[0] = NULL;
  }
H
Hongze Cheng 已提交
138
  return 0;
H
Hongze Cheng 已提交
139 140
}

H
Hongze Cheng 已提交
141
int32_t tsdbSttFileReaderGetSegReader(SSttFileReader *reader, const TSttSegReaderArray **readerArray) {
H
Hongze Cheng 已提交
142
  readerArray[0] = reader->readerArray;
H
Hongze Cheng 已提交
143
  return 0;
H
Hongze Cheng 已提交
144 145
}

H
Hongze Cheng 已提交
146
// SSttFSegReader
H
Hongze Cheng 已提交
147
int32_t tsdbSttFileReadStatisBlk(SSttSegReader *reader, const TStatisBlkArray **statisBlkArray) {
H
Hongze Cheng 已提交
148
  if (!reader->ctx->statisBlkLoaded) {
H
Hongze Cheng 已提交
149
    if (reader->footer->statisBlkPtr->size > 0) {
H
Hongze Cheng 已提交
150
      ASSERT(reader->footer->statisBlkPtr->size % sizeof(SStatisBlk) == 0);
H
Hongze Cheng 已提交
151

H
Hongze Cheng 已提交
152
      int32_t size = reader->footer->statisBlkPtr->size / sizeof(SStatisBlk);
H
Hongze Cheng 已提交
153
      void   *data = taosMemoryMalloc(reader->footer->statisBlkPtr->size);
H
Hongze Cheng 已提交
154 155
      if (!data) return TSDB_CODE_OUT_OF_MEMORY;

H
Hongze Cheng 已提交
156 157
      int32_t code = tsdbReadFile(reader->reader->fd, reader->footer->statisBlkPtr->offset, data,
                                  reader->footer->statisBlkPtr->size);
H
Hongze Cheng 已提交
158 159 160 161
      if (code) {
        taosMemoryFree(data);
        return code;
      }
H
Hongze Cheng 已提交
162

H
Hongze Cheng 已提交
163
      TARRAY2_INIT_EX(reader->statisBlkArray, size, size, data);
H
Hongze Cheng 已提交
164
    } else {
H
Hongze Cheng 已提交
165
      TARRAY2_INIT(reader->statisBlkArray);
H
Hongze Cheng 已提交
166 167
    }

H
Hongze Cheng 已提交
168
    reader->ctx->statisBlkLoaded = true;
H
Hongze Cheng 已提交
169 170
  }

H
Hongze Cheng 已提交
171
  statisBlkArray[0] = reader->statisBlkArray;
H
Hongze Cheng 已提交
172
  return 0;
H
Hongze Cheng 已提交
173 174
}

H
Hongze Cheng 已提交
175 176 177 178
int32_t tsdbSttFileReadTombBlk(SSttSegReader *reader, const TTombBlkArray **tombBlkArray) {
  if (!reader->ctx->tombBlkLoaded) {
    if (reader->footer->tombBlkPtr->size > 0) {
      ASSERT(reader->footer->tombBlkPtr->size % sizeof(STombBlk) == 0);
H
Hongze Cheng 已提交
179

H
Hongze Cheng 已提交
180 181
      int32_t size = reader->footer->tombBlkPtr->size / sizeof(STombBlk);
      void   *data = taosMemoryMalloc(reader->footer->tombBlkPtr->size);
H
Hongze Cheng 已提交
182 183
      if (!data) return TSDB_CODE_OUT_OF_MEMORY;

H
Hongze Cheng 已提交
184
      int32_t code =
H
Hongze Cheng 已提交
185
          tsdbReadFile(reader->reader->fd, reader->footer->tombBlkPtr->offset, data, reader->footer->tombBlkPtr->size);
H
Hongze Cheng 已提交
186 187 188 189
      if (code) {
        taosMemoryFree(data);
        return code;
      }
H
Hongze Cheng 已提交
190

H
Hongze Cheng 已提交
191
      TARRAY2_INIT_EX(reader->tombBlkArray, size, size, data);
H
Hongze Cheng 已提交
192
    } else {
H
Hongze Cheng 已提交
193
      TARRAY2_INIT(reader->tombBlkArray);
H
Hongze Cheng 已提交
194 195
    }

H
Hongze Cheng 已提交
196
    reader->ctx->tombBlkLoaded = true;
H
Hongze Cheng 已提交
197 198
  }

H
Hongze Cheng 已提交
199
  tombBlkArray[0] = reader->tombBlkArray;
H
Hongze Cheng 已提交
200 201 202
  return 0;
}

H
Hongze Cheng 已提交
203
int32_t tsdbSttFileReadSttBlk(SSttSegReader *reader, const TSttBlkArray **sttBlkArray) {
H
Hongze Cheng 已提交
204
  if (!reader->ctx->sttBlkLoaded) {
H
Hongze Cheng 已提交
205 206
    if (reader->footer->sttBlkPtr->size > 0) {
      ASSERT(reader->footer->sttBlkPtr->size % sizeof(SSttBlk) == 0);
H
Hongze Cheng 已提交
207

H
Hongze Cheng 已提交
208 209
      int32_t size = reader->footer->sttBlkPtr->size / sizeof(SSttBlk);
      void   *data = taosMemoryMalloc(reader->footer->sttBlkPtr->size);
H
Hongze Cheng 已提交
210 211
      if (!data) return TSDB_CODE_OUT_OF_MEMORY;

H
Hongze Cheng 已提交
212 213
      int32_t code =
          tsdbReadFile(reader->reader->fd, reader->footer->sttBlkPtr->offset, data, reader->footer->sttBlkPtr->size);
H
Hongze Cheng 已提交
214 215 216 217
      if (code) {
        taosMemoryFree(data);
        return code;
      }
H
Hongze Cheng 已提交
218

H
Hongze Cheng 已提交
219
      TARRAY2_INIT_EX(reader->sttBlkArray, size, size, data);
H
Hongze Cheng 已提交
220
    } else {
H
Hongze Cheng 已提交
221
      TARRAY2_INIT(reader->sttBlkArray);
H
Hongze Cheng 已提交
222 223
    }

H
Hongze Cheng 已提交
224
    reader->ctx->sttBlkLoaded = true;
H
Hongze Cheng 已提交
225 226
  }

H
Hongze Cheng 已提交
227
  sttBlkArray[0] = reader->sttBlkArray;
H
Hongze Cheng 已提交
228
  return 0;
H
Hongze Cheng 已提交
229 230
}

H
Hongze Cheng 已提交
231
int32_t tsdbSttFileReadBlockData(SSttSegReader *reader, const SSttBlk *sttBlk, SBlockData *bData) {
H
Hongze Cheng 已提交
232
  int32_t code = 0;
H
Hongze Cheng 已提交
233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249
  int32_t lino = 0;

  code = tRealloc(&reader->reader->config->bufArr[0], sttBlk->bInfo.szBlock);
  TSDB_CHECK_CODE(code, lino, _exit);

  code =
      tsdbReadFile(reader->reader->fd, sttBlk->bInfo.offset, reader->reader->config->bufArr[0], sttBlk->bInfo.szBlock);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tDecmprBlockData(reader->reader->config->bufArr[0], sttBlk->bInfo.szBlock, bData,
                          &reader->reader->config->bufArr[1]);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->reader->config->tsdb->pVnode), lino, code);
  }
H
Hongze Cheng 已提交
250 251 252
  return code;
}

H
Hongze Cheng 已提交
253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371
int32_t tsdbSttFileReadBlockDataByColumn(SSttSegReader *reader, const SSttBlk *sttBlk, SBlockData *bData,
                                         STSchema *pTSchema, int16_t cids[], int32_t ncid) {
  int32_t code = 0;
  int32_t lino = 0;

  TABLEID tbid = {.suid = sttBlk->suid, .uid = 0};
  code = tBlockDataInit(bData, &tbid, pTSchema, cids, ncid);
  TSDB_CHECK_CODE(code, lino, _exit);

  // uid + version + tskey
  code = tRealloc(&reader->reader->config->bufArr[0], sttBlk->bInfo.szKey);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbReadFile(reader->reader->fd, sttBlk->bInfo.offset, reader->reader->config->bufArr[0], sttBlk->bInfo.szKey);
  TSDB_CHECK_CODE(code, lino, _exit);

  // hdr
  SDiskDataHdr hdr[1];
  int32_t      size = 0;

  size += tGetDiskDataHdr(reader->reader->config->bufArr[0] + size, hdr);

  ASSERT(hdr->delimiter == TSDB_FILE_DLMT);

  bData->nRow = hdr->nRow;
  bData->uid = hdr->uid;

  // uid
  if (hdr->uid == 0) {
    ASSERT(hdr->szUid);
    code = tsdbDecmprData(reader->reader->config->bufArr[0] + size, hdr->szUid, TSDB_DATA_TYPE_BIGINT, hdr->cmprAlg,
                          (uint8_t **)&bData->aUid, sizeof(int64_t) * hdr->nRow, &reader->reader->config->bufArr[1]);
    TSDB_CHECK_CODE(code, lino, _exit);
  } else {
    ASSERT(hdr->szUid == 0);
  }
  size += hdr->szUid;

  // version
  code = tsdbDecmprData(reader->reader->config->bufArr[0] + size, hdr->szVer, TSDB_DATA_TYPE_BIGINT, hdr->cmprAlg,
                        (uint8_t **)&bData->aVersion, sizeof(int64_t) * hdr->nRow, &reader->reader->config->bufArr[1]);
  TSDB_CHECK_CODE(code, lino, _exit);
  size += hdr->szVer;

  // ts
  code = tsdbDecmprData(reader->reader->config->bufArr[0] + size, hdr->szKey, TSDB_DATA_TYPE_TIMESTAMP, hdr->cmprAlg,
                        (uint8_t **)&bData->aTSKEY, sizeof(TSKEY) * hdr->nRow, &reader->reader->config->bufArr[1]);
  TSDB_CHECK_CODE(code, lino, _exit);
  size += hdr->szKey;

  ASSERT(size == sttBlk->bInfo.szKey);

  // other columns
  if (bData->nColData > 0) {
    if (hdr->szBlkCol > 0) {
      code = tRealloc(&reader->reader->config->bufArr[0], hdr->szBlkCol);
      TSDB_CHECK_CODE(code, lino, _exit);

      code = tsdbReadFile(reader->reader->fd, sttBlk->bInfo.offset + sttBlk->bInfo.szKey,
                          reader->reader->config->bufArr[0], hdr->szBlkCol);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    SBlockCol  bc[1] = {{.cid = 0}};
    SBlockCol *blockCol = bc;

    size = 0;
    for (int32_t i = 0; i < bData->nColData; i++) {
      SColData *colData = tBlockDataGetColDataByIdx(bData, i);

      while (blockCol && blockCol->cid < colData->cid) {
        if (size < hdr->szBlkCol) {
          size += tGetBlockCol(reader->reader->config->bufArr[0] + size, blockCol);
        } else {
          ASSERT(size == hdr->szBlkCol);
          blockCol = NULL;
        }
      }

      if (blockCol == NULL || blockCol->cid > colData->cid) {
        for (int32_t iRow = 0; iRow < hdr->nRow; iRow++) {
          code = tColDataAppendValue(colData, &COL_VAL_NONE(colData->cid, colData->type));
          TSDB_CHECK_CODE(code, lino, _exit);
        }
      } else {
        ASSERT(blockCol->type == colData->type);
        ASSERT(blockCol->flag && blockCol->flag != HAS_NONE);

        if (blockCol->flag == HAS_NULL) {
          for (int32_t iRow = 0; iRow < hdr->nRow; iRow++) {
            code = tColDataAppendValue(colData, &COL_VAL_NULL(blockCol->cid, blockCol->type));
            TSDB_CHECK_CODE(code, lino, _exit);
          }
        } else {
          int32_t size1 = blockCol->szBitmap + blockCol->szOffset + blockCol->szValue;

          code = tRealloc(&reader->reader->config->bufArr[1], size1);
          TSDB_CHECK_CODE(code, lino, _exit);

          code = tsdbReadFile(reader->reader->fd,
                              sttBlk->bInfo.offset + sttBlk->bInfo.szKey + hdr->szBlkCol + blockCol->offset,
                              reader->reader->config->bufArr[1], size1);
          TSDB_CHECK_CODE(code, lino, _exit);

          code = tsdbDecmprColData(reader->reader->config->bufArr[1], blockCol, hdr->cmprAlg, hdr->nRow, colData,
                                   &reader->reader->config->bufArr[2]);
          TSDB_CHECK_CODE(code, lino, _exit);
        }
      }
    }
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->reader->config->tsdb->pVnode), lino, code);
  }
  return code;
}

H
Hongze Cheng 已提交
372
int32_t tsdbSttFileReadTombBlock(SSttSegReader *reader, const STombBlk *tombBlk, STombBlock *dData) {
H
Hongze Cheng 已提交
373
  int32_t code = 0;
H
Hongze Cheng 已提交
374 375
  int32_t lino = 0;

H
Hongze Cheng 已提交
376
  code = tRealloc(&reader->reader->config->bufArr[0], tombBlk->dp->size);
H
Hongze Cheng 已提交
377 378
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
379
  code = tsdbReadFile(reader->reader->fd, tombBlk->dp->offset, reader->reader->config->bufArr[0], tombBlk->dp->size);
H
Hongze Cheng 已提交
380 381 382
  if (code) TSDB_CHECK_CODE(code, lino, _exit);

  int64_t size = 0;
H
Hongze Cheng 已提交
383
  tTombBlockClear(dData);
H
Hongze Cheng 已提交
384
  for (int32_t i = 0; i < ARRAY_SIZE(dData->dataArr); ++i) {
H
Hongze Cheng 已提交
385
    code = tsdbDecmprData(reader->reader->config->bufArr[0] + size, tombBlk->size[i], TSDB_DATA_TYPE_BIGINT,
H
Hongze Cheng 已提交
386
                          tombBlk->cmprAlg, &reader->reader->config->bufArr[1], sizeof(int64_t) * tombBlk->numRec,
H
Hongze Cheng 已提交
387
                          &reader->reader->config->bufArr[2]);
H
Hongze Cheng 已提交
388 389
    TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
390 391
    code = TARRAY2_APPEND_BATCH(&dData->dataArr[i], reader->reader->config->bufArr[1], tombBlk->numRec);
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
392

H
Hongze Cheng 已提交
393
    size += tombBlk->size[i];
H
Hongze Cheng 已提交
394 395
  }

H
Hongze Cheng 已提交
396
  ASSERT(size == tombBlk->dp->size);
H
Hongze Cheng 已提交
397 398
_exit:
  if (code) {
H
Hongze Cheng 已提交
399
    TSDB_ERROR_LOG(TD_VID(reader->reader->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
400
  }
H
Hongze Cheng 已提交
401 402 403
  return code;
}

H
Hongze Cheng 已提交
404
int32_t tsdbSttFileReadStatisBlock(SSttSegReader *reader, const SStatisBlk *statisBlk, STbStatisBlock *sData) {
H
Hongze Cheng 已提交
405
  int32_t code = 0;
H
Hongze Cheng 已提交
406 407 408
  int32_t lino = 0;

  tStatisBlockClear(sData);
H
Hongze Cheng 已提交
409 410

  code = tRealloc(&reader->reader->config->bufArr[0], statisBlk->dp->size);
H
Hongze Cheng 已提交
411 412
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
413 414
  code =
      tsdbReadFile(reader->reader->fd, statisBlk->dp->offset, reader->reader->config->bufArr[0], statisBlk->dp->size);
H
Hongze Cheng 已提交
415 416 417
  if (code) TSDB_CHECK_CODE(code, lino, _exit);

  int64_t size = 0;
H
Hongze Cheng 已提交
418
  for (int32_t i = 0; i < ARRAY_SIZE(sData->dataArr); ++i) {
H
Hongze Cheng 已提交
419
    code = tsdbDecmprData(reader->reader->config->bufArr[0] + size, statisBlk->size[i], TSDB_DATA_TYPE_BIGINT,
H
Hongze Cheng 已提交
420
                          statisBlk->cmprAlg, &reader->reader->config->bufArr[1], sizeof(int64_t) * statisBlk->numRec,
H
Hongze Cheng 已提交
421
                          &reader->reader->config->bufArr[2]);
H
Hongze Cheng 已提交
422 423
    TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
424 425
    code = TARRAY2_APPEND_BATCH(sData->dataArr + i, reader->reader->config->bufArr[1], statisBlk->numRec);
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
426

H
Hongze Cheng 已提交
427 428 429 430 431 432 433
    size += statisBlk->size[i];
  }

  ASSERT(size == statisBlk->dp->size);

_exit:
  if (code) {
H
Hongze Cheng 已提交
434
    TSDB_ERROR_LOG(TD_VID(reader->reader->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
435
  }
H
Hongze Cheng 已提交
436 437 438
  return code;
}

H
Hongze Cheng 已提交
439
// SSttFWriter ============================================================
H
Hongze Cheng 已提交
440
struct SSttFileWriter {
H
Hongze Cheng 已提交
441
  SSttFileWriterConfig config[1];
H
Hongze Cheng 已提交
442
  struct {
H
Hongze Cheng 已提交
443 444
    bool    opened;
    TABLEID tbid[1];
H
Hongze Cheng 已提交
445
  } ctx[1];
H
Hongze Cheng 已提交
446
  // file
H
Hongze Cheng 已提交
447
  STFile file[1];
H
Hongze Cheng 已提交
448
  // data
H
Hongze Cheng 已提交
449 450
  TSttBlkArray    sttBlkArray[1];
  TStatisBlkArray statisBlkArray[1];
H
Hongze Cheng 已提交
451
  TTombBlkArray   tombBlkArray[1];
H
Hongze Cheng 已提交
452
  SSttFooter      footer[1];
H
Hongze Cheng 已提交
453 454
  SBlockData      bData[1];
  STbStatisBlock  sData[1];
H
Hongze Cheng 已提交
455
  STombBlock      tData[1];
H
Hongze Cheng 已提交
456
  // helper data
H
Hongze Cheng 已提交
457 458
  SSkmInfo skmTb[1];
  SSkmInfo skmRow[1];
H
Hongze Cheng 已提交
459
  uint8_t *bufArr[5];
H
Hongze Cheng 已提交
460
  STsdbFD *fd;
H
Hongze Cheng 已提交
461
};
H
Hongze Cheng 已提交
462

H
Hongze Cheng 已提交
463
static int32_t tsdbSttFileDoWriteBlockData(SSttFileWriter *writer) {
H
Hongze Cheng 已提交
464 465
  if (writer->bData->nRow == 0) return 0;

H
Hongze Cheng 已提交
466
  int32_t code = 0;
H
Hongze Cheng 已提交
467
  int32_t lino = 0;
H
Hongze Cheng 已提交
468 469 470 471 472 473 474 475 476 477 478 479

  SSttBlk sttBlk[1] = {{
      .suid = writer->bData->suid,
      .minUid = writer->bData->uid ? writer->bData->uid : writer->bData->aUid[0],
      .maxUid = writer->bData->uid ? writer->bData->uid : writer->bData->aUid[writer->bData->nRow - 1],
      .minKey = writer->bData->aTSKEY[0],
      .maxKey = writer->bData->aTSKEY[0],
      .minVer = writer->bData->aVersion[0],
      .maxVer = writer->bData->aVersion[0],
      .nRow = writer->bData->nRow,
  }};

H
Hongze Cheng 已提交
480 481 482 483 484 485 486
  for (int32_t iRow = 1; iRow < writer->bData->nRow; iRow++) {
    if (sttBlk->minKey > writer->bData->aTSKEY[iRow]) sttBlk->minKey = writer->bData->aTSKEY[iRow];
    if (sttBlk->maxKey < writer->bData->aTSKEY[iRow]) sttBlk->maxKey = writer->bData->aTSKEY[iRow];
    if (sttBlk->minVer > writer->bData->aVersion[iRow]) sttBlk->minVer = writer->bData->aVersion[iRow];
    if (sttBlk->maxVer < writer->bData->aVersion[iRow]) sttBlk->maxVer = writer->bData->aVersion[iRow];
  }

H
Hongze Cheng 已提交
487 488
  int32_t sizeArr[5] = {0};
  code = tCmprBlockData(writer->bData, writer->config->cmprAlg, NULL, NULL, writer->config->bufArr, sizeArr);
H
Hongze Cheng 已提交
489
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
490

H
Hongze Cheng 已提交
491
  sttBlk->bInfo.offset = writer->file->size;
H
Hongze Cheng 已提交
492 493
  sttBlk->bInfo.szKey = sizeArr[2] + sizeArr[3];
  sttBlk->bInfo.szBlock = sizeArr[0] + sizeArr[1] + sttBlk->bInfo.szKey;
H
Hongze Cheng 已提交
494

H
Hongze Cheng 已提交
495
  for (int32_t i = 3; i >= 0; i--) {
H
Hongze Cheng 已提交
496 497
    if (sizeArr[i]) {
      code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->bufArr[i], sizeArr[i]);
H
Hongze Cheng 已提交
498
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
499
      writer->file->size += sizeArr[i];
H
Hongze Cheng 已提交
500 501 502
    }
  }

H
Hongze Cheng 已提交
503
  code = TARRAY2_APPEND_PTR(writer->sttBlkArray, sttBlk);
H
Hongze Cheng 已提交
504
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
505

H
Hongze Cheng 已提交
506 507
  tBlockDataClear(writer->bData);

H
Hongze Cheng 已提交
508 509
_exit:
  if (code) {
H
Hongze Cheng 已提交
510
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
511
  }
H
Hongze Cheng 已提交
512 513 514
  return code;
}

H
Hongze Cheng 已提交
515
static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) {
H
Hongze Cheng 已提交
516
  if (STATIS_BLOCK_SIZE(writer->sData) == 0) return 0;
H
Hongze Cheng 已提交
517

H
Hongze Cheng 已提交
518
  int32_t code = 0;
H
Hongze Cheng 已提交
519
  int32_t lino = 0;
H
Hongze Cheng 已提交
520

H
Hongze Cheng 已提交
521
  SStatisBlk statisBlk[1] = {{
H
Hongze Cheng 已提交
522 523 524 525 526
      .dp[0] =
          {
              .offset = writer->file->size,
              .size = 0,
          },
H
Hongze Cheng 已提交
527 528 529 530 531 532 533 534 535 536
      .minTbid =
          {
              .suid = TARRAY2_FIRST(writer->sData->suid),
              .uid = TARRAY2_FIRST(writer->sData->uid),
          },
      .maxTbid =
          {
              .suid = TARRAY2_LAST(writer->sData->suid),
              .uid = TARRAY2_LAST(writer->sData->uid),
          },
H
Hongze Cheng 已提交
537 538
      .minVer = TARRAY2_FIRST(writer->sData->minVer),
      .maxVer = TARRAY2_FIRST(writer->sData->maxVer),
H
Hongze Cheng 已提交
539 540
      .numRec = STATIS_BLOCK_SIZE(writer->sData),
      .cmprAlg = writer->config->cmprAlg,
H
Hongze Cheng 已提交
541 542
  }};

H
Hongze Cheng 已提交
543 544 545 546
  for (int32_t i = 1; i < STATIS_BLOCK_SIZE(writer->sData); i++) {
    statisBlk->minVer = TMIN(statisBlk->minVer, TARRAY2_GET(writer->sData->minVer, i));
    statisBlk->maxVer = TMAX(statisBlk->maxVer, TARRAY2_GET(writer->sData->maxVer, i));
  }
H
Hongze Cheng 已提交
547

H
Hongze Cheng 已提交
548
  for (int32_t i = 0; i < STATIS_RECORD_NUM_ELEM; i++) {
H
Hongze Cheng 已提交
549
    int32_t size;
H
Hongze Cheng 已提交
550
    code = tsdbCmprData((uint8_t *)TARRAY2_DATA(writer->sData->dataArr + i),
H
Hongze Cheng 已提交
551
                        TARRAY2_DATA_LEN(&writer->sData->dataArr[i]), TSDB_DATA_TYPE_BIGINT, statisBlk->cmprAlg,
H
Hongze Cheng 已提交
552
                        &writer->config->bufArr[0], 0, &size, &writer->config->bufArr[1]);
H
Hongze Cheng 已提交
553
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
554

H
Hongze Cheng 已提交
555
    code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->bufArr[0], size);
H
Hongze Cheng 已提交
556 557
    TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
558 559
    statisBlk->size[i] = size;
    statisBlk->dp->size += size;
H
Hongze Cheng 已提交
560
    writer->file->size += size;
H
Hongze Cheng 已提交
561
  }
H
Hongze Cheng 已提交
562

H
Hongze Cheng 已提交
563
  code = TARRAY2_APPEND_PTR(writer->statisBlkArray, statisBlk);
H
Hongze Cheng 已提交
564
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
565

H
Hongze Cheng 已提交
566 567
  tStatisBlockClear(writer->sData);

H
Hongze Cheng 已提交
568 569
_exit:
  if (code) {
H
Hongze Cheng 已提交
570
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
571
  }
H
Hongze Cheng 已提交
572 573 574
  return code;
}

H
Hongze Cheng 已提交
575
static int32_t tsdbSttFileDoWriteTombBlock(SSttFileWriter *writer) {
H
Hongze Cheng 已提交
576
  if (TOMB_BLOCK_SIZE(writer->tData) == 0) return 0;
H
Hongze Cheng 已提交
577

H
Hongze Cheng 已提交
578
  int32_t code = 0;
H
Hongze Cheng 已提交
579
  int32_t lino = 0;
H
Hongze Cheng 已提交
580

H
Hongze Cheng 已提交
581
  STombBlk tombBlk[1] = {{
H
Hongze Cheng 已提交
582 583 584 585 586
      .dp[0] =
          {
              .offset = writer->file->size,
              .size = 0,
          },
H
Hongze Cheng 已提交
587
      .minTbid =
H
Hongze Cheng 已提交
588
          {
H
Hongze Cheng 已提交
589 590
              .suid = TARRAY2_FIRST(writer->tData->suid),
              .uid = TARRAY2_FIRST(writer->tData->uid),
H
Hongze Cheng 已提交
591
          },
H
Hongze Cheng 已提交
592
      .maxTbid =
H
Hongze Cheng 已提交
593
          {
H
Hongze Cheng 已提交
594 595
              .suid = TARRAY2_LAST(writer->tData->suid),
              .uid = TARRAY2_LAST(writer->tData->uid),
H
Hongze Cheng 已提交
596
          },
H
Hongze Cheng 已提交
597 598
      .minVer = TARRAY2_FIRST(writer->tData->version),
      .maxVer = TARRAY2_FIRST(writer->tData->version),
H
Hongze Cheng 已提交
599 600
      .numRec = TOMB_BLOCK_SIZE(writer->tData),
      .cmprAlg = writer->config->cmprAlg,
H
Hongze Cheng 已提交
601
  }};
H
Hongze Cheng 已提交
602

H
Hongze Cheng 已提交
603 604 605
  for (int32_t i = 1; i < TOMB_BLOCK_SIZE(writer->tData); i++) {
    tombBlk->minVer = TMIN(tombBlk->minVer, TARRAY2_GET(writer->tData->version, i));
    tombBlk->maxVer = TMAX(tombBlk->maxVer, TARRAY2_GET(writer->tData->version, i));
H
Hongze Cheng 已提交
606 607
  }

H
Hongze Cheng 已提交
608
  for (int32_t i = 0; i < ARRAY_SIZE(writer->tData->dataArr); i++) {
H
Hongze Cheng 已提交
609
    int32_t size;
H
Hongze Cheng 已提交
610
    code = tsdbCmprData((uint8_t *)TARRAY2_DATA(&writer->tData->dataArr[i]),
H
Hongze Cheng 已提交
611
                        TARRAY2_DATA_LEN(&writer->tData->dataArr[i]), TSDB_DATA_TYPE_BIGINT, tombBlk->cmprAlg,
H
Hongze Cheng 已提交
612
                        &writer->config->bufArr[0], 0, &size, &writer->config->bufArr[1]);
H
Hongze Cheng 已提交
613
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
614

H
Hongze Cheng 已提交
615
    code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->bufArr[0], size);
H
Hongze Cheng 已提交
616 617
    TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
618
    tombBlk->size[i] = size;
H
Hongze Cheng 已提交
619
    tombBlk->dp->size += size;
H
Hongze Cheng 已提交
620
    writer->file->size += size;
H
Hongze Cheng 已提交
621 622
  }

H
Hongze Cheng 已提交
623
  code = TARRAY2_APPEND_PTR(writer->tombBlkArray, tombBlk);
H
Hongze Cheng 已提交
624
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
625

H
Hongze Cheng 已提交
626
  tTombBlockClear(writer->tData);
H
Hongze Cheng 已提交
627

H
Hongze Cheng 已提交
628 629
_exit:
  if (code) {
H
Hongze Cheng 已提交
630
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
631
  }
H
Hongze Cheng 已提交
632 633 634
  return code;
}

H
Hongze Cheng 已提交
635
static int32_t tsdbSttFileDoWriteSttBlk(SSttFileWriter *writer) {
H
Hongze Cheng 已提交
636
  int32_t code = 0;
H
Hongze Cheng 已提交
637 638
  int32_t lino;

H
Hongze Cheng 已提交
639
  writer->footer->sttBlkPtr->size = TARRAY2_DATA_LEN(writer->sttBlkArray);
H
Hongze Cheng 已提交
640
  if (writer->footer->sttBlkPtr->size) {
H
Hongze Cheng 已提交
641
    writer->footer->sttBlkPtr->offset = writer->file->size;
H
Hongze Cheng 已提交
642
    code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)TARRAY2_DATA(writer->sttBlkArray),
H
Hongze Cheng 已提交
643
                         writer->footer->sttBlkPtr->size);
H
Hongze Cheng 已提交
644
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
645
    writer->file->size += writer->footer->sttBlkPtr->size;
H
Hongze Cheng 已提交
646 647 648 649
  }

_exit:
  if (code) {
H
Hongze Cheng 已提交
650
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
651
  }
H
Hongze Cheng 已提交
652 653 654
  return code;
}

H
Hongze Cheng 已提交
655
static int32_t tsdbSttFileDoWriteStatisBlk(SSttFileWriter *writer) {
H
Hongze Cheng 已提交
656
  int32_t code = 0;
H
Hongze Cheng 已提交
657
  int32_t lino;
H
Hongze Cheng 已提交
658

H
Hongze Cheng 已提交
659
  writer->footer->statisBlkPtr->size = TARRAY2_DATA_LEN(writer->statisBlkArray);
H
Hongze Cheng 已提交
660
  if (writer->footer->statisBlkPtr->size) {
H
Hongze Cheng 已提交
661
    writer->footer->statisBlkPtr->offset = writer->file->size;
H
Hongze Cheng 已提交
662
    code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)TARRAY2_DATA(writer->statisBlkArray),
H
Hongze Cheng 已提交
663
                         writer->footer->statisBlkPtr->size);
H
Hongze Cheng 已提交
664
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
665
    writer->file->size += writer->footer->statisBlkPtr->size;
H
Hongze Cheng 已提交
666 667 668 669
  }

_exit:
  if (code) {
H
Hongze Cheng 已提交
670
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
671
  }
H
Hongze Cheng 已提交
672 673 674
  return code;
}

H
Hongze Cheng 已提交
675
static int32_t tsdbSttFileDoWriteTombBlk(SSttFileWriter *writer) {
H
Hongze Cheng 已提交
676
  int32_t code = 0;
H
Hongze Cheng 已提交
677
  int32_t lino = 0;
H
Hongze Cheng 已提交
678

H
Hongze Cheng 已提交
679 680
  writer->footer->tombBlkPtr->size = TARRAY2_DATA_LEN(writer->tombBlkArray);
  if (writer->footer->tombBlkPtr->size) {
H
Hongze Cheng 已提交
681
    writer->footer->tombBlkPtr->offset = writer->file->size;
H
Hongze Cheng 已提交
682 683
    code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)TARRAY2_DATA(writer->tombBlkArray),
                         writer->footer->tombBlkPtr->size);
H
Hongze Cheng 已提交
684
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
685
    writer->file->size += writer->footer->tombBlkPtr->size;
H
Hongze Cheng 已提交
686 687 688 689
  }

_exit:
  if (code) {
H
Hongze Cheng 已提交
690
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
691 692 693 694
  }
  return code;
}

H
Hongze Cheng 已提交
695
static int32_t tsdbSttFileDoWriteFooter(SSttFileWriter *writer) {
H
Hongze Cheng 已提交
696 697
  writer->footer->prevFooter = writer->config->file.size;
  int32_t code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)writer->footer, sizeof(writer->footer));
H
Hongze Cheng 已提交
698
  if (code) return code;
H
Hongze Cheng 已提交
699
  writer->file->size += sizeof(writer->footer);
H
Hongze Cheng 已提交
700
  return 0;
H
Hongze Cheng 已提交
701 702
}

H
Hongze Cheng 已提交
703
static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) {
H
Hongze Cheng 已提交
704
  int32_t code = 0;
H
Hongze Cheng 已提交
705
  int32_t lino = 0;
H
Hongze Cheng 已提交
706

H
Hongze Cheng 已提交
707
  // set
H
Hongze Cheng 已提交
708 709 710 711
  writer->file[0] = writer->config->file;
  writer->file->stt->nseg++;
  if (!writer->config->skmTb) writer->config->skmTb = writer->skmTb;
  if (!writer->config->skmRow) writer->config->skmRow = writer->skmRow;
H
Hongze Cheng 已提交
712
  if (!writer->config->bufArr) writer->config->bufArr = writer->bufArr;
H
Hongze Cheng 已提交
713

H
Hongze Cheng 已提交
714 715 716
  // open file
  int32_t flag;
  char    fname[TSDB_FILENAME_LEN];
H
Hongze Cheng 已提交
717

H
Hongze Cheng 已提交
718
  if (writer->file->size > 0) {
H
Hongze Cheng 已提交
719 720 721
    flag = TD_FILE_READ | TD_FILE_WRITE;
  } else {
    flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
H
Hongze Cheng 已提交
722
  }
H
Hongze Cheng 已提交
723

H
Hongze Cheng 已提交
724
  tsdbTFileName(writer->config->tsdb, writer->file, fname);
H
Hongze Cheng 已提交
725
  code = tsdbOpenFile(fname, writer->config->szPage, flag, &writer->fd);
H
Hongze Cheng 已提交
726 727
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
728
  if (writer->file->size == 0) {
H
Hongze Cheng 已提交
729 730 731
    uint8_t hdr[TSDB_FHDR_SIZE] = {0};
    code = tsdbWriteFile(writer->fd, 0, hdr, sizeof(hdr));
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
732
    writer->file->size += sizeof(hdr);
H
Hongze Cheng 已提交
733
  }
H
Hongze Cheng 已提交
734

H
Hongze Cheng 已提交
735 736
  writer->ctx->opened = true;

H
Hongze Cheng 已提交
737
_exit:
H
Hongze Cheng 已提交
738
  if (code) {
H
Hongze Cheng 已提交
739
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
740
  }
H
Hongze Cheng 已提交
741
  return code;
H
Hongze Cheng 已提交
742 743
}

H
Hongze Cheng 已提交
744
static void tsdbSttFWriterDoClose(SSttFileWriter *writer) {
H
Hongze Cheng 已提交
745
  ASSERT(writer->fd == NULL);
H
Hongze Cheng 已提交
746

H
Hongze Cheng 已提交
747
  for (int32_t i = 0; i < ARRAY_SIZE(writer->bufArr); ++i) {
H
Hongze Cheng 已提交
748
    tFree(writer->bufArr[i]);
H
Hongze Cheng 已提交
749 750 751
  }
  tDestroyTSchema(writer->skmRow->pTSchema);
  tDestroyTSchema(writer->skmTb->pTSchema);
H
Hongze Cheng 已提交
752
  tTombBlockDestroy(writer->tData);
H
Hongze Cheng 已提交
753
  tStatisBlockDestroy(writer->sData);
H
Hongze Cheng 已提交
754
  tBlockDataDestroy(writer->bData);
H
Hongze Cheng 已提交
755 756 757
  TARRAY2_DESTROY(writer->tombBlkArray, NULL);
  TARRAY2_DESTROY(writer->statisBlkArray, NULL);
  TARRAY2_DESTROY(writer->sttBlkArray, NULL);
H
Hongze Cheng 已提交
758
}
H
Hongze Cheng 已提交
759

H
Hongze Cheng 已提交
760 761 762 763
static int32_t tsdbSttFileDoUpdateHeader(SSttFileWriter *writer) {
  // TODO
  return 0;
}
H
Hongze Cheng 已提交
764

H
Hongze Cheng 已提交
765
static int32_t tsdbSttFWriterCloseCommit(SSttFileWriter *writer, TFileOpArray *opArray) {
H
Hongze Cheng 已提交
766 767
  int32_t lino;
  int32_t code;
H
Hongze Cheng 已提交
768

H
Hongze Cheng 已提交
769
  code = tsdbSttFileDoWriteBlockData(writer);
H
Hongze Cheng 已提交
770
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
771

H
Hongze Cheng 已提交
772
  code = tsdbSttFileDoWriteStatisBlock(writer);
H
Hongze Cheng 已提交
773 774
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
775
  code = tsdbSttFileDoWriteTombBlock(writer);
H
Hongze Cheng 已提交
776
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
777

H
Hongze Cheng 已提交
778 779
  code = tsdbSttFileDoWriteSttBlk(writer);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
780

H
Hongze Cheng 已提交
781 782
  code = tsdbSttFileDoWriteStatisBlk(writer);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
783

H
Hongze Cheng 已提交
784
  code = tsdbSttFileDoWriteTombBlk(writer);
H
Hongze Cheng 已提交
785
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
786

H
Hongze Cheng 已提交
787 788
  code = tsdbSttFileDoWriteFooter(writer);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
789

H
Hongze Cheng 已提交
790
  code = tsdbSttFileDoUpdateHeader(writer);
H
Hongze Cheng 已提交
791 792
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
793
  code = tsdbFsyncFile(writer->fd);
H
Hongze Cheng 已提交
794 795
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
796
  tsdbCloseFile(&writer->fd);
H
Hongze Cheng 已提交
797

H
Hongze Cheng 已提交
798
  ASSERT(writer->config->file.size < writer->file->size);
H
Hongze Cheng 已提交
799 800 801 802 803 804 805 806 807 808 809 810 811 812 813
  STFileOp op;
  if (writer->config->file.size == 0) {
    op = (STFileOp){
        .optype = TSDB_FOP_CREATE,
        .fid = writer->config->file.fid,
        .nf = writer->file[0],
    };
  } else {
    op = (STFileOp){
        .optype = TSDB_FOP_MODIFY,
        .fid = writer->config->file.fid,
        .of = writer->config->file,
        .nf = writer->file[0],
    };
  }
H
Hongze Cheng 已提交
814 815 816

  code = TARRAY2_APPEND(opArray, op);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
817

H
Hongze Cheng 已提交
818 819
_exit:
  if (code) {
H
Hongze Cheng 已提交
820
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
821
  }
H
Hongze Cheng 已提交
822
  return code;
H
Hongze Cheng 已提交
823
}
H
Hongze Cheng 已提交
824

H
Hongze Cheng 已提交
825
static int32_t tsdbSttFWriterCloseAbort(SSttFileWriter *writer) {
H
Hongze Cheng 已提交
826
  if (writer->config->file.size) {  // truncate the file to the original size
H
Hongze Cheng 已提交
827 828
    ASSERT(writer->config->file.size <= writer->file->size);
    if (writer->config->file.size < writer->file->size) {
H
Hongze Cheng 已提交
829
      taosFtruncateFile(writer->fd->pFD, writer->config->file.size);
H
Hongze Cheng 已提交
830 831 832
      tsdbCloseFile(&writer->fd);
    }
  } else {  // remove the file
H
Hongze Cheng 已提交
833 834
    char fname[TSDB_FILENAME_LEN];
    tsdbTFileName(writer->config->tsdb, &writer->config->file, fname);
H
Hongze Cheng 已提交
835 836 837
    tsdbCloseFile(&writer->fd);
    taosRemoveFile(fname);
  }
H
Hongze Cheng 已提交
838 839
  return 0;
}
H
Hongze Cheng 已提交
840

H
Hongze Cheng 已提交
841
int32_t tsdbSttFileWriterOpen(const SSttFileWriterConfig *config, SSttFileWriter **writer) {
H
Hongze Cheng 已提交
842
  writer[0] = taosMemoryCalloc(1, sizeof(*writer[0]));
H
Hongze Cheng 已提交
843 844
  if (writer[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;

H
Hongze Cheng 已提交
845
  writer[0]->config[0] = config[0];
H
Hongze Cheng 已提交
846
  writer[0]->ctx->opened = false;
H
Hongze Cheng 已提交
847 848 849
  return 0;
}

H
Hongze Cheng 已提交
850
int32_t tsdbSttFileWriterClose(SSttFileWriter **writer, int8_t abort, TFileOpArray *opArray) {
H
Hongze Cheng 已提交
851
  int32_t code = 0;
H
Hongze Cheng 已提交
852
  int32_t lino = 0;
H
Hongze Cheng 已提交
853

H
Hongze Cheng 已提交
854
  if (writer[0]->ctx->opened) {
H
Hongze Cheng 已提交
855 856
    if (abort) {
      code = tsdbSttFWriterCloseAbort(writer[0]);
H
Hongze Cheng 已提交
857
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
858
    } else {
H
Hongze Cheng 已提交
859
      code = tsdbSttFWriterCloseCommit(writer[0], opArray);
H
Hongze Cheng 已提交
860
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
861
    }
H
Hongze Cheng 已提交
862
    tsdbSttFWriterDoClose(writer[0]);
H
Hongze Cheng 已提交
863
  }
H
Hongze Cheng 已提交
864 865
  taosMemoryFree(writer[0]);
  writer[0] = NULL;
H
Hongze Cheng 已提交
866 867 868

_exit:
  if (code) {
H
Hongze Cheng 已提交
869
    TSDB_ERROR_LOG(TD_VID(writer[0]->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
870
  }
H
Hongze Cheng 已提交
871
  return code;
H
Hongze Cheng 已提交
872 873
}

H
Hongze Cheng 已提交
874
int32_t tsdbSttFileWriteRow(SSttFileWriter *writer, SRowInfo *row) {
H
Hongze Cheng 已提交
875
  int32_t code = 0;
H
Hongze Cheng 已提交
876
  int32_t lino = 0;
H
Hongze Cheng 已提交
877

H
Hongze Cheng 已提交
878
  if (!writer->ctx->opened) {
H
Hongze Cheng 已提交
879 880 881
    code = tsdbSttFWriterDoOpen(writer);
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
882

H
Hongze Cheng 已提交
883 884
  TSDBKEY key[1] = {TSDBROW_KEY(&row->row)};
  if (!TABLE_SAME_SCHEMA(row->suid, row->uid, writer->ctx->tbid->suid, writer->ctx->tbid->uid)) {
H
Hongze Cheng 已提交
885
    code = tsdbSttFileDoWriteBlockData(writer);
H
Hongze Cheng 已提交
886
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
887

H
Hongze Cheng 已提交
888 889 890 891 892 893 894 895 896 897 898 899
    code = tsdbUpdateSkmTb(writer->config->tsdb, (TABLEID *)row, writer->config->skmTb);
    TSDB_CHECK_CODE(code, lino, _exit);

    TABLEID id = {.suid = row->suid, .uid = row->suid ? 0 : row->uid};
    code = tBlockDataInit(writer->bData, &id, writer->config->skmTb->pTSchema, NULL, 0);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (writer->ctx->tbid->uid != row->uid) {
    writer->ctx->tbid->suid = row->suid;
    writer->ctx->tbid->uid = row->uid;

H
Hongze Cheng 已提交
900
    if (STATIS_BLOCK_SIZE(writer->sData) >= writer->config->maxRow) {
H
Hongze Cheng 已提交
901
      code = tsdbSttFileDoWriteStatisBlock(writer);
H
Hongze Cheng 已提交
902
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
903 904
    }

H
Hongze Cheng 已提交
905
    STbStatisRecord record = {
H
Hongze Cheng 已提交
906 907
        .suid = row->suid,
        .uid = row->uid,
H
Hongze Cheng 已提交
908 909 910 911
        .firstKey = key->ts,
        .lastKey = key->ts,
        .minVer = key->version,
        .maxVer = key->version,
H
Hongze Cheng 已提交
912
        .count = 1,
H
Hongze Cheng 已提交
913 914
    };
    code = tStatisBlockPut(writer->sData, &record);
H
Hongze Cheng 已提交
915
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
916
  } else {
H
Hongze Cheng 已提交
917 918
    ASSERT(key->ts >= TARRAY2_LAST(writer->sData->lastKey));

H
Hongze Cheng 已提交
919 920 921 922 923 924
    TARRAY2_LAST(writer->sData->minVer) = TMIN(TARRAY2_LAST(writer->sData->minVer), key->version);
    TARRAY2_LAST(writer->sData->maxVer) = TMAX(TARRAY2_LAST(writer->sData->maxVer), key->version);
    if (key->ts > TARRAY2_LAST(writer->sData->lastKey)) {
      TARRAY2_LAST(writer->sData->count)++;
      TARRAY2_LAST(writer->sData->lastKey) = key->ts;
    }
H
Hongze Cheng 已提交
925 926
  }

H
Hongze Cheng 已提交
927
  if (row->row.type == TSDBROW_ROW_FMT) {
H
Hongze Cheng 已提交
928 929
    code = tsdbUpdateSkmRow(writer->config->tsdb, writer->ctx->tbid,  //
                            TSDBROW_SVERSION(&row->row), writer->config->skmRow);
H
Hongze Cheng 已提交
930
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
931 932
  }

H
Hongze Cheng 已提交
933
  // row to col conversion
H
Hongze Cheng 已提交
934 935 936 937 938 939 940 941 942
  if (key->version <= writer->config->compactVersion                       //
      && writer->bData->nRow > 0                                           //
      && (writer->bData->uid                                               //
              ? writer->bData->uid                                         //
              : writer->bData->aUid[writer->bData->nRow - 1]) == row->uid  //
      && writer->bData->aTSKEY[writer->bData->nRow - 1] == key->ts         //
  ) {
    code = tBlockDataUpdateRow(writer->bData, &row->row, writer->config->skmRow->pTSchema);
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
943 944
  } else {
    if (writer->bData->nRow >= writer->config->maxRow) {
H
Hongze Cheng 已提交
945
      code = tsdbSttFileDoWriteBlockData(writer);
H
Hongze Cheng 已提交
946 947
      TSDB_CHECK_CODE(code, lino, _exit);
    }
H
Hongze Cheng 已提交
948

H
Hongze Cheng 已提交
949
    code = tBlockDataAppendRow(writer->bData, &row->row, writer->config->skmRow->pTSchema, row->uid);
H
Hongze Cheng 已提交
950
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
951 952 953 954
  }

_exit:
  if (code) {
H
Hongze Cheng 已提交
955
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
956
  }
H
Hongze Cheng 已提交
957
  return code;
H
Hongze Cheng 已提交
958 959
}

H
Hongze Cheng 已提交
960
int32_t tsdbSttFileWriteBlockData(SSttFileWriter *writer, SBlockData *bdata) {
H
Hongze Cheng 已提交
961 962 963
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
964
  // TODO: optimize here
H
Hongze Cheng 已提交
965 966
  SRowInfo row[1];
  row->suid = bdata->suid;
H
Hongze Cheng 已提交
967
  for (int32_t i = 0; i < bdata->nRow; i++) {
H
Hongze Cheng 已提交
968 969
    row->uid = bdata->uid ? bdata->uid : bdata->aUid[i];
    row->row = tsdbRowFromBlockData(bdata, i);
H
Hongze Cheng 已提交
970

H
Hongze Cheng 已提交
971
    code = tsdbSttFileWriteRow(writer, row);
H
Hongze Cheng 已提交
972 973 974 975 976
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
H
Hongze Cheng 已提交
977
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
978
  }
H
Hongze Cheng 已提交
979
  return code;
H
Hongze Cheng 已提交
980 981
}

H
Hongze Cheng 已提交
982
int32_t tsdbSttFileWriteTombRecord(SSttFileWriter *writer, const STombRecord *record) {
H
Hongze Cheng 已提交
983
  int32_t code;
H
Hongze Cheng 已提交
984 985
  int32_t lino;

H
Hongze Cheng 已提交
986
  if (!writer->ctx->opened) {
H
Hongze Cheng 已提交
987 988
    code = tsdbSttFWriterDoOpen(writer);
    return code;
H
Hongze Cheng 已提交
989 990
  } else {
    if (writer->bData->nRow > 0) {
H
Hongze Cheng 已提交
991
      code = tsdbSttFileDoWriteBlockData(writer);
H
Hongze Cheng 已提交
992 993
      TSDB_CHECK_CODE(code, lino, _exit);
    }
H
Hongze Cheng 已提交
994

H
Hongze Cheng 已提交
995 996 997 998
    if (STATIS_BLOCK_SIZE(writer->sData) > 0) {
      code = tsdbSttFileDoWriteStatisBlock(writer);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
H
Hongze Cheng 已提交
999
  }
H
Hongze Cheng 已提交
1000

H
Hongze Cheng 已提交
1001
  // write SDelRecord
H
Hongze Cheng 已提交
1002
  code = tTombBlockPut(writer->tData, record);
H
Hongze Cheng 已提交
1003
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1004

H
Hongze Cheng 已提交
1005
  // write SDelBlock if need
H
Hongze Cheng 已提交
1006
  if (TOMB_BLOCK_SIZE(writer->tData) >= writer->config->maxRow) {
H
Hongze Cheng 已提交
1007
    code = tsdbSttFileDoWriteTombBlock(writer);
H
Hongze Cheng 已提交
1008
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1009 1010 1011 1012
  }

_exit:
  if (code) {
H
Hongze Cheng 已提交
1013
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
1014 1015
  }
  return code;
H
Hongze Cheng 已提交
1016 1017 1018
}

bool tsdbSttFileWriterIsOpened(SSttFileWriter *writer) { return writer->ctx->opened; }