tsdbCommit.c 32.0 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
H
Hongze Cheng 已提交
17 18 19 20 21
typedef struct {
  int64_t   suid;
  int64_t   uid;
  STSchema *pTSchema;
} SSkmInfo;
H
Hongze Cheng 已提交
22

H
Hongze Cheng 已提交
23
typedef struct {
H
Hongze Cheng 已提交
24
  STsdb *pTsdb;
H
Hongze Cheng 已提交
25
  /* commit data */
H
Hongze Cheng 已提交
26
  int64_t commitID;
H
Hongze Cheng 已提交
27 28
  int32_t minutes;
  int8_t  precision;
H
Hongze Cheng 已提交
29 30
  int32_t minRow;
  int32_t maxRow;
H
Hongze Cheng 已提交
31
  int8_t  cmprAlg;
H
Hongze Cheng 已提交
32
  // --------------
H
Hongze Cheng 已提交
33
  TSKEY   nextKey;  // reset by each table commit
H
Hongze Cheng 已提交
34 35 36
  int32_t commitFid;
  TSKEY   minKey;
  TSKEY   maxKey;
H
Hongze Cheng 已提交
37
  // commit file data
H
Hongze Cheng 已提交
38
  SDataFReader *pReader;
H
Hongze Cheng 已提交
39 40
  SArray       *aBlockIdx;  // SArray<SBlockIdx>
  SMapData      oBlockMap;  // SMapData<SBlock>, read from reader
H
Hongze Cheng 已提交
41
  SBlockData    oBlockData;
H
Hongze Cheng 已提交
42
  SDataFWriter *pWriter;
H
Hongze Cheng 已提交
43 44
  SArray       *aBlockIdxN;  // SArray<SBlockIdx>
  SMapData      nBlockMap;   // SMapData<SBlock>
H
Hongze Cheng 已提交
45
  SBlockData    nBlockData;
H
Hongze Cheng 已提交
46 47
  SSkmInfo      skmTable;
  SSkmInfo      skmRow;
H
Hongze Cheng 已提交
48
  /* commit del */
H
Hongze Cheng 已提交
49 50
  SDelFReader *pDelFReader;
  SDelFWriter *pDelFWriter;
H
Hongze Cheng 已提交
51 52 53
  SArray      *aDelIdx;   // SArray<SDelIdx>
  SArray      *aDelIdxN;  // SArray<SDelIdx>
  SArray      *aDelData;  // SArray<SDelData>
H
Hongze Cheng 已提交
54
} SCommitter;
H
refact  
Hongze Cheng 已提交
55

H
Hongze Cheng 已提交
56 57 58 59 60
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter);
static int32_t tsdbCommitData(SCommitter *pCommitter);
static int32_t tsdbCommitDel(SCommitter *pCommitter);
static int32_t tsdbCommitCache(SCommitter *pCommitter);
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno);
H
refact  
Hongze Cheng 已提交
61

