tsdbSttFileRW.c 27.3 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "inc/tsdbSttFileRW.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18 19
typedef struct {
  int64_t   prevFooter;
H
Hongze Cheng 已提交
20 21 22 23 24
  SFDataPtr sttBlkPtr[1];
  SFDataPtr delBlkPtr[1];
  SFDataPtr statisBlkPtr[1];
  SFDataPtr rsrvd[2];
} SSttFooter;
H
Hongze Cheng 已提交
25

H
Hongze Cheng 已提交
26
// SSttFReader ============================================================
H
Hongze Cheng 已提交
27
struct SSttFileReader {
H
Hongze Cheng 已提交
28
  SSttFileReaderConfig config[1];
H
Hongze Cheng 已提交
29
  TSttSegReaderArray   readerArray[1];
H
Hongze Cheng 已提交
30
  STsdbFD             *fd;
H
Hongze Cheng 已提交
31
  uint8_t             *bufArr[5];
H
Hongze Cheng 已提交
32 33 34 35
};

struct SSttSegReader {
  SSttFileReader *reader;
H
Hongze Cheng 已提交
36
  SSttFooter      footer[1];
H
Hongze Cheng 已提交
37 38 39 40
  struct {
    bool sttBlkLoaded;
    bool delBlkLoaded;
    bool statisBlkLoaded;
H
Hongze Cheng 已提交
41
  } ctx[1];
H
Hongze Cheng 已提交
42 43
  TSttBlkArray    sttBlkArray[1];
  TStatisBlkArray statisBlkArray[1];
H
Hongze Cheng 已提交
44
  TDelBlkArray    delBlkArray[1];
H
Hongze Cheng 已提交
45 46
};

H
Hongze Cheng 已提交
47
// SSttFileReader
H
Hongze Cheng 已提交
48
// Create a segment reader whose footer is located at `offset` in the file.
//
// reader     file reader that owns the open file handle
// offset     file offset of the segment footer; must not lie inside the file header
// segReader  receives the new segment reader, or NULL on failure
//
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if the reader cannot be
// allocated, or the error code from tsdbReadFile().
static int32_t tsdbSttSegReaderOpen(SSttFileReader *reader, int64_t offset, SSttSegReader **segReader) {
  ASSERT(offset >= TSDB_FHDR_SIZE);

  int32_t code = 0;
  int32_t lino = 0;
  int32_t vid = TD_VID(reader->config->tsdb->pVnode);

  SSttSegReader *seg = taosMemoryCalloc(1, sizeof(*seg));
  *segReader = seg;
  if (seg == NULL) return TSDB_CODE_OUT_OF_MEMORY;

  seg->reader = reader;

  // the footer is stored as a raw struct image
  code = tsdbReadFile(reader->fd, offset, (uint8_t *)seg->footer, sizeof(SSttFooter));
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(vid, lino, code);
    taosMemoryFree(seg);
    *segReader = NULL;
  }
  return code;
}

H
Hongze Cheng 已提交
71
// Release a segment reader together with its cached block-index arrays and
// reset the caller's pointer to NULL. A NULL reader is accepted. Always
// returns 0.
static int32_t tsdbSttSegReaderClose(SSttSegReader **reader) {
  SSttSegReader *seg = *reader;
  if (seg == NULL) return 0;

  TARRAY2_FREE(seg->sttBlkArray);
  TARRAY2_FREE(seg->delBlkArray);
  TARRAY2_FREE(seg->statisBlkArray);
  taosMemoryFree(seg);
  *reader = NULL;

  return 0;
}