H
refact  
Hongze Cheng 已提交
62
int32_t tsdbBegin(STsdb *pTsdb) {
H
Hongze Cheng 已提交
63
  int32_t code = 0;
H
Hongze Cheng 已提交
64

65 66
  if (!pTsdb) return code;

H
Hongze Cheng 已提交
67
  code = tsdbMemTableCreate(pTsdb, &pTsdb->mem);
H
Hongze Cheng 已提交
68
  if (code) goto _err;
H
Hongze Cheng 已提交
69

H
Hongze Cheng 已提交
70 71 72
  return code;

_err:
H
Hongze Cheng 已提交
73
  tsdbError("vgId:%d tsdb begin failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
74
  return code;
H
Hongze Cheng 已提交
75 76
}

H
more  
Hongze Cheng 已提交
77
int32_t tsdbCommit(STsdb *pTsdb) {
78
  if (!pTsdb) return 0;
H
Hongze Cheng 已提交
79

H
more  
Hongze Cheng 已提交
80
  int32_t    code = 0;
H
Hongze Cheng 已提交
81 82 83 84
  SCommitter commith;
  SMemTable *pMemTable = pTsdb->mem;

  // check
H
Hongze Cheng 已提交
85 86
  if (pMemTable->nRow == 0 && pMemTable->nDel == 0) {
    // TODO: lock?
H
Hongze Cheng 已提交
87 88 89 90
    pTsdb->mem = NULL;
    tsdbMemTableDestroy(pMemTable);
    goto _exit;
  }
H
refact  
Hongze Cheng 已提交
91

H
more  
Hongze Cheng 已提交
92
  // start commit
H
more  
Hongze Cheng 已提交
93
  code = tsdbStartCommit(pTsdb, &commith);
H
Hongze Cheng 已提交
94
  if (code) goto _err;
H
refact  
Hongze Cheng 已提交
95

H
refact  
Hongze Cheng 已提交
96 97
  // commit impl
  code = tsdbCommitData(&commith);
H
Hongze Cheng 已提交
98
  if (code) goto _err;
H
refact  
Hongze Cheng 已提交
99 100

  code = tsdbCommitDel(&commith);
H
Hongze Cheng 已提交
101
  if (code) goto _err;
H
refact  
Hongze Cheng 已提交
102 103

  code = tsdbCommitCache(&commith);
H
Hongze Cheng 已提交
104
  if (code) goto _err;
H
refact  
Hongze Cheng 已提交
105 106

  // end commit
H
more  
Hongze Cheng 已提交
107
  code = tsdbEndCommit(&commith, 0);
H
Hongze Cheng 已提交
108
  if (code) goto _err;
H
refact  
Hongze Cheng 已提交
109

H
Hongze Cheng 已提交
110
_exit:
H
refact  
Hongze Cheng 已提交
111 112 113
  return code;

_err:
H
Hongze Cheng 已提交
114
  tsdbEndCommit(&commith, code);
C
Cary Xu 已提交
115
  tsdbError("vgId:%d, failed to commit since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
refact  
Hongze Cheng 已提交
116 117 118
  return code;
}

H
Hongze Cheng 已提交
119
static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
120 121 122 123
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;

H
Hongze Cheng 已提交
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
  pCommitter->aDelIdx = taosArrayInit(0, sizeof(SDelIdx));
  if (pCommitter->aDelIdx == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  pCommitter->aDelData = taosArrayInit(0, sizeof(SDelData));
  if (pCommitter->aDelData == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  pCommitter->aDelIdxN = taosArrayInit(0, sizeof(SDelIdx));
  if (pCommitter->aDelIdxN == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
H
Hongze Cheng 已提交
141

H
Hongze Cheng 已提交
142
  SDelFile *pDelFileR = pTsdb->fs->nState->pDelFile;
H
Hongze Cheng 已提交
143
  if (pDelFileR) {
H
Hongze Cheng 已提交
144
    code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR, pTsdb, NULL);
H
Hongze Cheng 已提交
145
    if (code) goto _err;
H
Hongze Cheng 已提交
146

H
Hongze Cheng 已提交
147
    code = tsdbReadDelIdx(pCommitter->pDelFReader, pCommitter->aDelIdx, NULL);
H
Hongze Cheng 已提交
148
    if (code) goto _err;
H
Hongze Cheng 已提交
149 150
  }

H
Hongze Cheng 已提交
151
  // prepare new
H
Hongze Cheng 已提交
152 153
  SDelFile wDelFile = {.commitID = pCommitter->commitID, .size = 0, .offset = 0};
  code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, &wDelFile, pTsdb);
H
Hongze Cheng 已提交
154
  if (code) goto _err;
H
Hongze Cheng 已提交
155 156 157 158 159 160 161

_exit:
  tsdbDebug("vgId:%d commit del start", TD_VID(pTsdb->pVnode));
  return code;

_err:
  tsdbError("vgId:%d commit del start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
162 163 164
  return code;
}

H
Hongze Cheng 已提交
165
static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDelIdx *pDelIdx) {
H
Hongze Cheng 已提交
166
  int32_t   code = 0;
H
Hongze Cheng 已提交
167
  SDelData *pDelData;
H
Hongze Cheng 已提交
168 169
  tb_uid_t  suid;
  tb_uid_t  uid;
H
Hongze Cheng 已提交
170

H
Hongze Cheng 已提交
171
  taosArrayClear(pCommitter->aDelData);
H
Hongze Cheng 已提交
172 173

  if (pTbData) {
H
Hongze Cheng 已提交
174 175
    suid = pTbData->suid;
    uid = pTbData->uid;
H
Hongze Cheng 已提交
176

H
Hongze Cheng 已提交
177 178 179 180
    if (pTbData->pHead == NULL) {
      pTbData = NULL;
    }
  }
H
Hongze Cheng 已提交
181 182

  if (pDelIdx) {
H
Hongze Cheng 已提交
183 184 185 186
    suid = pDelIdx->suid;
    uid = pDelIdx->uid;

    code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, pCommitter->aDelData, NULL);
H
Hongze Cheng 已提交
187 188 189
    if (code) goto _err;
  }

H
Hongze Cheng 已提交
190
  if (pTbData == NULL && pDelIdx == NULL) goto _exit;
H
Hongze Cheng 已提交
191

H
Hongze Cheng 已提交
192
  SDelIdx delIdx = {.suid = suid, .uid = uid};
H
Hongze Cheng 已提交
193 194

  // memory
H
Hongze Cheng 已提交
195 196
  pDelData = pTbData ? pTbData->pHead : NULL;
  for (; pDelData; pDelData = pDelData->pNext) {
H
Hongze Cheng 已提交
197 198 199 200
    if (taosArrayPush(pCommitter->aDelData, pDelData) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
H
Hongze Cheng 已提交
201 202 203
  }

  // write
H
Hongze Cheng 已提交
204
  code = tsdbWriteDelData(pCommitter->pDelFWriter, pCommitter->aDelData, NULL, &delIdx);
H
Hongze Cheng 已提交
205 206 207
  if (code) goto _err;

  // put delIdx
H
Hongze Cheng 已提交
208 209 210 211
  if (taosArrayPush(pCommitter->aDelIdx, &delIdx) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
H
Hongze Cheng 已提交
212 213 214 215 216 217 218 219 220

_exit:
  return code;

_err:
  tsdbError("vgId:%d commit table del failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
221 222
static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) {
  int32_t code = 0;
H
Hongze Cheng 已提交
223
  STsdb  *pTsdb = pCommitter->pTsdb;
H
Hongze Cheng 已提交
224

H
Hongze Cheng 已提交
225
  code = tsdbWriteDelIdx(pCommitter->pDelFWriter, pCommitter->aDelIdxN, NULL);
H
Hongze Cheng 已提交
226
  if (code) goto _err;
H
Hongze Cheng 已提交
227

H
Hongze Cheng 已提交
228 229 230 231
  code = tsdbUpdateDelFileHdr(pCommitter->pDelFWriter);
  if (code) goto _err;

  code = tsdbFSStateUpsertDelFile(pTsdb->fs->nState, &pCommitter->pDelFWriter->fDel);
H
Hongze Cheng 已提交
232
  if (code) goto _err;
H
Hongze Cheng 已提交
233

H
Hongze Cheng 已提交
234
  code = tsdbDelFWriterClose(&pCommitter->pDelFWriter, 1);
H
Hongze Cheng 已提交
235
  if (code) goto _err;
H
Hongze Cheng 已提交
236 237

  if (pCommitter->pDelFReader) {
H
Hongze Cheng 已提交
238
    code = tsdbDelFReaderClose(&pCommitter->pDelFReader);
H
Hongze Cheng 已提交
239 240 241
    if (code) goto _err;
  }

H
Hongze Cheng 已提交
242 243 244 245
  taosArrayDestroy(pCommitter->aDelIdx);
  taosArrayDestroy(pCommitter->aDelData);
  taosArrayDestroy(pCommitter->aDelIdxN);

H
Hongze Cheng 已提交
246 247 248
  return code;

_err:
H
Hongze Cheng 已提交
249
  tsdbError("vgId:%d commit del end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
250 251 252
  return code;
}

H
Hongze Cheng 已提交
253 254 255 256 257
static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SDFileSet *pRSet = NULL;
  SDFileSet  wSet;
H
Hongze Cheng 已提交
258

H
Hongze Cheng 已提交
259 260
  // memory
  pCommitter->nextKey = TSKEY_MAX;
H
Hongze Cheng 已提交
261

H
Hongze Cheng 已提交
262
  // old
H
Hongze Cheng 已提交
263
  taosArrayClear(pCommitter->aBlockIdx);
H
Hongze Cheng 已提交
264 265 266 267 268
  tMapDataReset(&pCommitter->oBlockMap);
  tBlockDataReset(&pCommitter->oBlockData);
  pRSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, pCommitter->commitFid);
  if (pRSet) {
    code = tsdbDataFReaderOpen(&pCommitter->pReader, pTsdb, pRSet);
H
Hongze Cheng 已提交
269 270
    if (code) goto _err;

H
Hongze Cheng 已提交
271
    code = tsdbReadBlockIdx(pCommitter->pReader, pCommitter->aBlockIdx, NULL);
H
Hongze Cheng 已提交
272
    if (code) goto _err;
H
Hongze Cheng 已提交
273
  }
H
Hongze Cheng 已提交
274

H
Hongze Cheng 已提交
275
  // new
H
Hongze Cheng 已提交
276
  taosArrayClear(pCommitter->aBlockIdxN);
H
Hongze Cheng 已提交
277
  tMapDataReset(&pCommitter->nBlockMap);
H
Hongze Cheng 已提交
278
  tBlockDataReset(&pCommitter->nBlockData);
H
Hongze Cheng 已提交
279 280 281 282 283 284 285 286 287 288
  if (pRSet) {
    wSet = (SDFileSet){.diskId = pRSet->diskId,
                       .fid = pCommitter->commitFid,
                       .fHead = {.commitID = pCommitter->commitID, .offset = 0, .size = 0},
                       .fData = pRSet->fData,
                       .fLast = {.commitID = pCommitter->commitID, .size = 0},
                       .fSma = pRSet->fSma};
  } else {
    STfs   *pTfs = pTsdb->pVnode->pTfs;
    SDiskID did = {.level = 0, .id = 0};
H
Hongze Cheng 已提交
289

H
Hongze Cheng 已提交
290 291
    // TODO: alloc a new disk
    // tfsAllocDisk(pTfs, 0, &did);
H
Hongze Cheng 已提交
292

H
Hongze Cheng 已提交
293 294
    // create the directory
    tfsMkdirRecurAt(pTfs, pTsdb->path, did);
H
Hongze Cheng 已提交
295

H
Hongze Cheng 已提交
296 297 298 299 300 301
    wSet = (SDFileSet){.diskId = did,
                       .fid = pCommitter->commitFid,
                       .fHead = {.commitID = pCommitter->commitID, .offset = 0, .size = 0},
                       .fData = {.commitID = pCommitter->commitID, .size = 0},
                       .fLast = {.commitID = pCommitter->commitID, .size = 0},
                       .fSma = {.commitID = pCommitter->commitID, .size = 0}};
H
Hongze Cheng 已提交
302
  }
H
Hongze Cheng 已提交
303 304
  code = tsdbDataFWriterOpen(&pCommitter->pWriter, pTsdb, &wSet);
  if (code) goto _err;
H
Hongze Cheng 已提交
305

H
Hongze Cheng 已提交
306
_exit:
H
Hongze Cheng 已提交
307 308 309
  return code;

_err:
H
Hongze Cheng 已提交
310
  tsdbError("vgId:%d commit file data start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
311
  return code;
H
Hongze Cheng 已提交
312 313
}

H
Hongze Cheng 已提交
314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329
static int32_t tsdbCommitterUpdateTableSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) {
  int32_t code = 0;

  if (pCommitter->skmTable.pTSchema) {
    if (pCommitter->skmTable.suid == suid) {
      if (suid == 0) {
        if (pCommitter->skmTable.uid == uid && sver == pCommitter->skmTable.pTSchema->version) goto _exit;
      } else {
        if (sver == pCommitter->skmTable.pTSchema->version) goto _exit;
      }
    }
  }

  pCommitter->skmTable.suid = suid;
  pCommitter->skmTable.uid = uid;
  tTSchemaDestroy(pCommitter->skmTable.pTSchema);
H
Hongze Cheng 已提交
330 331
  code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, sver, &pCommitter->skmTable.pTSchema);
  if (code) goto _exit;
H
Hongze Cheng 已提交
332 333 334 335 336 337

_exit:
  return code;
}

static int32_t tsdbCommitterUpdateRowSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) {
H
Hongze Cheng 已提交
338 339
  int32_t code = 0;

H
Hongze Cheng 已提交
340 341
  if (pCommitter->skmRow.pTSchema) {
    if (pCommitter->skmRow.suid == suid) {
H
Hongze Cheng 已提交
342
      if (suid == 0) {
H
Hongze Cheng 已提交
343
        if (pCommitter->skmRow.uid == uid && sver == pCommitter->skmRow.pTSchema->version) goto _exit;
H
Hongze Cheng 已提交
344
      } else {
H
Hongze Cheng 已提交
345
        if (sver == pCommitter->skmRow.pTSchema->version) goto _exit;
H
Hongze Cheng 已提交
346 347 348 349
      }
    }
  }

H
Hongze Cheng 已提交
350 351 352
  pCommitter->skmRow.suid = suid;
  pCommitter->skmRow.uid = uid;
  tTSchemaDestroy(pCommitter->skmRow.pTSchema);
H
Hongze Cheng 已提交
353 354 355
  code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, sver, &pCommitter->skmRow.pTSchema);
  if (code) {
    goto _exit;
H
Hongze Cheng 已提交
356 357 358 359 360 361
  }

_exit:
  return code;
}

H
Hongze Cheng 已提交
362 363 364 365
static int32_t tsdbCommitBlockData(SCommitter *pCommitter, SBlockData *pBlockData, SBlock *pBlock, SBlockIdx *pBlockIdx,
                                   int8_t toDataOnly) {
  int32_t code = 0;

H
Hongze Cheng 已提交
366 367 368 369 370 371
  if (pBlock->nSubBlock == 0) {
    if (!toDataOnly && pBlockData->nRow < pCommitter->minRow) {
      pBlock->last = 1;
    } else {
      pBlock->last = 0;
    }
H
Hongze Cheng 已提交
372 373 374 375 376 377 378 379 380 381 382 383 384 385
  }

  code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, pBlockIdx, pBlock, pCommitter->cmprAlg);
  if (code) goto _err;

  code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
  if (code) goto _err;

  return code;

_err:
  return code;
}

H
Hongze Cheng 已提交
386 387
static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlockMerge, TSDBKEY toKey,
                                  int8_t toDataOnly) {
H
Hongze Cheng 已提交
388
  int32_t     code = 0;
H
Hongze Cheng 已提交
389 390 391
  SBlockIdx  *pBlockIdx = &(SBlockIdx){.suid = pIter->pTbData->suid, .uid = pIter->pTbData->uid};
  SBlockData *pBlockDataMerge = &pCommitter->oBlockData;
  SBlockData *pBlockData = &pCommitter->nBlockData;
H
Hongze Cheng 已提交
392 393
  SBlock      block;
  SBlock     *pBlock = &block;
H
Hongze Cheng 已提交
394
  TSDBROW    *pRow1;
H
Hongze Cheng 已提交
395 396
  TSDBROW     row2;
  TSDBROW    *pRow2 = &row2;
H
Hongze Cheng 已提交
397 398

  // read SBlockData
H
Hongze Cheng 已提交
399
  code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlockMerge, pBlockDataMerge, NULL, NULL);
H
Hongze Cheng 已提交
400 401
  if (code) goto _err;

H
Hongze Cheng 已提交
402 403 404
  code = tBlockDataSetSchema(pBlockData, pCommitter->skmTable.pTSchema);
  if (code) goto _err;

H
Hongze Cheng 已提交
405 406
  // loop to merge
  pRow1 = tsdbTbDataIterGet(pIter);
H
Hongze Cheng 已提交
407
  *pRow2 = tsdbRowFromBlockData(pBlockDataMerge, 0);