H
Hongze Cheng 已提交
82
// Open `fname` for reading and create one segment reader per segment.
//
// The caller's config is copied into the reader. If the config supplies no
// scratch buffers (config->bufArr is NULL), the reader's own bufArr is used.
// Segments are discovered back to front: the last footer sits at the end of
// the file (config->file->size) and each footer's prevFooter gives the file
// size at which the previous segment ended; the walk stops at 0.
//
// fname   path of the stt file
// config  reader configuration; copied, not retained
// reader  receives the new reader on success
//
// Returns 0 on success. On failure everything acquired so far is released via
// tsdbSttFileReaderClose() and the error code is returned.
int32_t tsdbSttFileReaderOpen(const char *fname, const SSttFileReaderConfig *config, SSttFileReader **reader) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t vid = TD_VID(config->tsdb->pVnode);

  reader[0] = taosMemoryCalloc(1, sizeof(*reader[0]));
  if (reader[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;

  reader[0]->config[0] = config[0];
  if (!reader[0]->config->bufArr) reader[0]->config->bufArr = reader[0]->bufArr;

  // open file
  code = tsdbOpenFile(fname, config->szPage, TD_FILE_READ, &reader[0]->fd);
  TSDB_CHECK_CODE(code, lino, _exit);

  // open each segment reader
  int64_t size = config->file->size;
  while (size > 0) {
    SSttSegReader *reader1 = NULL;

    // keep the subtraction signed; sizeof yields an unsigned size_t
    code = tsdbSttSegReaderOpen(reader[0], size - (int64_t)sizeof(SSttFooter), &reader1);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = TARRAY2_APPEND(reader[0]->readerArray, reader1);
    if (code) {
      // reader1 never made it into readerArray, so the cleanup in _exit
      // would not see it; release it here to avoid a leak
      tsdbSttSegReaderClose(&reader1);
    }
    TSDB_CHECK_CODE(code, lino, _exit);

    size = reader1->footer->prevFooter;
  }

  ASSERT(TARRAY2_SIZE(reader[0]->readerArray) == config->file->stt->nseg);

_exit:
  if (code) {
    TSDB_ERROR_LOG(vid, lino, code);
    tsdbSttFileReaderClose(reader);
  }
  return code;
}

H
Hongze Cheng 已提交
121
// Release a file reader: its own scratch buffers, the file handle, every
// segment reader, and the reader object itself. The caller's pointer is reset
// to NULL. A NULL reader is accepted. Always returns 0.
int32_t tsdbSttFileReaderClose(SSttFileReader **reader) {
  SSttFileReader *fReader = *reader;
  if (fReader == NULL) return 0;

  for (int32_t iBuf = 0; iBuf < ARRAY_SIZE(fReader->bufArr); ++iBuf) {
    tFree(fReader->bufArr[iBuf]);
  }

  tsdbCloseFile(&fReader->fd);
  TARRAY2_CLEAR_FREE(fReader->readerArray, tsdbSttSegReaderClose);
  taosMemoryFree(fReader);
  *reader = NULL;

  return 0;
}

H
Hongze Cheng 已提交
134
// Hand out a read-only view of the reader's segment-reader array. The array
// stays owned by `reader`. Always returns 0.
int32_t tsdbSttFileReaderGetSegReader(SSttFileReader *reader, const TSttSegReaderArray **readerArray) {
  *readerArray = reader->readerArray;
  return 0;
}

H
Hongze Cheng 已提交
139
// SSttFSegReader
H
Hongze Cheng 已提交
140
// Return the statistics block index of a segment, loading it on first use.
//
// The index is read from the location recorded in the segment footer and
// cached in reader->statisBlkArray; later calls only hand back the cached
// array. An empty index (footer size <= 0) yields an empty array.
//
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if the read buffer cannot be
// allocated, or the error code from tsdbReadFile().
int32_t tsdbSttFileReadStatisBlk(SSttSegReader *reader, const TStatisBlkArray **statisBlkArray) {
  const SFDataPtr *blkPtr = reader->footer->statisBlkPtr;

  if (!reader->ctx->statisBlkLoaded) {
    if (blkPtr->size > 0) {
      // the index is an array of raw SStatisBlk records
      ASSERT(blkPtr->size % sizeof(SStatisBlk) == 0);

      int32_t nBlk = blkPtr->size / sizeof(SStatisBlk);
      void   *buf = taosMemoryMalloc(blkPtr->size);
      if (buf == NULL) return TSDB_CODE_OUT_OF_MEMORY;

      int32_t code = tsdbReadFile(reader->reader->fd, blkPtr->offset, buf, blkPtr->size);
      if (code != 0) {
        taosMemoryFree(buf);
        return code;
      }

      // buf now backs the array; it is not freed here
      TARRAY2_INIT_EX(reader->statisBlkArray, nBlk, nBlk, buf);
    } else {
      TARRAY2_INIT(reader->statisBlkArray);
    }

    reader->ctx->statisBlkLoaded = true;
  }

  *statisBlkArray = reader->statisBlkArray;
  return 0;
}

H
Hongze Cheng 已提交
168
// Return the delete block index of a segment, loading it on first use.
//
// The index is read from the location recorded in the segment footer and
// cached in reader->delBlkArray; later calls only hand back the cached array.
// An empty index (footer size <= 0) yields an empty array.
//
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if the read buffer cannot be
// allocated, or the error code from tsdbReadFile().
int32_t tsdbSttFileReadDelBlk(SSttSegReader *reader, const TDelBlkArray **delBlkArray) {
  const SFDataPtr *blkPtr = reader->footer->delBlkPtr;

  if (!reader->ctx->delBlkLoaded) {
    if (blkPtr->size > 0) {
      // the index is an array of raw SDelBlk records
      ASSERT(blkPtr->size % sizeof(SDelBlk) == 0);

      int32_t nBlk = blkPtr->size / sizeof(SDelBlk);
      void   *buf = taosMemoryMalloc(blkPtr->size);
      if (buf == NULL) return TSDB_CODE_OUT_OF_MEMORY;

      int32_t code = tsdbReadFile(reader->reader->fd, blkPtr->offset, buf, blkPtr->size);
      if (code != 0) {
        taosMemoryFree(buf);
        return code;
      }

      // buf now backs the array; it is not freed here
      TARRAY2_INIT_EX(reader->delBlkArray, nBlk, nBlk, buf);
    } else {
      TARRAY2_INIT(reader->delBlkArray);
    }

    reader->ctx->delBlkLoaded = true;
  }

  *delBlkArray = reader->delBlkArray;
  return 0;
}

H
Hongze Cheng 已提交
196
// Return the data block index (SSttBlk array) of a segment, loading it on
// first use.
//
// The index is read from the location recorded in the segment footer and
// cached in reader->sttBlkArray; later calls only hand back the cached array.
// An empty index (footer size <= 0) yields an empty array.
//
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if the read buffer cannot be
// allocated, or the error code from tsdbReadFile().
int32_t tsdbSttFileReadSttBlk(SSttSegReader *reader, const TSttBlkArray **sttBlkArray) {
  const SFDataPtr *blkPtr = reader->footer->sttBlkPtr;

  if (!reader->ctx->sttBlkLoaded) {
    if (blkPtr->size > 0) {
      // the index is an array of raw SSttBlk records
      ASSERT(blkPtr->size % sizeof(SSttBlk) == 0);

      int32_t nBlk = blkPtr->size / sizeof(SSttBlk);
      void   *buf = taosMemoryMalloc(blkPtr->size);
      if (buf == NULL) return TSDB_CODE_OUT_OF_MEMORY;

      int32_t code = tsdbReadFile(reader->reader->fd, blkPtr->offset, buf, blkPtr->size);
      if (code != 0) {
        taosMemoryFree(buf);
        return code;
      }

      // buf now backs the array; it is not freed here
      TARRAY2_INIT_EX(reader->sttBlkArray, nBlk, nBlk, buf);
    } else {
      TARRAY2_INIT(reader->sttBlkArray);
    }

    reader->ctx->sttBlkLoaded = true;
  }

  *sttBlkArray = reader->sttBlkArray;
  return 0;
}

H
Hongze Cheng 已提交
224
// Read the data block described by `sttBlk` from the file and decompress it
// into `bData`.
//
// Scratch buffer 0 of the reader config holds the raw block; scratch buffer 1
// is passed to tDecmprBlockData() as its working buffer.
//
// Returns 0 on success, otherwise the error code of the failing step
// (tRealloc, tsdbReadFile or tDecmprBlockData), which is also logged.
int32_t tsdbSttFileReadDataBlock(SSttSegReader *reader, const SSttBlk *sttBlk, SBlockData *bData) {
  int32_t               code = 0;
  int32_t               lino = 0;
  SSttFileReader       *fReader = reader->reader;
  SSttFileReaderConfig *config = fReader->config;

  // make room for the raw block
  code = tRealloc(&config->bufArr[0], sttBlk->bInfo.szBlock);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbReadFile(fReader->fd, sttBlk->bInfo.offset, config->bufArr[0], sttBlk->bInfo.szBlock);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tDecmprBlockData(config->bufArr[0], sttBlk->bInfo.szBlock, bData, &config->bufArr[1]);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(config->tsdb->pVnode), lino, code);
  }
  return code;
}