H
Hongze Cheng 已提交
408
  ASSERT(pRow1 && tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0);
H
Hongze Cheng 已提交
409
  ASSERT(tsdbKeyCmprFn(&TSDBROW_KEY(pRow2), &toKey) < 0);
H
Hongze Cheng 已提交
410
  code = tsdbCommitterUpdateRowSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow1));
H
Hongze Cheng 已提交
411 412
  if (code) goto _err;

H
Hongze Cheng 已提交
413
  tBlockReset(pBlock);
H
Hongze Cheng 已提交
414
  tBlockDataClearData(pBlockData);
H
Hongze Cheng 已提交
415 416
  while (true) {
    if (pRow1 == NULL && pRow2 == NULL) {
H
Hongze Cheng 已提交
417
      if (pBlockData->nRow == 0) {
H
Hongze Cheng 已提交
418 419 420 421 422 423 424
        break;
      } else {
        goto _write_block;
      }
    }

    if (pRow1 && pRow2) {
H
Hongze Cheng 已提交
425 426 427 428 429
      int32_t c = tsdbRowCmprFn(pRow1, pRow2);
      if (c < 0) {
        goto _append_mem_row;
      } else if (c > 0) {
        goto _append_block_row;
H
Hongze Cheng 已提交
430 431 432 433
      } else {
        ASSERT(0);
      }
    } else if (pRow1) {
H
Hongze Cheng 已提交
434
      goto _append_mem_row;
H
Hongze Cheng 已提交
435
    } else {
H
Hongze Cheng 已提交
436 437 438 439
      goto _append_block_row;
    }

  _append_mem_row:
H
Hongze Cheng 已提交
440
    code = tBlockDataAppendRow(pBlockData, pRow1, pCommitter->skmRow.pTSchema);
H
Hongze Cheng 已提交
441
    if (code) goto _err;
H
Hongze Cheng 已提交
442

H
Hongze Cheng 已提交
443 444 445 446
    tsdbTbDataIterNext(pIter);
    pRow1 = tsdbTbDataIterGet(pIter);
    if (pRow1) {
      if (tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0) {
H
Hongze Cheng 已提交
447
        code = tsdbCommitterUpdateRowSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow1));
H
Hongze Cheng 已提交
448
        if (code) goto _err;
H
Hongze Cheng 已提交
449
      } else {
H
Hongze Cheng 已提交
450
        pRow1 = NULL;
H
Hongze Cheng 已提交
451
      }
H
Hongze Cheng 已提交
452 453
    }

H
Hongze Cheng 已提交
454 455 456 457 458 459 460
    if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) {
      goto _write_block;
    } else {
      continue;
    }

  _append_block_row:
H
Hongze Cheng 已提交
461
    code = tBlockDataAppendRow(pBlockData, pRow2, NULL);
H
Hongze Cheng 已提交
462 463
    if (code) goto _err;

H
Hongze Cheng 已提交
464 465
    if (pRow2->iRow + 1 < pBlockDataMerge->nRow) {
      *pRow2 = tsdbRowFromBlockData(pBlockDataMerge, pRow2->iRow + 1);
H
Hongze Cheng 已提交
466
    } else {
H
Hongze Cheng 已提交
467
      pRow2 = NULL;
H
Hongze Cheng 已提交
468 469
    }

H
Hongze Cheng 已提交
470 471 472 473 474
    if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) {
      goto _write_block;
    } else {
      continue;
    }
H
Hongze Cheng 已提交
475

H
Hongze Cheng 已提交
476
  _write_block:
H
Hongze Cheng 已提交
477
    code = tsdbCommitBlockData(pCommitter, pBlockData, pBlock, pBlockIdx, toDataOnly);
H
Hongze Cheng 已提交
478 479 480
    if (code) goto _err;

    tBlockReset(pBlock);
H
Hongze Cheng 已提交
481
    tBlockDataClearData(pBlockData);
H
Hongze Cheng 已提交
482 483 484 485 486 487 488 489 490
  }

  return code;

_err:
  tsdbError("vgId:%d tsdb merge block and mem failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
491
static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter, TSDBKEY toKey, int8_t toDataOnly) {
H
Hongze Cheng 已提交
492 493
  int32_t     code = 0;
  TSDBROW    *pRow;
H
Hongze Cheng 已提交
494 495
  SBlock      block;
  SBlock     *pBlock = &block;
H
Hongze Cheng 已提交
496
  SBlockData *pBlockData = &pCommitter->nBlockData;
H
Hongze Cheng 已提交
497 498
  int64_t     suid = pIter->pTbData->suid;
  int64_t     uid = pIter->pTbData->uid;
H
Hongze Cheng 已提交
499

H
Hongze Cheng 已提交
500 501 502
  code = tBlockDataSetSchema(pBlockData, pCommitter->skmTable.pTSchema);
  if (code) goto _err;

H
Hongze Cheng 已提交
503
  tBlockReset(pBlock);
H
Hongze Cheng 已提交
504
  tBlockDataClearData(pBlockData);
H
Hongze Cheng 已提交
505
  pRow = tsdbTbDataIterGet(pIter);
H
Hongze Cheng 已提交
506
  ASSERT(pRow && tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &toKey) < 0);
H
Hongze Cheng 已提交
507
  while (true) {
H
Hongze Cheng 已提交
508
    if (pRow == NULL) {
H
Hongze Cheng 已提交
509 510 511 512 513 514 515
      if (pBlockData->nRow > 0) {
        goto _write_block;
      } else {
        break;
      }
    }

H
Hongze Cheng 已提交
516
    // update schema
H
Hongze Cheng 已提交
517
    code = tsdbCommitterUpdateRowSchema(pCommitter, suid, uid, TSDBROW_SVERSION(pRow));
H
Hongze Cheng 已提交
518 519
    if (code) goto _err;

H
Hongze Cheng 已提交
520
    // append
H
Hongze Cheng 已提交
521
    code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema);
H
Hongze Cheng 已提交
522 523 524 525
    if (code) goto _err;

    tsdbTbDataIterNext(pIter);
    pRow = tsdbTbDataIterGet(pIter);
H
Hongze Cheng 已提交
526
    if (pRow && tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &toKey) >= 0) pRow = NULL;
H
Hongze Cheng 已提交
527 528

    if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) goto _write_block;
H
Hongze Cheng 已提交
529
    continue;
H
Hongze Cheng 已提交
530 531

  _write_block:
H
Hongze Cheng 已提交
532
    code = tsdbCommitBlockData(pCommitter, pBlockData, pBlock, &(SBlockIdx){.suid = suid, .uid = uid}, toDataOnly);
H
Hongze Cheng 已提交
533 534 535
    if (code) goto _err;

    tBlockReset(pBlock);
H
Hongze Cheng 已提交
536
    tBlockDataClearData(pBlockData);