H
Hongze Cheng 已提交
246
// Read the delete block described by `delBlk` and decode it into `dData`.
//
// `dData` is cleared first. The raw block is read into scratch buffer 0; it
// consists of one compressed int64 column per entry of dData->dataArr, laid
// out back to back with the per-column sizes in delBlk->size[]. Each column
// is decompressed into scratch buffer 1 (scratch buffer 2 is handed to
// tsdbDecmprData() as its working buffer) and appended value by value.
//
// Returns 0 on success, otherwise the error code of the failing step, which
// is also logged. On failure `dData` may hold a partially decoded block.
int32_t tsdbSttFileReadDelBlock(SSttSegReader *reader, const SDelBlk *delBlk, SDelBlock *dData) {
  int32_t code = 0;
  int32_t lino = 0;

  tDelBlockClear(dData);

  code = tRealloc(&reader->reader->config->bufArr[0], delBlk->dp->size);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbReadFile(reader->reader->fd, delBlk->dp->offset, reader->reader->config->bufArr[0], delBlk->dp->size);
  TSDB_CHECK_CODE(code, lino, _exit);

  int64_t size = 0;  // bytes of the raw block consumed so far
  for (int32_t i = 0; i < ARRAY_SIZE(dData->dataArr); ++i) {
    code = tsdbDecmprData(reader->reader->config->bufArr[0] + size, delBlk->size[i], TSDB_DATA_TYPE_BIGINT,
                          TWO_STAGE_COMP, &reader->reader->config->bufArr[1], sizeof(int64_t) * delBlk->numRec,
                          &reader->reader->config->bufArr[2]);
    TSDB_CHECK_CODE(code, lino, _exit);

    for (int32_t j = 0; j < delBlk->numRec; ++j) {
      code = TARRAY2_APPEND(&dData->dataArr[i], ((int64_t *)(reader->reader->config->bufArr[1]))[j]);
      // a failed append must abort the decode instead of being silently dropped
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    size += delBlk->size[i];
  }

  // every byte of the block must belong to exactly one column
  ASSERT(size == delBlk->dp->size);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->reader->config->tsdb->pVnode), lino, code);
  }
  return code;
}

H
Hongze Cheng 已提交
281
// Read the statistics block described by `statisBlk` and decode it into
// `sData`.
//
// `sData` is cleared first. The raw block is read into scratch buffer 0; it
// holds one compressed int64 column per entry of sData->dataArr, stored back
// to back with the per-column sizes in statisBlk->size[]. Each column is
// decompressed into scratch buffer 1 and appended value by value.
//
// Returns 0 on success, otherwise the error code of the failing step, which
// is also logged.
int32_t tsdbSttFileReadStatisBlock(SSttSegReader *reader, const SStatisBlk *statisBlk, STbStatisBlock *sData) {
  int32_t               code = 0;
  int32_t               lino = 0;
  SSttFileReader       *fReader = reader->reader;
  SSttFileReaderConfig *config = fReader->config;

  tStatisBlockClear(sData);

  code = tRealloc(&config->bufArr[0], statisBlk->dp->size);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbReadFile(fReader->fd, statisBlk->dp->offset, config->bufArr[0], statisBlk->dp->size);
  TSDB_CHECK_CODE(code, lino, _exit);

  int64_t consumed = 0;  // bytes of the raw block decoded so far
  for (int32_t iCol = 0; iCol < ARRAY_SIZE(sData->dataArr); ++iCol) {
    code = tsdbDecmprData(config->bufArr[0] + consumed, statisBlk->size[iCol], TSDB_DATA_TYPE_BIGINT, TWO_STAGE_COMP,
                          &config->bufArr[1], sizeof(int64_t) * statisBlk->numRec, &config->bufArr[2]);
    TSDB_CHECK_CODE(code, lino, _exit);

    for (int32_t iRec = 0; iRec < statisBlk->numRec; ++iRec) {
      code = TARRAY2_APPEND(&sData->dataArr[iCol], ((int64_t *)config->bufArr[1])[iRec]);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    consumed += statisBlk->size[iCol];
  }

  // every byte of the block must belong to exactly one column
  ASSERT(consumed == statisBlk->dp->size);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(config->tsdb->pVnode), lino, code);
  }
  return code;
}

H
Hongze Cheng 已提交
318
// SSttFWriter ============================================================
H
Hongze Cheng 已提交
319
struct SSttFileWriter {
H
Hongze Cheng 已提交
320
  SSttFileWriterConfig config[1];
H
Hongze Cheng 已提交
321
  struct {
H
Hongze Cheng 已提交
322 323
    bool    opened;
    TABLEID tbid[1];
H
Hongze Cheng 已提交
324
  } ctx[1];
H
Hongze Cheng 已提交
325
  // file
H
Hongze Cheng 已提交
326
  STFile file[1];
H
Hongze Cheng 已提交
327
  // data
H
Hongze Cheng 已提交
328 329
  TSttBlkArray    sttBlkArray[1];
  TStatisBlkArray statisBlkArray[1];
H
Hongze Cheng 已提交
330
  TDelBlkArray    delBlkArray[1];
H
Hongze Cheng 已提交
331
  SSttFooter      footer[1];
H
Hongze Cheng 已提交
332 333
  SBlockData      bData[1];
  STbStatisBlock  sData[1];
H
Hongze Cheng 已提交
334
  SDelBlock       dData[1];
H
Hongze Cheng 已提交
335
  // helper data
H
Hongze Cheng 已提交
336 337
  SSkmInfo skmTb[1];
  SSkmInfo skmRow[1];
H
Hongze Cheng 已提交
338 339
  int32_t  sizeArr[5];
  uint8_t *bufArr[5];
H
Hongze Cheng 已提交
340
  STsdbFD *fd;
H
Hongze Cheng 已提交
341
};
H
Hongze Cheng 已提交
342

H
Hongze Cheng 已提交
343 344 345
// Flush the rows buffered in writer->bData as one compressed data block.
//
// Nothing happens when no rows are buffered. Otherwise an SSttBlk index entry
// is built (uid range, key range, version range, row count, location), the
// block is compressed and appended to the file, the entry is recorded in
// writer->sttBlkArray, and the row buffer is cleared.
//
// Returns 0 on success, otherwise the error code of the failing step, which
// is also logged.
static int32_t tsdbSttFileDoWriteTSDataBlock(SSttFileWriter *writer) {
  SBlockData *bData = writer->bData;
  if (bData->nRow == 0) return 0;

  int32_t code = 0;
  int32_t lino = 0;
  SSttBlk sttBlk[1];

  sttBlk->suid = bData->suid;
  if (bData->uid) {
    // all rows share one table uid
    sttBlk->minUid = bData->uid;
    sttBlk->maxUid = bData->uid;
  } else {
    // per-row uids: take the first and the last one
    sttBlk->minUid = bData->aUid[0];
    sttBlk->maxUid = bData->aUid[bData->nRow - 1];
  }
  sttBlk->nRow = bData->nRow;

  // key and version ranges over all rows
  sttBlk->minKey = sttBlk->maxKey = bData->aTSKEY[0];
  sttBlk->minVer = sttBlk->maxVer = bData->aVersion[0];
  for (int32_t iRow = 1; iRow < bData->nRow; iRow++) {
    sttBlk->minKey = TMIN(sttBlk->minKey, bData->aTSKEY[iRow]);
    sttBlk->maxKey = TMAX(sttBlk->maxKey, bData->aTSKEY[iRow]);
    sttBlk->minVer = TMIN(sttBlk->minVer, bData->aVersion[iRow]);
    sttBlk->maxVer = TMAX(sttBlk->maxVer, bData->aVersion[iRow]);
  }

  code = tCmprBlockData(bData, writer->config->cmprAlg, NULL, NULL, writer->config->aBuf, writer->sizeArr);
  TSDB_CHECK_CODE(code, lino, _exit);

  sttBlk->bInfo.offset = writer->file->size;
  sttBlk->bInfo.szKey = writer->sizeArr[2] + writer->sizeArr[3];
  sttBlk->bInfo.szBlock = writer->sizeArr[0] + writer->sizeArr[1] + sttBlk->bInfo.szKey;

  // the compressed parts go to disk from buffer 3 down to buffer 0
  for (int32_t iBuf = 3; iBuf >= 0; iBuf--) {
    if (writer->sizeArr[iBuf] == 0) continue;

    code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->aBuf[iBuf], writer->sizeArr[iBuf]);
    TSDB_CHECK_CODE(code, lino, _exit);
    writer->file->size += writer->sizeArr[iBuf];
  }

  code = TARRAY2_APPEND_PTR(writer->sttBlkArray, sttBlk);
  TSDB_CHECK_CODE(code, lino, _exit);

  tBlockDataClear(bData);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
  }
  return code;
}