H
Hongze Cheng 已提交
537 538 539 540 541
  }

  return code;

_err:
H
Hongze Cheng 已提交
542
  tsdbError("vgId:%d tsdb commit table mem data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
543 544 545
  return code;
}

H
Hongze Cheng 已提交
546
static int32_t tsdbCommitTableDiskData(SCommitter *pCommitter, SBlock *pBlock, SBlockIdx *pBlockIdx) {
H
Hongze Cheng 已提交
547
  int32_t code = 0;
H
Hongze Cheng 已提交
548
  SBlock  block;
H
Hongze Cheng 已提交
549 550

  if (pBlock->last) {
H
Hongze Cheng 已提交
551
    code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlock, &pCommitter->oBlockData, NULL, NULL);
H
Hongze Cheng 已提交
552 553
    if (code) goto _err;

H
Hongze Cheng 已提交
554 555
    tBlockReset(&block);
    code = tsdbCommitBlockData(pCommitter, &pCommitter->oBlockData, &block, pBlockIdx, 0);
H
Hongze Cheng 已提交
556 557 558 559 560 561 562 563 564 565
    if (code) goto _err;
  } else {
    code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
    if (code) goto _err;
  }

  return code;

_err:
  tsdbError("vgId:%d tsdb commit table disk data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
566 567 568
  return code;
}