H
Hongze Cheng 已提交
390
// Flush the records buffered in writer->sData as one compressed statistics
// block.
//
// Nothing happens when no records are buffered. Otherwise an SStatisBlk index
// entry is built (record count, first/last table id, version range,
// location), each column of sData->dataArr is compressed and appended to the
// file, the entry is recorded in writer->statisBlkArray, and the record
// buffer is cleared.
//
// Returns 0 on success, otherwise the error code of the failing step, which
// is also logged.
static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) {
  // nothing buffered: nothing to write (and FIRST/LAST below need >= 1 record)
  if (STATIS_BLOCK_SIZE(writer->sData) == 0) return 0;

  int32_t code = 0;
  int32_t lino = 0;

  SStatisBlk statisBlk[1] = {{
      .numRec = STATIS_BLOCK_SIZE(writer->sData),
      .minTbid =
          {
              .suid = TARRAY2_FIRST(writer->sData->suid),
              .uid = TARRAY2_FIRST(writer->sData->uid),
          },
      .maxTbid =
          {
              .suid = TARRAY2_LAST(writer->sData->suid),
              .uid = TARRAY2_LAST(writer->sData->uid),
          },
      .minVer = TARRAY2_FIRST(writer->sData->minVer),
      .maxVer = TARRAY2_FIRST(writer->sData->maxVer),
  }};

  // widen the version range over the remaining records
  for (int32_t i = 1; i < STATIS_BLOCK_SIZE(writer->sData); i++) {
    statisBlk->minVer = TMIN(statisBlk->minVer, TARRAY2_GET(writer->sData->minVer, i));
    statisBlk->maxVer = TMAX(statisBlk->maxVer, TARRAY2_GET(writer->sData->maxVer, i));
  }

  statisBlk->dp->offset = writer->file->size;
  statisBlk->dp->size = 0;

  // compress and append one column at a time
  for (int32_t i = 0; i < STATIS_RECORD_NUM_ELEM; i++) {
    int32_t size;
    code = tsdbCmprData((uint8_t *)TARRAY2_DATA(&writer->sData->dataArr[i]),
                        TARRAY2_DATA_LEN(&writer->sData->dataArr[i]), TSDB_DATA_TYPE_BIGINT, TWO_STAGE_COMP,
                        &writer->config->aBuf[0], 0, &size, &writer->config->aBuf[1]);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->aBuf[0], size);
    TSDB_CHECK_CODE(code, lino, _exit);

    statisBlk->size[i] = size;
    statisBlk->dp->size += size;
    writer->file->size += size;
  }

  code = TARRAY2_APPEND_PTR(writer->statisBlkArray, statisBlk);
  TSDB_CHECK_CODE(code, lino, _exit);

  tStatisBlockClear(writer->sData);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
  }
  return code;
}

H
Hongze Cheng 已提交
447
// Flush the records buffered in writer->dData as one compressed delete block.
//
// Nothing happens when no records are buffered. Otherwise an SDelBlk index
// entry is built (record count, first/last table id, version range,
// location), each column of dData->dataArr is compressed and appended to the
// file, the entry is recorded in writer->delBlkArray, and the record buffer
// is cleared.
//
// Returns 0 on success, otherwise the error code of the failing step, which
// is also logged.
static int32_t tsdbSttFileDoWriteDelBlock(SSttFileWriter *writer) {
  SDelBlock *dData = writer->dData;
  if (DEL_BLOCK_SIZE(dData) == 0) return 0;

  int32_t code = 0;
  int32_t lino = 0;

  SDelBlk delBlk[1] = {{
      .numRec = DEL_BLOCK_SIZE(dData),
      .minTid =
          {
              .suid = TARRAY2_FIRST(dData->suid),
              .uid = TARRAY2_FIRST(dData->uid),
          },
      .maxTid =
          {
              .suid = TARRAY2_LAST(dData->suid),
              .uid = TARRAY2_LAST(dData->uid),
          },
      .minVer = TARRAY2_FIRST(dData->version),
      .maxVer = TARRAY2_FIRST(dData->version),
      .dp[0] =
          {
              .offset = writer->file->size,
              .size = 0,
          },
  }};

  // widen the version range over the remaining records
  for (int32_t iRec = 1; iRec < DEL_BLOCK_SIZE(dData); iRec++) {
    delBlk->minVer = TMIN(delBlk->minVer, TARRAY2_GET(dData->version, iRec));
    delBlk->maxVer = TMAX(delBlk->maxVer, TARRAY2_GET(dData->version, iRec));
  }

  // compress and append one column at a time
  for (int32_t iCol = 0; iCol < ARRAY_SIZE(dData->dataArr); iCol++) {
    int32_t szCol;

    code = tsdbCmprData((uint8_t *)TARRAY2_DATA(&dData->dataArr[iCol]), TARRAY2_DATA_LEN(&dData->dataArr[iCol]),
                        TSDB_DATA_TYPE_BIGINT, TWO_STAGE_COMP, &writer->config->aBuf[0], 0, &szCol,
                        &writer->config->aBuf[1]);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->aBuf[0], szCol);
    TSDB_CHECK_CODE(code, lino, _exit);

    delBlk->size[iCol] = szCol;
    delBlk->dp->size += szCol;
    writer->file->size += szCol;
  }

  code = TARRAY2_APPEND_PTR(writer->delBlkArray, delBlk);
  TSDB_CHECK_CODE(code, lino, _exit);

  tDelBlockClear(dData);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
  }
  return code;
}

H
Hongze Cheng 已提交
506
// Append the data block index (writer->sttBlkArray) to the file and record
// its location in the footer. The footer entry is filled in even when the
// index is empty; in that case nothing is written.
//
// Returns 0 on success or the error code from tsdbWriteFile(), which is also
// logged.
static int32_t tsdbSttFileDoWriteSttBlk(SSttFileWriter *writer) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SFDataPtr *blkPtr = writer->footer->sttBlkPtr;

  blkPtr->offset = writer->file->size;
  blkPtr->size = TARRAY2_DATA_LEN(writer->sttBlkArray);

  if (blkPtr->size) {
    code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)TARRAY2_DATA(writer->sttBlkArray),
                         blkPtr->size);
    TSDB_CHECK_CODE(code, lino, _exit);
    writer->file->size += blkPtr->size;
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
  }
  return code;
}

H
Hongze Cheng 已提交
527
// Append the statistics block index (writer->statisBlkArray) to the file and
// record its location in the footer. The footer entry is filled in even when
// the index is empty; in that case nothing is written.
//
// Returns 0 on success or the error code from tsdbWriteFile(), which is also
// logged.
static int32_t tsdbSttFileDoWriteStatisBlk(SSttFileWriter *writer) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SFDataPtr *blkPtr = writer->footer->statisBlkPtr;

  blkPtr->offset = writer->file->size;
  blkPtr->size = TARRAY2_DATA_LEN(writer->statisBlkArray);

  if (blkPtr->size) {
    code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)TARRAY2_DATA(writer->statisBlkArray),
                         blkPtr->size);
    TSDB_CHECK_CODE(code, lino, _exit);
    writer->file->size += blkPtr->size;
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
  }
  return code;
}

H
Hongze Cheng 已提交
548
// Append the delete block index (writer->delBlkArray) to the file and record
// its location in the footer. The footer entry is filled in even when the
// index is empty; in that case nothing is written.
//
// Returns 0 on success or the error code from tsdbWriteFile(), which is also
// logged.
static int32_t tsdbSttFileDoWriteDelBlk(SSttFileWriter *writer) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SFDataPtr *blkPtr = writer->footer->delBlkPtr;

  blkPtr->offset = writer->file->size;
  blkPtr->size = TARRAY2_DATA_LEN(writer->delBlkArray);

  if (blkPtr->size) {
    code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)TARRAY2_DATA(writer->delBlkArray),
                         blkPtr->size);
    TSDB_CHECK_CODE(code, lino, _exit);
    writer->file->size += blkPtr->size;
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
  }
  return code;
}

H
Hongze Cheng 已提交
569
// Append the segment footer to the file.
//
// prevFooter is set to the size the file had before this writer touched it
// (config->file.size), which lets the reader walk back to the previous
// segment's footer. The footer is written as a raw struct image.
//
// Returns 0 on success or the error code from tsdbWriteFile(). The tracked
// file size is advanced only when the write succeeded.
static int32_t tsdbSttFileDoWriteFooter(SSttFileWriter *writer) {
  writer->footer->prevFooter = writer->config->file.size;

  int32_t code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)writer->footer, sizeof(SSttFooter));
  if (code) return code;

  writer->file->size += sizeof(SSttFooter);
  return 0;
}

H
Hongze Cheng 已提交
576
// Prepare the writer for appending a segment and open the target file.
//
// The working file descriptor (writer->file) starts as a copy of
// config->file with the segment count bumped by one. Config slots the caller
// left empty (skmTb, skmRow, aBuf) are pointed at the writer's own storage.
// An existing file (size != 0) is opened read/write; a new file is created
// and truncated and receives a zero-filled header of TSDB_FHDR_SIZE bytes.
//
// Returns 0 on success and marks the writer as opened; otherwise the error
// code of the failing step, which is also logged.
static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) {
  int32_t               code = 0;
  int32_t               lino = 0;
  SSttFileWriterConfig *config = writer->config;
  int32_t               vid = TD_VID(config->tsdb->pVnode);
  char                  fname[TSDB_FILENAME_LEN];

  // set
  writer->file[0] = config->file;
  writer->file->stt->nseg++;
  if (config->skmTb == NULL) config->skmTb = writer->skmTb;
  if (config->skmRow == NULL) config->skmRow = writer->skmRow;
  if (config->aBuf == NULL) config->aBuf = writer->bufArr;

  // open file
  const bool isNewFile = (writer->file->size == 0);
  int32_t    flag = TD_FILE_READ | TD_FILE_WRITE;
  if (isNewFile) {
    flag |= TD_FILE_CREATE | TD_FILE_TRUNC;
  }

  tsdbTFileName(config->tsdb, writer->file, fname);
  code = tsdbOpenFile(fname, config->szPage, flag, &writer->fd);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (isNewFile) {
    // a fresh file starts with an all-zero header
    uint8_t hdr[TSDB_FHDR_SIZE] = {0};
    code = tsdbWriteFile(writer->fd, 0, hdr, sizeof(hdr));
    TSDB_CHECK_CODE(code, lino, _exit);
    writer->file->size += sizeof(hdr);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(vid, lino, code);
  } else {
    writer->ctx->opened = true;
  }
  return code;
}

H
Hongze Cheng 已提交
618 619
// Release everything the writer owns besides the writer object itself:
// scratch buffers, schemas, the buffered blocks and the three index arrays.
// The file handle must already be closed.
static void tsdbSttFWriterDoClose(SSttFileWriter *writer) {
  ASSERT(writer->fd == NULL);

  // bufArr has the same number of slots as sizeArr
  for (int32_t iBuf = 0; iBuf < ARRAY_SIZE(writer->bufArr); ++iBuf) {
    tFree(writer->bufArr[iBuf]);
  }

  tDestroyTSchema(writer->skmRow->pTSchema);
  tDestroyTSchema(writer->skmTb->pTSchema);

  tStatisBlockFree(writer->sData);
  tDelBlockFree(writer->dData);
  tBlockDataDestroy(writer->bData);

  TARRAY2_FREE(writer->delBlkArray);
  TARRAY2_FREE(writer->statisBlkArray);
  TARRAY2_FREE(writer->sttBlkArray);
}
H
Hongze Cheng 已提交
633

H
Hongze Cheng 已提交
634 635 636 637
// Placeholder for rewriting the file header at commit time. Not implemented
// yet: it does nothing and always returns 0.
static int32_t tsdbSttFileDoUpdateHeader(SSttFileWriter *writer) {
  (void)writer;  // TODO
  return 0;
}
H
Hongze Cheng 已提交
638