H
Hongze Cheng 已提交
569 570
static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter, int64_t suid, int64_t uid) {
  int32_t    code = 0;
H
Hongze Cheng 已提交
571 572
  SBlockIdx  blockIdx = {.suid = suid, .uid = uid};
  SBlockIdx *pBlockIdx = &blockIdx;
H
Hongze Cheng 已提交
573 574 575 576

  code = tsdbWriteBlock(pCommitter->pWriter, &pCommitter->nBlockMap, NULL, pBlockIdx);
  if (code) goto _err;

H
Hongze Cheng 已提交
577 578 579 580
  if (taosArrayPush(pCommitter->aBlockIdxN, pBlockIdx) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
H
Hongze Cheng 已提交
581 582 583 584 585 586 587 588

  return code;

_err:
  tsdbError("vgId:%d commit table data end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605
static int32_t tsdbGetOvlpNRow(STbDataIter *pIter, SBlock *pBlock) {
  int32_t     nRow = 0;
  TSDBROW    *pRow;
  TSDBKEY     key;
  int32_t     c = 0;
  STbDataIter iter = *pIter;

  iter.pRow = NULL;
  while (true) {
    pRow = tsdbTbDataIterGet(pIter);

    if (pRow == NULL) break;
    key = TSDBROW_KEY(pRow);

    c = tBlockCmprFn(&(SBlock){.maxKey = key, .minKey = key}, pBlock);
    if (c == 0) {
      nRow++;
H
Hongze Cheng 已提交
606
      tsdbTbDataIterNext(pIter);
H
Hongze Cheng 已提交
607 608 609 610 611 612 613 614 615 616 617 618 619 620
    } else if (c > 0) {
      break;
    } else {
      ASSERT(0);
    }
  }

  return nRow;
}

static int32_t tsdbMergeAsSubBlock(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlock) {
  int32_t     code = 0;
  SBlockData *pBlockData = &pCommitter->nBlockData;
  SBlockIdx  *pBlockIdx = &(SBlockIdx){.suid = pIter->pTbData->suid, .uid = pIter->pTbData->uid};
H
Hongze Cheng 已提交
621
  SBlock      block;
H
Hongze Cheng 已提交
622 623
  TSDBROW    *pRow;

H
Hongze Cheng 已提交
624 625 626
  code = tBlockDataSetSchema(pBlockData, pCommitter->skmTable.pTSchema);
  if (code) goto _err;

H
Hongze Cheng 已提交
627
  pRow = tsdbTbDataIterGet(pIter);
H
Hongze Cheng 已提交
628
  code = tsdbCommitterUpdateRowSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow));
H
Hongze Cheng 已提交
629 630 631
  if (code) goto _err;
  while (true) {
    if (pRow) break;
H
Hongze Cheng 已提交
632
    code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema);
H
Hongze Cheng 已提交
633 634 635 636 637
    if (code) goto _err;

    tsdbTbDataIterNext(pIter);
    pRow = tsdbTbDataIterGet(pIter);
    if (pRow) {
H
Hongze Cheng 已提交
638 639
      TSDBKEY key = TSDBROW_KEY(pRow);
      int32_t c = tBlockCmprFn(&(SBlock){.minKey = key, .maxKey = key}, pBlock);
H
Hongze Cheng 已提交
640 641

      if (c == 0) {
H
Hongze Cheng 已提交
642 643
        code =
            tsdbCommitterUpdateRowSchema(pCommitter, pIter->pTbData->suid, pIter->pTbData->uid, TSDBROW_SVERSION(pRow));
H
Hongze Cheng 已提交
644 645 646 647 648 649 650 651 652
        if (code) goto _err;
      } else if (c > 0) {
        pRow = NULL;
      } else {
        ASSERT(0);
      }
    }
  }

H
Hongze Cheng 已提交
653 654
  block = *pBlock;
  code = tsdbCommitBlockData(pCommitter, pBlockData, &block, pBlockIdx, 0);
H
Hongze Cheng 已提交
655 656 657 658 659 660 661 662 663
  if (code) goto _err;

  return code;

_err:
  tsdbError("vgId:%d tsdb merge as subblock failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
664 665
static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *pBlockIdx) {
  int32_t      code = 0;
H
Hongze Cheng 已提交
666 667
  STbDataIter  iter = {0};
  STbDataIter *pIter = &iter;
H
Hongze Cheng 已提交
668
  TSDBROW     *pRow;
H
Hongze Cheng 已提交
669
  int32_t      iBlock;
H
Hongze Cheng 已提交
670
  int32_t      nBlock;
H
Hongze Cheng 已提交
671 672
  int64_t      suid;
  int64_t      uid;
H
Hongze Cheng 已提交
673 674 675 676

  if (pTbData) {
    tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, pIter);
    pRow = tsdbTbDataIterGet(pIter);
H
Hongze Cheng 已提交
677 678
    if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;

H
Hongze Cheng 已提交
679 680
    suid = pTbData->suid;
    uid = pTbData->uid;
H
Hongze Cheng 已提交
681 682 683 684 685 686 687 688 689 690 691
  } else {
    pIter = NULL;
    pRow = NULL;
  }

  if (pBlockIdx) {
    code = tsdbReadBlock(pCommitter->pReader, pBlockIdx, &pCommitter->oBlockMap, NULL);
    if (code) goto _err;

    nBlock = pCommitter->oBlockMap.nItem;
    ASSERT(nBlock > 0);
H
Hongze Cheng 已提交
692

H
Hongze Cheng 已提交
693 694
    suid = pBlockIdx->suid;
    uid = pBlockIdx->uid;
H
Hongze Cheng 已提交
695 696 697 698
  } else {
    nBlock = 0;
  }

H
Hongze Cheng 已提交
699
  if (pRow == NULL && nBlock == 0) goto _exit;
H
Hongze Cheng 已提交
700 701

  // start ===========
H
Hongze Cheng 已提交
702
  tMapDataReset(&pCommitter->nBlockMap);
H
Hongze Cheng 已提交
703 704
  SBlock  block;
  SBlock *pBlock = &block;
H
Hongze Cheng 已提交
705

H
Hongze Cheng 已提交
706
  iBlock = 0;
H
Hongze Cheng 已提交
707 708 709 710 711 712
  if (iBlock < nBlock) {
    tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
  } else {
    pBlock = NULL;
  }

H
Hongze Cheng 已提交
713 714 715 716 717
  if (pRow) {
    code = tsdbCommitterUpdateTableSchema(pCommitter, pTbData->suid, pTbData->uid, pTbData->maxSkmVer);
    if (code) goto _err;
  }

H
Hongze Cheng 已提交
718 719
  // merge ===========
  while (true) {
H
Hongze Cheng 已提交
720
    if (pRow == NULL && pBlock == NULL) break;
H
Hongze Cheng 已提交
721

H
Hongze Cheng 已提交
722
    if (pRow && pBlock) {
H
Hongze Cheng 已提交
723
      if (pBlock->last) {
H
Hongze Cheng 已提交
724
        code = tsdbMergeTableData(pCommitter, pIter, pBlock,
H
Hongze Cheng 已提交
725
                                  (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
H
Hongze Cheng 已提交
726 727 728
        if (code) goto _err;

        pRow = tsdbTbDataIterGet(pIter);
H
Hongze Cheng 已提交
729
        if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
H
Hongze Cheng 已提交
730
        iBlock++;
H
Hongze Cheng 已提交
731 732 733 734 735
        if (iBlock < nBlock) {
          tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
        } else {
          pBlock = NULL;
        }
H
Hongze Cheng 已提交
736 737

        ASSERT(pRow == NULL && pBlock == NULL);
H
Hongze Cheng 已提交
738
      } else {
H
Hongze Cheng 已提交
739
        int32_t c = tBlockCmprFn(&(SBlock){.maxKey = TSDBROW_KEY(pRow), .minKey = TSDBROW_KEY(pRow)}, pBlock);
H
Hongze Cheng 已提交
740
        if (c > 0) {
H
Hongze Cheng 已提交
741
          // only disk data
H
Hongze Cheng 已提交
742
          code = tsdbCommitTableDiskData(pCommitter, pBlock, pBlockIdx);
H
Hongze Cheng 已提交
743 744
          if (code) goto _err;

H
Hongze Cheng 已提交
745 746 747 748 749 750
          iBlock++;
          if (iBlock < nBlock) {
            tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
          } else {
            pBlock = NULL;
          }
H
Hongze Cheng 已提交
751
        } else if (c < 0) {
H
Hongze Cheng 已提交
752
          // only memory data
H
Hongze Cheng 已提交
753
          code = tsdbCommitTableMemData(pCommitter, pIter, pBlock->minKey, 1);
H
Hongze Cheng 已提交
754 755 756
          if (code) goto _err;

          pRow = tsdbTbDataIterGet(pIter);
H
Hongze Cheng 已提交
757
          if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
H
Hongze Cheng 已提交
758
        } else {
H
Hongze Cheng 已提交
759
          // merge memory and disk
H
Hongze Cheng 已提交
760 761
          int32_t nOvlp = tsdbGetOvlpNRow(pIter, pBlock);
          ASSERT(nOvlp);
H
Hongze Cheng 已提交
762
          if (pBlock->nRow + nOvlp <= pCommitter->maxRow && pBlock->nSubBlock < TSDB_MAX_SUBBLOCKS) {
H
Hongze Cheng 已提交
763 764
            code = tsdbMergeAsSubBlock(pCommitter, pIter, pBlock);
            if (code) goto _err;
H
Hongze Cheng 已提交
765
          } else {
H
Hongze Cheng 已提交
766 767 768 769 770 771 772 773 774 775
            TSDBKEY toKey = {.ts = pCommitter->maxKey + 1, .version = VERSION_MIN};
            int8_t  toDataOnly = 0;

            if (iBlock < nBlock - 1) {
              toDataOnly = 1;

              SBlock nextBlock = {0};
              tBlockReset(&nextBlock);
              tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock + 1, &nextBlock, tGetBlock);
              toKey = nextBlock.minKey;
H
Hongze Cheng 已提交
776
            }
H
Hongze Cheng 已提交
777 778 779

            code = tsdbMergeTableData(pCommitter, pIter, pBlock, toKey, toDataOnly);
            if (code) goto _err;
H
Hongze Cheng 已提交
780
          }
H
Hongze Cheng 已提交
781 782 783 784 785 786 787 788 789

          pRow = tsdbTbDataIterGet(pIter);
          if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
          iBlock++;
          if (iBlock < nBlock) {
            tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
          } else {
            pBlock = NULL;
          }
H
Hongze Cheng 已提交
790 791 792
        }
      }
    } else if (pBlock) {
H
Hongze Cheng 已提交
793
      code = tsdbCommitTableDiskData(pCommitter, pBlock, pBlockIdx);
H
Hongze Cheng 已提交
794 795
      if (code) goto _err;

H
Hongze Cheng 已提交
796 797 798 799 800 801
      iBlock++;
      if (iBlock < nBlock) {
        tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
      } else {
        pBlock = NULL;
      }
H
Hongze Cheng 已提交
802
    } else {
H
Hongze Cheng 已提交
803 804
      code =
          tsdbCommitTableMemData(pCommitter, pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
H
Hongze Cheng 已提交
805 806 807
      if (code) goto _err;

      pRow = tsdbTbDataIterGet(pIter);
H
Hongze Cheng 已提交
808 809
      if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
      ASSERT(pRow == NULL);
H
Hongze Cheng 已提交
810 811 812
    }
  }

H
Hongze Cheng 已提交
813 814
  // end =====================
  code = tsdbCommitTableDataEnd(pCommitter, suid, uid);
H
Hongze Cheng 已提交
815 816 817 818 819 820 821 822 823 824 825 826 827 828
  if (code) goto _err;

_exit:
  if (pIter) {
    pRow = tsdbTbDataIterGet(pIter);
    if (pRow) pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
  }
  return code;

_err:
  tsdbError("vgId:%d tsdb commit table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861
static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
  int32_t code = 0;

  // write blockIdx
  code = tsdbWriteBlockIdx(pCommitter->pWriter, pCommitter->aBlockIdxN, NULL);
  if (code) goto _err;

  // update file header
  code = tsdbUpdateDFileSetHeader(pCommitter->pWriter);
  if (code) goto _err;

  // upsert SDFileSet
  code = tsdbFSStateUpsertDFileSet(pCommitter->pTsdb->fs->nState, tsdbDataFWriterGetWSet(pCommitter->pWriter));
  if (code) goto _err;

  // close and sync
  code = tsdbDataFWriterClose(&pCommitter->pWriter, 1);
  if (code) goto _err;

  if (pCommitter->pReader) {
    code = tsdbDataFReaderClose(&pCommitter->pReader);
    goto _err;
  }

_exit:
  return code;

_err:
  tsdbError("vgId:%d commit file data end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
862 863 864
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
H
Hongze Cheng 已提交
865 866 867 868 869 870

  // commit file data start
  code = tsdbCommitFileDataStart(pCommitter);
  if (code) goto _err;

  // commit file data impl
H
Hongze Cheng 已提交
871 872
  int32_t    iTbData = 0;
  int32_t    nTbData = taosArrayGetSize(pMemTable->aTbData);
H
Hongze Cheng 已提交
873
  int32_t    iBlockIdx = 0;
H
Hongze Cheng 已提交
874
  int32_t    nBlockIdx = taosArrayGetSize(pCommitter->aBlockIdx);
H
Hongze Cheng 已提交
875
  STbData   *pTbData;
H
Hongze Cheng 已提交
876
  SBlockIdx *pBlockIdx;
H
Hongze Cheng 已提交
877

H
Hongze Cheng 已提交
878
  ASSERT(nTbData > 0);
H
Hongze Cheng 已提交
879

H
Hongze Cheng 已提交
880
  pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
H
Hongze Cheng 已提交
881
  pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->aBlockIdx, iBlockIdx) : NULL;
H
Hongze Cheng 已提交
882 883 884
  while (pTbData || pBlockIdx) {
    if (pTbData && pBlockIdx) {
      int32_t c = tTABLEIDCmprFn(pTbData, pBlockIdx);
H
Hongze Cheng 已提交
885

H
Hongze Cheng 已提交
886
      if (c == 0) {
H
Hongze Cheng 已提交
887
        goto _commit_table_mem_and_disk;
H
Hongze Cheng 已提交
888
      } else if (c < 0) {
H
Hongze Cheng 已提交
889
        goto _commit_table_mem_data;
H
Hongze Cheng 已提交
890
      } else {
H
Hongze Cheng 已提交
891
        goto _commit_table_disk_data;
H
Hongze Cheng 已提交
892
      }
H
Hongze Cheng 已提交
893
    } else if (pBlockIdx) {
H
Hongze Cheng 已提交
894 895 896 897
      goto _commit_table_disk_data;
    } else {
      goto _commit_table_mem_data;
    }
H
Hongze Cheng 已提交
898

H
Hongze Cheng 已提交
899 900 901 902 903
  _commit_table_mem_data:
    code = tsdbCommitTableData(pCommitter, pTbData, NULL);
    if (code) goto _err;

    iTbData++;
H
Hongze Cheng 已提交
904
    pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData) : NULL;
H
Hongze Cheng 已提交
905
    continue;
H
Hongze Cheng 已提交
906

H
Hongze Cheng 已提交
907 908 909 910 911
  _commit_table_disk_data:
    code = tsdbCommitTableData(pCommitter, NULL, pBlockIdx);
    if (code) goto _err;

    iBlockIdx++;
H
Hongze Cheng 已提交
912
    pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->aBlockIdx, iBlockIdx) : NULL;
H
Hongze Cheng 已提交
913 914 915 916 917 918 919
    continue;

  _commit_table_mem_and_disk:
    code = tsdbCommitTableData(pCommitter, pTbData, pBlockIdx);
    if (code) goto _err;

    iBlockIdx++;
H
Hongze Cheng 已提交
920
    pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->aBlockIdx, iBlockIdx) : NULL;
H
Hongze Cheng 已提交
921
    iTbData++;
H
Hongze Cheng 已提交
922
    pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData) : NULL;
H
Hongze Cheng 已提交
923
    continue;
H
Hongze Cheng 已提交
924 925
  }

H
Hongze Cheng 已提交
926 927
  // commit file data end
  code = tsdbCommitFileDataEnd(pCommitter);
H
Hongze Cheng 已提交
928
  if (code) goto _err;
H
Hongze Cheng 已提交
929 930 931 932

  return code;

_err:
H
Hongze Cheng 已提交
933
  tsdbError("vgId:%d commit file data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
934 935
  tsdbDataFReaderClose(&pCommitter->pReader);
  tsdbDataFWriterClose(&pCommitter->pWriter, 0);
H
Hongze Cheng 已提交
936 937 938
  return code;
}

H
Hongze Cheng 已提交
939 940
// ----------------------------------------------------------------------------
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
H
Hongze Cheng 已提交
941
  int32_t code = 0;
H
Hongze Cheng 已提交
942

H
Hongze Cheng 已提交
943 944
  memset(pCommitter, 0, sizeof(*pCommitter));
  ASSERT(pTsdb->mem && pTsdb->imem == NULL);
H
Hongze Cheng 已提交
945

H
Hongze Cheng 已提交
946 947 948 949
  // lock();
  pTsdb->imem = pTsdb->mem;
  pTsdb->mem = NULL;
  // unlock();
H
Hongze Cheng 已提交
950

H
Hongze Cheng 已提交
951
  pCommitter->pTsdb = pTsdb;
H
Hongze Cheng 已提交
952
  pCommitter->commitID = pTsdb->pVnode->state.commitID;
H
Hongze Cheng 已提交
953 954 955 956
  pCommitter->minutes = pTsdb->keepCfg.days;
  pCommitter->precision = pTsdb->keepCfg.precision;
  pCommitter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
  pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
H
Hongze Cheng 已提交
957
  pCommitter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
H
Hongze Cheng 已提交
958

H
Hongze Cheng 已提交
959 960 961 962 963 964 965
  code = tsdbFSBegin(pTsdb->fs);
  if (code) goto _err;

  return code;

_err:
  tsdbError("vgId:%d tsdb start commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
966 967 968
  return code;
}

H
Hongze Cheng 已提交
969 970 971
static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
  int32_t code = 0;

H
Hongze Cheng 已提交
972
  pCommitter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