H
Hongze Cheng 已提交
639
// Finish the segment and publish the result as a file operation.
//
// Order of work: flush the buffered data, statistics and delete blocks; write
// the three block indexes; write the footer; update the header; fsync and
// close the file. Finally an STFileOp describing the change (create for a new
// file, modify for an existing one) is appended to `opArray`.
//
// Returns 0 on success, otherwise the error code of the first failing step,
// which is also logged.
static int32_t tsdbSttFWriterCloseCommit(SSttFileWriter *writer, TFileOpArray *opArray) {
  int32_t code = 0;
  int32_t lino = 0;

  // flush whatever is still buffered
  code = tsdbSttFileDoWriteTSDataBlock(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSttFileDoWriteStatisBlock(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSttFileDoWriteDelBlock(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  // block indexes
  code = tsdbSttFileDoWriteSttBlk(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSttFileDoWriteStatisBlk(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSttFileDoWriteDelBlk(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  // footer and header
  code = tsdbSttFileDoWriteFooter(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSttFileDoUpdateHeader(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  // make the data durable, then release the handle
  code = tsdbFsyncFile(writer->fd);
  TSDB_CHECK_CODE(code, lino, _exit);

  tsdbCloseFile(&writer->fd);

  // a committed segment always grows the file
  ASSERT(writer->config->file.size < writer->file->size);

  STFileOp fileOp = {
      .optype = writer->config->file.size ? TSDB_FOP_MODIFY : TSDB_FOP_CREATE,
      .fid = writer->config->file.fid,
      .of = writer->config->file,
      .nf = writer->file[0],
  };

  code = TARRAY2_APPEND(opArray, fileOp);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
  }
  return code;
}
H
Hongze Cheng 已提交
689

H
Hongze Cheng 已提交
690
// Roll back an opened writer without recording any file operation.
//
// - config->file.size != 0: the file is truncated back to config->file.size
//   if the writer's tracked size (writer->file->size) is larger.
// - config->file.size == 0: the file named after config->file is removed.
//
// The file descriptor is closed on every path. Always returns 0; the results
// of the truncate/remove calls are not checked (best effort, as before).
static int32_t tsdbSttFWriterCloseAbort(SSttFileWriter *writer) {
  if (writer->config->file.size) {  // truncate the file to the original size
    ASSERT(writer->config->file.size <= writer->file->size);
    if (writer->config->file.size < writer->file->size) {
      taosFtruncateFile(writer->fd->pFD, writer->config->file.size);
    }
    // Close even when nothing was appended; otherwise the descriptor would
    // stay open when the two sizes are equal.
    tsdbCloseFile(&writer->fd);
  } else {  // remove the file
    char fname[TSDB_FILENAME_LEN];
    tsdbTFileName(writer->config->tsdb, &writer->config->file, fname);
    tsdbCloseFile(&writer->fd);
    taosRemoveFile(fname);
  }

  return 0;
}
H
Hongze Cheng 已提交
706

H
Hongze Cheng 已提交
707
// Allocate a zero-initialized writer, store a copy of *config in it and leave
// it in the "not opened" state.
//
// On success *writer points to the new writer and 0 is returned. If the
// allocation fails, *writer is set to NULL and TSDB_CODE_OUT_OF_MEMORY is
// returned.
int32_t tsdbSttFileWriterOpen(const SSttFileWriterConfig *config, SSttFileWriter **writer) {
  SSttFileWriter *w = taosMemoryCalloc(1, sizeof(*w));

  *writer = w;
  if (w == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  *w->config = *config;
  w->ctx->opened = false;
  return 0;
}

H
Hongze Cheng 已提交
716
// Close a writer, either aborting or committing what it has written.
//
// If the writer was opened, `abort` selects tsdbSttFWriterCloseAbort() or
// tsdbSttFWriterCloseCommit() (the latter receives opArray), followed by
// tsdbSttFWriterDoClose(). The writer is then freed and *writer set to NULL.
//
// If the abort/commit step fails, its error code is logged and returned and
// the writer is left untouched (not closed, not freed, *writer unchanged).
int32_t tsdbSttFileWriterClose(SSttFileWriter **writer, int8_t abort, TFileOpArray *opArray) {
  SSttFileWriter *w = *writer;
  int32_t         lino = 0;
  int32_t         code = 0;
  int32_t         vid = TD_VID(w->config->tsdb->pVnode);  // read up front for error logging

  if (w->ctx->opened) {
    code = abort ? tsdbSttFWriterCloseAbort(w)  //
                 : tsdbSttFWriterCloseCommit(w, opArray);
    TSDB_CHECK_CODE(code, lino, _exit);

    tsdbSttFWriterDoClose(w);
  }

  taosMemoryFree(w);
  *writer = NULL;

_exit:
  if (code) {
    TSDB_ERROR_LOG(vid, lino, code);
  }
  return code;
}

H
Hongze Cheng 已提交
741
// Append one time-series row to the writer.
//
// Steps, in order:
//   1. open the writer on first use;
//   2. if the row does not share a schema with the current table, flush the
//      pending data block, refresh the table schema and re-init the block;
//   3. if the row belongs to a different uid than the current one, switch the
//      current table id and start a new statistics record for it (flushing
//      the statistics block first when it has reached maxRow entries);
//   4. refresh the row schema for row-format rows;
//   5. either merge the row into the last buffered row (same uid, same
//      timestamp, version <= compactVersion) or append it, flushing the data
//      block first when it already holds maxRow rows;
//   6. fold the row's key into the last statistics record.
//
// Returns 0 on success or the first error code reported by a helper.
int32_t tsdbSttFileWriteTSData(SSttFileWriter *writer, SRowInfo *row) {
  SSttFileWriterConfig *cfg = writer->config;
  int32_t               lino = 0;
  int32_t               code = 0;
  int32_t               vid = TD_VID(cfg->tsdb->pVnode);

  if (!writer->ctx->opened) {
    code = tsdbSttFWriterDoOpen(writer);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  TSDBKEY     key = TSDBROW_KEY(&row->row);
  TABLEID    *curTb = writer->ctx->tbid;
  SBlockData *block = writer->bData;

  // schema switch: flush what is buffered and rebuild the block layout
  if (!TABLE_SAME_SCHEMA(row->suid, row->uid, curTb->suid, curTb->uid)) {
    code = tsdbSttFileDoWriteTSDataBlock(writer);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbUpdateSkmTb(cfg->tsdb, (TABLEID *)row, cfg->skmTb);
    TSDB_CHECK_CODE(code, lino, _exit);

    TABLEID blockId = {.suid = row->suid, .uid = row->suid ? 0 : row->uid};
    code = tBlockDataInit(block, &blockId, cfg->skmTb->pTSchema, NULL, 0);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // table switch: begin a new statistics record seeded with this row's key
  if (curTb->uid != row->uid) {
    curTb->suid = row->suid;
    curTb->uid = row->uid;

    if (STATIS_BLOCK_SIZE(writer->sData) >= cfg->maxRow) {
      code = tsdbSttFileDoWriteStatisBlock(writer);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    STbStatisRecord seed = {
        .suid = row->suid,
        .uid = row->uid,
        .firstKey = key.ts,
        .firstKeyVer = key.version,
        .lastKey = key.ts,
        .lastKeyVer = key.version,
        .minVer = key.version,
        .maxVer = key.version,
        .count = 1,
    };
    code = tStatisBlockPut(writer->sData, &seed);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (row->row.type == TSDBROW_ROW_FMT) {
    code = tsdbUpdateSkmRow(cfg->tsdb, curTb, TSDBROW_SVERSION(&row->row), cfg->skmRow);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // row to col conversion
  // The last buffered row is only inspected when the block is non-empty,
  // mirroring the short-circuit order of a single && chain.
  bool mergeIntoLast = false;
  if (key.version <= cfg->compactVersion && block->nRow > 0) {
    if ((block->uid ? block->uid : block->aUid[block->nRow - 1]) == row->uid) {
      mergeIntoLast = (block->aTSKEY[block->nRow - 1] == key.ts);
    }
  }

  if (mergeIntoLast) {
    code = tBlockDataUpdateRow(block, &row->row, cfg->skmRow->pTSchema);
    TSDB_CHECK_CODE(code, lino, _exit);
  } else {
    if (block->nRow >= cfg->maxRow) {
      code = tsdbSttFileDoWriteTSDataBlock(writer);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tBlockDataAppendRow(block, &row->row, cfg->skmRow->pTSchema, row->uid);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // update the last statistics record with this row's version and timestamp
  TARRAY2_LAST(writer->sData->minVer) = TMIN(TARRAY2_LAST(writer->sData->minVer), key.version);
  TARRAY2_LAST(writer->sData->maxVer) = TMAX(TARRAY2_LAST(writer->sData->maxVer), key.version);
  if (key.ts > TARRAY2_LAST(writer->sData->lastKey)) {
    // a new, later timestamp: it counts as one more row
    TARRAY2_LAST(writer->sData->lastKey) = key.ts;
    TARRAY2_LAST(writer->sData->lastKeyVer) = key.version;
    TARRAY2_LAST(writer->sData->count)++;
  } else if (key.ts == TARRAY2_LAST(writer->sData->lastKey)) {
    // same timestamp again: only the version of the last key moves
    TARRAY2_LAST(writer->sData->lastKeyVer) = key.version;
  } else {
    ASSERTS(0, "timestamp should be in ascending order");
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(vid, lino, code);
  }
  return code;
}

H
Hongze Cheng 已提交
833
// Write every row of a block through tsdbSttFileWriteTSData(), in index order.
//
// Each row carries the block's suid; its uid is the block-level uid when that
// is non-zero, otherwise the per-row entry in bdata->aUid. Stops at the first
// failing row and returns its error code; returns 0 when all rows are written.
int32_t tsdbSttFileWriteTSDataBlock(SSttFileWriter *writer, SBlockData *bdata) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SRowInfo info;

  info.suid = bdata->suid;
  for (int32_t iRow = 0; iRow < bdata->nRow; ++iRow) {
    if (bdata->uid) {
      info.uid = bdata->uid;
    } else {
      info.uid = bdata->aUid[iRow];
    }
    info.row = tsdbRowFromBlockData(bdata, iRow);

    code = tsdbSttFileWriteTSData(writer, &info);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
  }
  return code;
}

H
Hongze Cheng 已提交
854
// Append one delete record to the writer.
//
// The writer is opened on first use. Any buffered time-series rows and any
// pending statistics entries are flushed first, then the record is put into
// the delete block, which is flushed once it holds at least config->maxRow
// entries.
//
// Returns 0 on success or the first error code reported by a helper.
int32_t tsdbSttFileWriteDelRecord(SSttFileWriter *writer, const SDelRecord *record) {
  int32_t code = 0;
  int32_t lino = 0;

  if (!writer->ctx->opened) {
    code = tsdbSttFWriterDoOpen(writer);
    // Only bail out on failure: returning unconditionally here would drop the
    // record that triggered the open.
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // end time-series data write
  if (writer->bData->nRow > 0) {
    code = tsdbSttFileDoWriteTSDataBlock(writer);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (STATIS_BLOCK_SIZE(writer->sData) > 0) {
    code = tsdbSttFileDoWriteStatisBlock(writer);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // write SDelRecord
  code = tDelBlockPut(writer->dData, record);
  TSDB_CHECK_CODE(code, lino, _exit);

  // write SDelBlock if need
  if (DEL_BLOCK_SIZE(writer->dData) >= writer->config->maxRow) {
    code = tsdbSttFileDoWriteDelBlock(writer);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
  }
  return code;
}