H
Hongze Cheng 已提交
973 974 975 976 977
  if (pCommitter->aBlockIdx == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

H
Hongze Cheng 已提交
978
  pCommitter->aBlockIdxN = taosArrayInit(0, sizeof(SBlockIdx));
H
Hongze Cheng 已提交
979 980 981 982 983
  if (pCommitter->aBlockIdxN == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

H
Hongze Cheng 已提交
984 985
  code = tBlockDataInit(&pCommitter->oBlockData);
  if (code) goto _exit;
H
Hongze Cheng 已提交
986

H
Hongze Cheng 已提交
987
  code = tBlockDataInit(&pCommitter->nBlockData);
H
Hongze Cheng 已提交
988
  if (code) goto _exit;
H
Hongze Cheng 已提交
989 990 991 992 993 994

_exit:
  return code;
}

static void tsdbCommitDataEnd(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
995
  taosArrayDestroy(pCommitter->aBlockIdx);
H
Hongze Cheng 已提交
996 997
  tMapDataClear(&pCommitter->oBlockMap);
  tBlockDataClear(&pCommitter->oBlockData);
H
Hongze Cheng 已提交
998
  taosArrayDestroy(pCommitter->aBlockIdxN);
H
Hongze Cheng 已提交
999 1000
  tMapDataClear(&pCommitter->nBlockMap);
  tBlockDataClear(&pCommitter->nBlockData);
H
Hongze Cheng 已提交
1001 1002
  tTSchemaDestroy(pCommitter->skmTable.pTSchema);
  tTSchemaDestroy(pCommitter->skmRow.pTSchema);
H
Hongze Cheng 已提交
1003 1004
}

H
Hongze Cheng 已提交
1005 1006 1007 1008
static int32_t tsdbCommitData(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
H
Hongze Cheng 已提交
1009

H
Hongze Cheng 已提交
1010
  // check
H
Hongze Cheng 已提交
1011
  if (pMemTable->nRow == 0) goto _exit;
H
Hongze Cheng 已提交
1012

H
Hongze Cheng 已提交
1013 1014
  // start ====================
  code = tsdbCommitDataStart(pCommitter);
H
Hongze Cheng 已提交
1015
  if (code) goto _err;
H
Hongze Cheng 已提交
1016 1017 1018

  // impl ====================
  pCommitter->nextKey = pMemTable->minKey;
H
Hongze Cheng 已提交
1019 1020 1021 1022 1023
  while (pCommitter->nextKey < TSKEY_MAX) {
    pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision);
    tsdbFidKeyRange(pCommitter->commitFid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey,
                    &pCommitter->maxKey);
    code = tsdbCommitFileData(pCommitter);
H
Hongze Cheng 已提交
1024
    if (code) goto _err;
H
Hongze Cheng 已提交
1025
  }
H
Hongze Cheng 已提交
1026

H
Hongze Cheng 已提交
1027 1028 1029
  // end ====================
  tsdbCommitDataEnd(pCommitter);

H
Hongze Cheng 已提交
1030 1031 1032
_exit:
  tsdbDebug("vgId:%d commit data done, nRow:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nRow);
  return code;
H
Hongze Cheng 已提交
1033

H
Hongze Cheng 已提交
1034
_err:
H
Hongze Cheng 已提交
1035
  tsdbCommitDataEnd(pCommitter);
H
Hongze Cheng 已提交
1036 1037 1038
  tsdbError("vgId:%d commit data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}
H
Hongze Cheng 已提交
1039

H
Hongze Cheng 已提交
1040 1041 1042 1043
static int32_t tsdbCommitDel(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
H
Hongze Cheng 已提交
1044

H
Hongze Cheng 已提交
1045 1046
  if (pMemTable->nDel == 0) {
    goto _exit;
H
Hongze Cheng 已提交
1047
  }
H
Hongze Cheng 已提交
1048

H
Hongze Cheng 已提交
1049 1050 1051 1052 1053
  // start
  code = tsdbCommitDelStart(pCommitter);
  if (code) {
    goto _err;
  }
H
Hongze Cheng 已提交
1054

H
Hongze Cheng 已提交
1055
  // impl
H
Hongze Cheng 已提交
1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110
  int32_t  iDelIdx = 0;
  int32_t  nDelIdx = taosArrayGetSize(pCommitter->aDelIdx);
  int32_t  iTbData = 0;
  int32_t  nTbData = taosArrayGetSize(pMemTable->aTbData);
  STbData *pTbData;
  SDelIdx *pDelIdx;

  ASSERT(nTbData > 0);

  pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
  pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL;
  while (true) {
    if (pTbData == NULL && pDelIdx == NULL) break;

    if (pTbData && pDelIdx) {
      int32_t c = tTABLEIDCmprFn(pTbData, pDelIdx);

      if (c == 0) {
        goto _commit_mem_and_disk_del;
      } else if (c < 0) {
        goto _commit_mem_del;
      } else {
        goto _commit_disk_del;
      }
    } else if (pTbData) {
      goto _commit_mem_del;
    } else {
      goto _commit_disk_del;
    }

  _commit_mem_del:
    code = tsdbCommitTableDel(pCommitter, pTbData, NULL);
    if (code) goto _err;

    iTbData++;
    pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData) : NULL;
    continue;

  _commit_disk_del:
    code = tsdbCommitTableDel(pCommitter, NULL, pDelIdx);
    if (code) goto _err;

    iDelIdx++;
    pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL;
    continue;

  _commit_mem_and_disk_del:
    code = tsdbCommitTableDel(pCommitter, pTbData, pDelIdx);
    if (code) goto _err;

    iTbData++;
    pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData) : NULL;
    iDelIdx++;
    pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL;
    continue;
H
Hongze Cheng 已提交
1111
  }
H
Hongze Cheng 已提交
1112

H
Hongze Cheng 已提交
1113 1114 1115 1116 1117
  // end
  code = tsdbCommitDelEnd(pCommitter);
  if (code) {
    goto _err;
  }
H
Hongze Cheng 已提交
1118

H
Hongze Cheng 已提交
1119
_exit:
H
Hongze Cheng 已提交
1120
  tsdbDebug("vgId:%d commit del done, nDel:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nDel);
H
Hongze Cheng 已提交
1121 1122 1123
  return code;

_err:
H
Hongze Cheng 已提交
1124
  tsdbError("vgId:%d commit del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
1125
  return code;
H
Hongze Cheng 已提交
1126 1127 1128 1129 1130 1131 1132
}

static int32_t tsdbCommitCache(SCommitter *pCommitter) {
  int32_t code = 0;
  // TODO
  return code;
}
H
Hongze Cheng 已提交
1133 1134

static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
H
Hongze Cheng 已提交
1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;

  if (eno == 0) {
    code = tsdbFSCommit(pTsdb->fs);
  } else {
    code = tsdbFSRollback(pTsdb->fs);
  }

  tsdbMemTableDestroy(pMemTable);
  pTsdb->imem = NULL;

  tsdbInfo("vgId:%d tsdb end commit", TD_VID(pTsdb->pVnode));
  return code;

_err:
  tsdbError("vgId:%d tsdb end commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
1153 1154
  return code;
}