tsdbCommit.c 32.5 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
H
Hongze Cheng 已提交
17 18 19 20 21
// One cached table schema together with the identity it was fetched for.
// The tsdbCommitterUpdate*Schema helpers in this file compare suid/uid and
// pTSchema->version against the requested values to decide whether a re-fetch
// is needed.
typedef struct {
  int64_t   suid;      // suid the cached schema was fetched for
  int64_t   uid;       // uid the cached schema was fetched for (only compared when suid == 0)
  STSchema *pTSchema;  // cached schema; NULL-checked before use as "nothing cached"
} SSkmInfo;
H
Hongze Cheng 已提交
22

H
Hongze Cheng 已提交
23
typedef struct {
H
Hongze Cheng 已提交
24
  STsdb *pTsdb;
H
Hongze Cheng 已提交
25
  /* commit data */
H
Hongze Cheng 已提交
26
  int64_t commitID;
H
Hongze Cheng 已提交
27 28
  int32_t minutes;
  int8_t  precision;
H
Hongze Cheng 已提交
29 30
  int32_t minRow;
  int32_t maxRow;
H
Hongze Cheng 已提交
31
  int8_t  cmprAlg;
H
Hongze Cheng 已提交
32
  // --------------
H
Hongze Cheng 已提交
33
  TSKEY   nextKey;  // reset by each table commit
H
Hongze Cheng 已提交
34 35 36
  int32_t commitFid;
  TSKEY   minKey;
  TSKEY   maxKey;
H
Hongze Cheng 已提交
37
  // commit file data
H
Hongze Cheng 已提交
38
  SDataFReader *pReader;
H
Hongze Cheng 已提交
39 40
  SArray       *aBlockIdx;  // SArray<SBlockIdx>
  SMapData      oBlockMap;  // SMapData<SBlock>, read from reader
H
Hongze Cheng 已提交
41
  SBlockData    oBlockData;
H
Hongze Cheng 已提交
42
  SDataFWriter *pWriter;
H
Hongze Cheng 已提交
43 44
  SArray       *aBlockIdxN;  // SArray<SBlockIdx>
  SMapData      nBlockMap;   // SMapData<SBlock>
H
Hongze Cheng 已提交
45
  SBlockData    nBlockData;
H
Hongze Cheng 已提交
46 47
  SSkmInfo      skmTable;
  SSkmInfo      skmRow;
H
Hongze Cheng 已提交
48
  /* commit del */
H
Hongze Cheng 已提交
49 50
  SDelFReader *pDelFReader;
  SDelFWriter *pDelFWriter;
H
Hongze Cheng 已提交
51 52 53
  SArray      *aDelIdx;   // SArray<SDelIdx>
  SArray      *aDelIdxN;  // SArray<SDelIdx>
  SArray      *aDelData;  // SArray<SDelData>
H
Hongze Cheng 已提交
54
} SCommitter;
H
refact  
Hongze Cheng 已提交
55

H
Hongze Cheng 已提交
56 57 58 59 60
// Commit stages, invoked by tsdbCommit() in this order:
// start -> data -> del -> cache -> end. tsdbEndCommit() also runs on the
// error path, where it receives the failing code through `eno`.
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter);
static int32_t tsdbCommitData(SCommitter *pCommitter);
static int32_t tsdbCommitDel(SCommitter *pCommitter);
static int32_t tsdbCommitCache(SCommitter *pCommitter);
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno);
H
refact  
Hongze Cheng 已提交
61

H
refact  
Hongze Cheng 已提交
62
/**
 * Install a freshly created memtable as pTsdb->mem.
 *
 * @param pTsdb  tsdb instance; NULL is accepted and treated as a no-op.
 * @return 0 on success, otherwise the error from tsdbMemTableCreate or a
 *         TAOS_SYSTEM_ERROR-wrapped rwlock failure.
 */
int32_t tsdbBegin(STsdb *pTsdb) {
  int32_t    code = 0;
  SMemTable *pNewMem;

  if (pTsdb == NULL) {
    return code;
  }

  code = tsdbMemTableCreate(pTsdb, &pNewMem);
  if (code != 0) goto _err;

  // publish the new memtable under the write lock
  code = taosThreadRwlockWrlock(&pTsdb->rwLock);
  if (code != 0) {
    // NOTE(review): pNewMem is not released on this path — confirm whether that is intended
    code = TAOS_SYSTEM_ERROR(code);
    goto _err;
  }

  pTsdb->mem = pNewMem;

  code = taosThreadRwlockUnlock(&pTsdb->rwLock);
  if (code != 0) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _err;
  }

  return code;

_err:
  tsdbError("vgId:%d tsdb begin failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

H
more  
Hongze Cheng 已提交
94
/**
 * Commit the current memtable of a tsdb instance.
 *
 * If the memtable holds neither rows nor deletes it is detached from the
 * tsdb and unreferenced. Otherwise the commit stages run in order and the
 * first failure aborts the run through tsdbEndCommit().
 *
 * @param pTsdb  tsdb instance; NULL is accepted and treated as a no-op.
 * @return 0 on success, otherwise the code of the failing stage.
 */
int32_t tsdbCommit(STsdb *pTsdb) {
  if (pTsdb == NULL) return 0;

  int32_t    code = 0;
  SCommitter committer;
  SMemTable *pMem = pTsdb->mem;

  // nothing buffered: drop the empty memtable and finish
  if (pMem->nRow == 0 && pMem->nDel == 0) {
    taosThreadRwlockWrlock(&pTsdb->rwLock);
    pTsdb->mem = NULL;
    taosThreadRwlockUnlock(&pTsdb->rwLock);

    tsdbUnrefMemTable(pMem);
    return code;
  }

  // run the stages in order; short-circuit stops at the first failure
  if ((code = tsdbStartCommit(pTsdb, &committer)) != 0 ||  //
      (code = tsdbCommitData(&committer)) != 0 ||          //
      (code = tsdbCommitDel(&committer)) != 0 ||           //
      (code = tsdbCommitCache(&committer)) != 0 ||         //
      (code = tsdbEndCommit(&committer, 0)) != 0) {
    goto _err;
  }

  return code;

_err:
  // hand the failing code to the end stage, then report it
  tsdbEndCommit(&committer, code);
  tsdbError("vgId:%d, failed to commit since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
138
/**
 * Set up the delete-data commit: allocate the working arrays, load the
 * existing delete index (when a del file is present in the new FS state)
 * and open the writer for the new del file.
 *
 * @return 0 on success, TSDB_CODE_OUT_OF_MEMORY on allocation failure, or
 *         the error returned by the reader/writer helpers.
 */
static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
  int32_t code = 0;
  STsdb  *pTsdb = pCommitter->pTsdb;

  // same allocation order as before: aDelIdx, aDelData, aDelIdxN
  if ((pCommitter->aDelIdx = taosArrayInit(0, sizeof(SDelIdx))) == NULL ||
      (pCommitter->aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL ||
      (pCommitter->aDelIdxN = taosArrayInit(0, sizeof(SDelIdx))) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  // existing del file, if any
  SDelFile *pOldDelFile = pTsdb->pFS->nState->pDelFile;
  if (pOldDelFile != NULL) {
    code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pOldDelFile, pTsdb, NULL);
    if (code != 0) goto _err;

    code = tsdbReadDelIdx(pCommitter->pDelFReader, pCommitter->aDelIdx, NULL);
    if (code != 0) goto _err;
  }

  // new del file, tagged with this commit's id
  SDelFile newDelFile = {.commitID = pCommitter->commitID, .size = 0, .offset = 0};
  code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, &newDelFile, pTsdb);
  if (code != 0) goto _err;

  tsdbDebug("vgId:%d commit del start", TD_VID(pTsdb->pVnode));
  return code;

_err:
  tsdbError("vgId:%d commit del start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
184
/**
 * Commit the delete records of one table.
 *
 * Records already on disk (located through pDelIdx) are loaded first, the
 * in-memory list of pTbData is appended, and the combined set is written to
 * the new del file. The resulting index entry is added to aDelIdxN.
 *
 * @param pTbData  in-memory table data, may be NULL
 * @param pDelIdx  existing on-disk delete index entry, may be NULL
 * @return 0 on success (including "nothing to do"), error code otherwise.
 */
static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDelIdx *pDelIdx) {
  int32_t   code = 0;
  tb_uid_t  suid = 0;
  tb_uid_t  uid = 0;
  SDelData *pMemDel = NULL;  // head of the in-memory delete list, NULL when there is none

  if (pTbData != NULL) {
    suid = pTbData->suid;
    uid = pTbData->uid;
    pMemDel = pTbData->pHead;
  }

  if (pDelIdx != NULL) {
    // the on-disk identity takes precedence
    suid = pDelIdx->suid;
    uid = pDelIdx->uid;

    code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, pCommitter->aDelData, NULL);
    if (code != 0) goto _err;
  } else {
    taosArrayClear(pCommitter->aDelData);
  }

  // neither memory nor disk has deletes for this table
  if (pMemDel == NULL && pDelIdx == NULL) {
    return code;
  }

  SDelIdx newIdx = {.suid = suid, .uid = uid};

  // append the in-memory records after the ones read from disk
  while (pMemDel != NULL) {
    if (taosArrayPush(pCommitter->aDelData, pMemDel) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
    pMemDel = pMemDel->pNext;
  }

  code = tsdbWriteDelData(pCommitter->pDelFWriter, pCommitter->aDelData, NULL, &newIdx);
  if (code != 0) goto _err;

  if (taosArrayPush(pCommitter->aDelIdxN, &newIdx) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  return code;

_err:
  tsdbError("vgId:%d commit table del failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
240 241
/**
 * Finish the delete-data commit: write the new delete index, update the del
 * file header, register the file in the new FS state, close writer and
 * reader, and release the working arrays.
 *
 * The arrays are released only on the success path.
 *
 * @return 0 on success, otherwise the code of the first failing step.
 */
static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) {
  int32_t code = 0;
  STsdb  *pTsdb = pCommitter->pTsdb;

  // each step runs only if every earlier one returned 0
  if ((code = tsdbWriteDelIdx(pCommitter->pDelFWriter, pCommitter->aDelIdxN, NULL)) != 0 ||
      (code = tsdbUpdateDelFileHdr(pCommitter->pDelFWriter)) != 0 ||
      (code = tsdbFSStateUpsertDelFile(pTsdb->pFS->nState, &pCommitter->pDelFWriter->fDel)) != 0 ||
      (code = tsdbDelFWriterClose(&pCommitter->pDelFWriter, 1)) != 0) {
    goto _err;
  }

  if (pCommitter->pDelFReader != NULL) {
    code = tsdbDelFReaderClose(&pCommitter->pDelFReader);
    if (code != 0) goto _err;
  }

  taosArrayDestroy(pCommitter->aDelIdx);
  taosArrayDestroy(pCommitter->aDelData);
  taosArrayDestroy(pCommitter->aDelIdxN);

  return code;

_err:
  tsdbError("vgId:%d commit del end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
272 273 274 275 276
/**
 * Prepare the committer for one file set (pCommitter->commitFid).
 *
 * Resets the per-file-set state, opens a reader on the existing file set
 * with that fid (if the new FS state has one) and loads its block index,
 * then opens a writer on the file set to be produced.
 *
 * @return 0 on success, otherwise the error from the reader/writer helpers.
 */
static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SDFileSet *pOldSet;
  SDFileSet  newSet;

  pCommitter->nextKey = TSKEY_MAX;

  // ---- read side ----
  taosArrayClear(pCommitter->aBlockIdx);
  tMapDataReset(&pCommitter->oBlockMap);
  tBlockDataReset(&pCommitter->oBlockData);

  pOldSet = tsdbFSStateGetDFileSet(pTsdb->pFS->nState, pCommitter->commitFid, TD_EQ);
  if (pOldSet != NULL) {
    code = tsdbDataFReaderOpen(&pCommitter->pReader, pTsdb, pOldSet);
    if (code != 0) goto _err;

    code = tsdbReadBlockIdx(pCommitter->pReader, pCommitter->aBlockIdx, NULL);
    if (code != 0) goto _err;
  }

  // ---- write side ----
  taosArrayClear(pCommitter->aBlockIdxN);
  tMapDataReset(&pCommitter->nBlockMap);
  tBlockDataReset(&pCommitter->nBlockData);

  // head and last files are always new; every member not named here starts zeroed
  newSet = (SDFileSet){.fid = pCommitter->commitFid,
                       .fHead = {.commitID = pCommitter->commitID, .offset = 0, .size = 0},
                       .fLast = {.commitID = pCommitter->commitID, .size = 0}};
  if (pOldSet != NULL) {
    // carry over disk, data and sma descriptors from the existing set
    newSet.diskId = pOldSet->diskId;
    newSet.fData = pOldSet->fData;
    newSet.fSma = pOldSet->fSma;
  } else {
    // brand-new file set: level 0 / id 0, empty data and sma files
    newSet.diskId = (SDiskID){.level = 0, .id = 0};
    newSet.fData.commitID = pCommitter->commitID;
    newSet.fData.size = 0;
    newSet.fSma.commitID = pCommitter->commitID;
    newSet.fSma.size = 0;
  }

  code = tsdbDataFWriterOpen(&pCommitter->pWriter, pTsdb, &newSet);
  if (code != 0) goto _err;

  return code;

_err:
  tsdbError("vgId:%d commit file data start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339
/**
 * Make pCommitter->skmTable hold the schema (suid, uid, sver).
 *
 * The cached schema is reused when it was fetched for the same suid and has
 * the requested version; for suid == 0 the uid must match as well.
 * Otherwise the cached schema is destroyed and a new one is fetched through
 * metaGetTbTSchemaEx.
 *
 * @return 0 on success or cache hit, otherwise the error from metaGetTbTSchemaEx.
 *         After a failed fetch skmTable.pTSchema is NULL.
 */
static int32_t tsdbCommitterUpdateTableSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) {
  SSkmInfo *pSkm = &pCommitter->skmTable;

  // cache hit: same suid and version, plus same uid when suid == 0
  if (pSkm->pTSchema && pSkm->suid == suid && sver == pSkm->pTSchema->version) {
    if (suid != 0 || pSkm->uid == uid) return 0;
  }

  pSkm->suid = suid;
  pSkm->uid = uid;
  tTSchemaDestroy(pSkm->pTSchema);
  // Clear the pointer right after destroying it: if the fetch below fails
  // without writing the out parameter, the cache must not keep pointing at
  // freed memory (the next call would dereference and destroy it again).
  pSkm->pTSchema = NULL;

  return metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, sver, &pSkm->pTSchema);
}

/**
 * Make pCommitter->skmRow hold the schema (suid, uid, sver).
 *
 * The cached schema is reused when it was fetched for the same suid and has
 * the requested version; for suid == 0 the uid must match as well.
 * Otherwise the cached schema is destroyed and a new one is fetched through
 * metaGetTbTSchemaEx.
 *
 * @return 0 on success or cache hit, otherwise the error from metaGetTbTSchemaEx.
 *         After a failed fetch skmRow.pTSchema is NULL.
 */
static int32_t tsdbCommitterUpdateRowSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) {
  SSkmInfo *pSkm = &pCommitter->skmRow;

  // cache hit: same suid and version, plus same uid when suid == 0
  if (pSkm->pTSchema && pSkm->suid == suid && sver == pSkm->pTSchema->version) {
    if (suid != 0 || pSkm->uid == uid) return 0;
  }

  pSkm->suid = suid;
  pSkm->uid = uid;
  tTSchemaDestroy(pSkm->pTSchema);
  // Clear the pointer right after destroying it: if the fetch below fails
  // without writing the out parameter, the cache must not keep pointing at
  // freed memory (the next call would dereference and destroy it again).
  pSkm->pTSchema = NULL;

  return metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, sver, &pSkm->pTSchema);
}

H
Hongze Cheng 已提交
372 373 374 375
/**
 * Write one block of row data through the committer's writer and record the
 * resulting SBlock in nBlockMap.
 *
 * For a block without sub-blocks the `last` flag is decided here: it is set
 * when the block holds fewer than minRow rows and the caller did not pass
 * toDataOnly; otherwise it is cleared.
 *
 * @return 0 on success, otherwise the error from the write or map insert.
 */
static int32_t tsdbCommitBlockData(SCommitter *pCommitter, SBlockData *pBlockData, SBlock *pBlock, SBlockIdx *pBlockIdx,
                                   int8_t toDataOnly) {
  int32_t code;

  if (pBlock->nSubBlock == 0) {
    pBlock->last = (!toDataOnly && pBlockData->nRow < pCommitter->minRow) ? 1 : 0;
  }

  code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, pBlockIdx, pBlock, pCommitter->cmprAlg);
  if (code != 0) {
    return code;
  }

  return tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
}

H
Hongze Cheng 已提交
396 397
/**
 * Merge the rows of an existing on-disk block with in-memory rows of the
 * same table and write the result as new blocks.
 *
 * Memory rows are consumed from pIter while their key is below toKey; disk
 * rows come from pBlockMerge, which is read into oBlockData. Output is
 * flushed whenever the buffer reaches maxRow * 4 / 5 rows, and once more at
 * the end for the remainder.
 *
 * @param pBlockMerge  on-disk block to merge with
 * @param toKey        exclusive upper bound for rows taken from memory
 * @param toDataOnly   forwarded to tsdbCommitBlockData
 * @return 0 on success, error code otherwise.
 */
static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlockMerge, TSDBKEY toKey,
                                  int8_t toDataOnly) {
  int32_t     code = 0;
  SBlockIdx   blockIdx = {.suid = pIter->pTbData->suid, .uid = pIter->pTbData->uid};
  SBlockData *pOldData = &pCommitter->oBlockData;  // rows read from disk
  SBlockData *pNewData = &pCommitter->nBlockData;  // output buffer
  SBlock      newBlock;
  TSDBROW    *pMemRow;
  TSDBROW     diskRow;
  TSDBROW    *pDiskRow = &diskRow;

  // load the on-disk rows
  code = tsdbReadBlockData(pCommitter->pReader, &blockIdx, pBlockMerge, pOldData, NULL, NULL);
  if (code) goto _err;

  code = tBlockDataSetSchema(pNewData, pCommitter->skmTable.pTSchema);
  if (code) goto _err;

  // both inputs must start below toKey
  pMemRow = tsdbTbDataIterGet(pIter);
  *pDiskRow = tsdbRowFromBlockData(pOldData, 0);
  ASSERT(pMemRow && tsdbKeyCmprFn(&TSDBROW_KEY(pMemRow), &toKey) < 0);
  ASSERT(tsdbKeyCmprFn(&TSDBROW_KEY(pDiskRow), &toKey) < 0);
  code = tsdbCommitterUpdateRowSchema(pCommitter, blockIdx.suid, blockIdx.uid, TSDBROW_SVERSION(pMemRow));
  if (code) goto _err;

  tBlockReset(&newBlock);
  tBlockDataClearData(pNewData);
  for (;;) {
    if (pMemRow == NULL && pDiskRow == NULL) {
      // inputs exhausted: flush what is left, or finish
      if (pNewData->nRow == 0) break;
    } else {
      int8_t takeMem;

      if (pMemRow && pDiskRow) {
        int32_t c = tsdbRowCmprFn(pMemRow, pDiskRow);
        if (c < 0) {
          takeMem = 1;
        } else if (c > 0) {
          takeMem = 0;
        } else {
          // equal rows are not expected; if the assertion is compiled out the
          // memory row is taken, as before
          ASSERT(0);
          takeMem = 1;
        }
      } else {
        takeMem = (pMemRow != NULL) ? 1 : 0;
      }

      if (takeMem) {
        code = tBlockDataAppendRow(pNewData, pMemRow, pCommitter->skmRow.pTSchema);
        if (code) goto _err;

        tsdbTbDataIterNext(pIter);
        pMemRow = tsdbTbDataIterGet(pIter);
        if (pMemRow) {
          if (tsdbKeyCmprFn(&TSDBROW_KEY(pMemRow), &toKey) < 0) {
            code = tsdbCommitterUpdateRowSchema(pCommitter, blockIdx.suid, blockIdx.uid, TSDBROW_SVERSION(pMemRow));
            if (code) goto _err;
          } else {
            // reached toKey: stop consuming memory rows
            pMemRow = NULL;
          }
        }
      } else {
        code = tBlockDataAppendRow(pNewData, pDiskRow, NULL);
        if (code) goto _err;

        if (pDiskRow->iRow + 1 < pOldData->nRow) {
          *pDiskRow = tsdbRowFromBlockData(pOldData, pDiskRow->iRow + 1);
        } else {
          pDiskRow = NULL;
        }
      }

      // keep filling until the flush threshold is reached
      if (pNewData->nRow < pCommitter->maxRow * 4 / 5) continue;
    }

    // flush the buffered rows as one block
    code = tsdbCommitBlockData(pCommitter, pNewData, &newBlock, &blockIdx, toDataOnly);
    if (code) goto _err;

    tBlockReset(&newBlock);
    tBlockDataClearData(pNewData);
  }

  return code;

_err:
  tsdbError("vgId:%d tsdb merge block and mem failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
501
/**
 * Write in-memory rows of one table as new blocks, with no disk data mixed in.
 *
 * Rows are consumed from pIter while their key is below toKey. Output is
 * flushed whenever the buffer reaches maxRow * 4 / 5 rows, and once more at
 * the end for the remainder.
 *
 * @param toKey       exclusive upper bound for rows taken from memory
 * @param toDataOnly  forwarded to tsdbCommitBlockData
 * @return 0 on success, error code otherwise.
 */
static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter, TSDBKEY toKey, int8_t toDataOnly) {
  int32_t     code = 0;
  SBlockData *pOut = &pCommitter->nBlockData;
  int64_t     suid = pIter->pTbData->suid;
  int64_t     uid = pIter->pTbData->uid;
  SBlock      newBlock;
  TSDBROW    *pCur;

  code = tBlockDataSetSchema(pOut, pCommitter->skmTable.pTSchema);
  if (code) goto _err;

  tBlockReset(&newBlock);
  tBlockDataClearData(pOut);
  pCur = tsdbTbDataIterGet(pIter);
  ASSERT(pCur && tsdbKeyCmprFn(&TSDBROW_KEY(pCur), &toKey) < 0);
  for (;;) {
    if (pCur == NULL) {
      // input exhausted: flush the remainder, or finish
      if (!(pOut->nRow > 0)) break;
    } else {
      // a row may carry a different schema version than the previous one
      code = tsdbCommitterUpdateRowSchema(pCommitter, suid, uid, TSDBROW_SVERSION(pCur));
      if (code) goto _err;

      code = tBlockDataAppendRow(pOut, pCur, pCommitter->skmRow.pTSchema);
      if (code) goto _err;

      tsdbTbDataIterNext(pIter);
      pCur = tsdbTbDataIterGet(pIter);
      if (pCur) {
        // The key is copied into a named local before comparing; an earlier
        // note here said that passing &TSDBROW_KEY(pRow) directly crashed on CI.
        TSDBKEY curKey = TSDBROW_KEY(pCur);
        if (tsdbKeyCmprFn(&curKey, &toKey) >= 0) {
          pCur = NULL;
        }
      }

      // keep filling until the flush threshold is reached
      if (!(pOut->nRow >= pCommitter->maxRow * 4 / 5)) continue;
    }

    code = tsdbCommitBlockData(pCommitter, pOut, &newBlock, &(SBlockIdx){.suid = suid, .uid = uid}, toDataOnly);
    if (code) goto _err;

    tBlockReset(&newBlock);
    tBlockDataClearData(pOut);
  }

  return code;

_err:
  tsdbError("vgId:%d tsdb commit table mem data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
563
/**
 * Carry one existing on-disk block over into the new file set.
 *
 * A block not flagged `last` is only re-registered in nBlockMap. A `last`
 * block has its rows read back and written again through
 * tsdbCommitBlockData.
 *
 * @return 0 on success, error code otherwise.
 */
static int32_t tsdbCommitTableDiskData(SCommitter *pCommitter, SBlock *pBlock, SBlockIdx *pBlockIdx) {
  int32_t code = 0;

  if (!pBlock->last) {
    // keep the block entry as is
    code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
    if (code != 0) goto _err;
    return code;
  }

  // `last` block: read the rows and rewrite them under a fresh SBlock
  SBlock rewritten;

  code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlock, &pCommitter->oBlockData, NULL, NULL);
  if (code != 0) goto _err;

  tBlockReset(&rewritten);
  code = tsdbCommitBlockData(pCommitter, &pCommitter->oBlockData, &rewritten, pBlockIdx, 0);
  if (code != 0) goto _err;

  return code;

_err:
  tsdbError("vgId:%d tsdb commit table disk data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
586 587
/**
 * Finish one table: write its accumulated block map (nBlockMap) and append
 * the resulting SBlockIdx to aBlockIdxN.
 *
 * @return 0 on success, TSDB_CODE_OUT_OF_MEMORY if the index cannot be
 *         appended, or the error from tsdbWriteBlock.
 */
static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter, int64_t suid, int64_t uid) {
  int32_t   code;
  SBlockIdx tableIdx = {.suid = suid, .uid = uid};

  code = tsdbWriteBlock(pCommitter->pWriter, &pCommitter->nBlockMap, NULL, &tableIdx);
  if (code != 0) goto _err;

  if (taosArrayPush(pCommitter->aBlockIdxN, &tableIdx) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  return code;

_err:
  tsdbError("vgId:%d commit table data end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
606 607 608 609 610 611 612 613 614
/**
 * Count how many upcoming in-memory rows compare equal to pBlock under
 * tBlockCmprFn, scanning from the iterator's current position.
 *
 * Works on a private copy of the iterator, so the caller's position is not
 * advanced.
 *
 * @return number of consecutive rows for which tBlockCmprFn returns 0.
 */
static int32_t tsdbGetOvlpNRow(STbDataIter *pIter, SBlock *pBlock) {
  int32_t     count = 0;
  STbDataIter cursor = *pIter;

  cursor.pRow = NULL;
  for (;;) {
    TSDBROW *pCur = tsdbTbDataIterGet(&cursor);
    if (pCur == NULL) break;

    TSDBKEY key = TSDBROW_KEY(pCur);
    int32_t c = tBlockCmprFn(&(SBlock){.maxKey = key, .minKey = key}, pBlock);

    if (c > 0) break;  // row compares greater than the block: stop counting
    if (c < 0) {
      // a row that compares less than the block is not expected here
      ASSERT(0);
      continue;
    }

    count++;
    tsdbTbDataIterNext(&cursor);
  }

  return count;
}

/**
 * Write the in-memory rows that overlap pBlock through tsdbCommitBlockData,
 * using a copy of pBlock as the block descriptor. The existing disk rows are
 * not read.
 *
 * Rows are taken from pIter while tBlockCmprFn reports them equal to pBlock.
 * The caller is expected to have verified that at least one such row exists.
 *
 * @return 0 on success, error code otherwise.
 */
static int32_t tsdbMergeAsSubBlock(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlock) {
  int32_t     code = 0;
  SBlockData *pOut = &pCommitter->nBlockData;
  SBlockIdx   tableIdx = {.suid = pIter->pTbData->suid, .uid = pIter->pTbData->uid};
  SBlock      merged;
  TSDBROW    *pCur;

  code = tBlockDataSetSchema(pOut, pCommitter->skmTable.pTSchema);
  if (code) goto _err;

  pCur = tsdbTbDataIterGet(pIter);
  code = tsdbCommitterUpdateRowSchema(pCommitter, tableIdx.suid, tableIdx.uid, TSDBROW_SVERSION(pCur));
  if (code) goto _err;

  while (pCur != NULL) {
    code = tBlockDataAppendRow(pOut, pCur, pCommitter->skmRow.pTSchema);
    if (code) goto _err;

    tsdbTbDataIterNext(pIter);
    pCur = tsdbTbDataIterGet(pIter);
    if (pCur == NULL) continue;

    TSDBKEY key = TSDBROW_KEY(pCur);
    int32_t c = tBlockCmprFn(&(SBlock){.minKey = key, .maxKey = key}, pBlock);

    if (c == 0) {
      // still overlapping the block: make sure the row schema is current
      code = tsdbCommitterUpdateRowSchema(pCommitter, pIter->pTbData->suid, pIter->pTbData->uid,
                                          TSDBROW_SVERSION(pCur));
      if (code) goto _err;
    } else if (c > 0) {
      // compares greater than the block: stop
      pCur = NULL;
    } else {
      ASSERT(0);
    }
  }

  // start from the existing block descriptor
  merged = *pBlock;
  code = tsdbCommitBlockData(pCommitter, pOut, &merged, &tableIdx, 0);
  if (code) goto _err;

  return code;

_err:
  tsdbError("vgId:%d tsdb merge as subblock failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
681 682
static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *pBlockIdx) {
  int32_t      code = 0;
H
Hongze Cheng 已提交
683 684
  STbDataIter  iter = {0};
  STbDataIter *pIter = &iter;
H
Hongze Cheng 已提交
685
  TSDBROW     *pRow;
H
Hongze Cheng 已提交
686
  int32_t      iBlock;
H
Hongze Cheng 已提交
687
  int32_t      nBlock;
H
Hongze Cheng 已提交
688 689
  int64_t      suid;
  int64_t      uid;
H
Hongze Cheng 已提交
690 691 692 693

  if (pTbData) {
    tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, pIter);
    pRow = tsdbTbDataIterGet(pIter);
H
Hongze Cheng 已提交
694 695
    if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;

H
Hongze Cheng 已提交
696 697
    suid = pTbData->suid;
    uid = pTbData->uid;
H
Hongze Cheng 已提交
698 699 700 701 702 703 704 705 706 707 708
  } else {
    pIter = NULL;
    pRow = NULL;
  }

  if (pBlockIdx) {
    code = tsdbReadBlock(pCommitter->pReader, pBlockIdx, &pCommitter->oBlockMap, NULL);
    if (code) goto _err;

    nBlock = pCommitter->oBlockMap.nItem;
    ASSERT(nBlock > 0);
H
Hongze Cheng 已提交
709

H
Hongze Cheng 已提交
710 711
    suid = pBlockIdx->suid;
    uid = pBlockIdx->uid;
H
Hongze Cheng 已提交
712 713 714 715
  } else {
    nBlock = 0;
  }

H
Hongze Cheng 已提交
716
  if (pRow == NULL && nBlock == 0) goto _exit;
H
Hongze Cheng 已提交
717 718

  // start ===========
H
Hongze Cheng 已提交
719
  tMapDataReset(&pCommitter->nBlockMap);
H
Hongze Cheng 已提交
720 721
  SBlock  block;
  SBlock *pBlock = &block;
H
Hongze Cheng 已提交
722

H
Hongze Cheng 已提交
723
  iBlock = 0;
H
Hongze Cheng 已提交
724 725 726 727 728 729
  if (iBlock < nBlock) {
    tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
  } else {
    pBlock = NULL;
  }

H
Hongze Cheng 已提交
730 731 732 733 734
  if (pRow) {
    code = tsdbCommitterUpdateTableSchema(pCommitter, pTbData->suid, pTbData->uid, pTbData->maxSkmVer);
    if (code) goto _err;
  }

H
Hongze Cheng 已提交
735 736
  // merge ===========
  while (true) {
H
Hongze Cheng 已提交
737
    if (pRow == NULL && pBlock == NULL) break;
H
Hongze Cheng 已提交
738

H
Hongze Cheng 已提交
739
    if (pRow && pBlock) {
H
Hongze Cheng 已提交
740
      if (pBlock->last) {
H
Hongze Cheng 已提交
741
        code = tsdbMergeTableData(pCommitter, pIter, pBlock,
H
Hongze Cheng 已提交
742
                                  (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
H
Hongze Cheng 已提交
743 744 745
        if (code) goto _err;

        pRow = tsdbTbDataIterGet(pIter);
H
Hongze Cheng 已提交
746
        if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
H
Hongze Cheng 已提交
747
        iBlock++;
H
Hongze Cheng 已提交
748 749 750 751 752
        if (iBlock < nBlock) {
          tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
        } else {
          pBlock = NULL;
        }
H
Hongze Cheng 已提交
753 754

        ASSERT(pRow == NULL && pBlock == NULL);
H
Hongze Cheng 已提交
755
      } else {
H
Hongze Cheng 已提交
756
        int32_t c = tBlockCmprFn(&(SBlock){.maxKey = TSDBROW_KEY(pRow), .minKey = TSDBROW_KEY(pRow)}, pBlock);
H
Hongze Cheng 已提交
757
        if (c > 0) {
H
Hongze Cheng 已提交
758
          // only disk data
H
Hongze Cheng 已提交
759
          code = tsdbCommitTableDiskData(pCommitter, pBlock, pBlockIdx);
H
Hongze Cheng 已提交
760 761
          if (code) goto _err;

H
Hongze Cheng 已提交
762 763 764 765 766 767
          iBlock++;
          if (iBlock < nBlock) {
            tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
          } else {
            pBlock = NULL;
          }
H
Hongze Cheng 已提交
768
        } else if (c < 0) {
H
Hongze Cheng 已提交
769
          // only memory data
H
Hongze Cheng 已提交
770
          code = tsdbCommitTableMemData(pCommitter, pIter, pBlock->minKey, 1);
H
Hongze Cheng 已提交
771 772 773
          if (code) goto _err;

          pRow = tsdbTbDataIterGet(pIter);
H
Hongze Cheng 已提交
774
          if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
H
Hongze Cheng 已提交
775
        } else {
H
Hongze Cheng 已提交
776
          // merge memory and disk
H
Hongze Cheng 已提交
777 778
          int32_t nOvlp = tsdbGetOvlpNRow(pIter, pBlock);
          ASSERT(nOvlp);
H
Hongze Cheng 已提交
779
          if (pBlock->nRow + nOvlp <= pCommitter->maxRow && pBlock->nSubBlock < TSDB_MAX_SUBBLOCKS) {
H
Hongze Cheng 已提交
780 781
            code = tsdbMergeAsSubBlock(pCommitter, pIter, pBlock);
            if (code) goto _err;
H
Hongze Cheng 已提交
782
          } else {
H
Hongze Cheng 已提交
783 784 785 786 787 788 789 790 791 792
            TSDBKEY toKey = {.ts = pCommitter->maxKey + 1, .version = VERSION_MIN};
            int8_t  toDataOnly = 0;

            if (iBlock < nBlock - 1) {
              toDataOnly = 1;

              SBlock nextBlock = {0};
              tBlockReset(&nextBlock);
              tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock + 1, &nextBlock, tGetBlock);
              toKey = nextBlock.minKey;
H
Hongze Cheng 已提交
793
            }
H
Hongze Cheng 已提交
794 795 796

            code = tsdbMergeTableData(pCommitter, pIter, pBlock, toKey, toDataOnly);
            if (code) goto _err;
H
Hongze Cheng 已提交
797
          }
H
Hongze Cheng 已提交
798 799 800 801 802 803 804 805 806

          pRow = tsdbTbDataIterGet(pIter);
          if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
          iBlock++;
          if (iBlock < nBlock) {
            tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
          } else {
            pBlock = NULL;
          }
H
Hongze Cheng 已提交
807 808 809
        }
      }
    } else if (pBlock) {
H
Hongze Cheng 已提交
810
      code = tsdbCommitTableDiskData(pCommitter, pBlock, pBlockIdx);
H
Hongze Cheng 已提交
811 812
      if (code) goto _err;

H
Hongze Cheng 已提交
813 814 815 816 817 818
      iBlock++;
      if (iBlock < nBlock) {
        tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
      } else {
        pBlock = NULL;
      }
H
Hongze Cheng 已提交
819
    } else {
H
Hongze Cheng 已提交
820 821
      code =
          tsdbCommitTableMemData(pCommitter, pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
H
Hongze Cheng 已提交
822 823 824
      if (code) goto _err;

      pRow = tsdbTbDataIterGet(pIter);
H
Hongze Cheng 已提交
825 826
      if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
      ASSERT(pRow == NULL);
H
Hongze Cheng 已提交
827 828 829
    }
  }

H
Hongze Cheng 已提交
830 831
  // end =====================
  code = tsdbCommitTableDataEnd(pCommitter, suid, uid);
H
Hongze Cheng 已提交
832 833 834 835 836 837 838 839 840 841 842 843 844 845
  if (code) goto _err;

_exit:
  if (pIter) {
    pRow = tsdbTbDataIterGet(pIter);
    if (pRow) pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
  }
  return code;

_err:
  tsdbError("vgId:%d tsdb commit table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
846 847 848 849 850 851 852 853 854 855 856 857
// Finish the data file set currently being committed.
//
// Steps, each attempted only if every previous one returned 0:
//   1. write the new block index (aBlockIdxN) through the writer
//   2. update the data file set header
//   3. upsert the writer's file set into the file system's new state
//   4. close the writer (flag 1; the original comment calls this "close and sync")
//   5. close the reader, if one is open
//
// Returns 0 on success, otherwise the first non-zero code (also logged).
static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
  int32_t code;

  // write blockIdx
  code = tsdbWriteBlockIdx(pCommitter->pWriter, pCommitter->aBlockIdxN, NULL);

  // update file header
  if (code == 0) {
    code = tsdbUpdateDFileSetHeader(pCommitter->pWriter);
  }

  // upsert SDFileSet
  if (code == 0) {
    code = tsdbFSStateUpsertDFileSet(pCommitter->pTsdb->pFS->nState, tsdbDataFWriterGetWSet(pCommitter->pWriter));
  }

  // close and sync
  if (code == 0) {
    code = tsdbDataFWriterClose(&pCommitter->pWriter, 1);
  }

  // the reader is only closed when one was opened for this file set
  if (code == 0 && pCommitter->pReader != NULL) {
    code = tsdbDataFReaderClose(&pCommitter->pReader);
  }

  if (code != 0) {
    tsdbError("vgId:%d commit file data end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  }
  return code;
}

// Commit one data file set (the one selected by pCommitter->commitFid).
//
// Walks two sequences in parallel:
//   - the per-table in-memory data of the immutable memtable (aTbData)
//   - the per-table block index entries read from disk (aBlockIdx)
// Both are visited in the order defined by tTABLEIDCmprFn. For each table,
// tsdbCommitTableData() receives the memory side, the disk side, or both,
// depending on which sequence(s) currently point at that table.
//
// Returns 0 on success. On failure the error is logged and any reader/writer
// still open on the committer is closed before returning the code.
static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;

  // commit file data start
  code = tsdbCommitFileDataStart(pCommitter);
  if (code) goto _err;

  // commit file data impl
  int32_t    iTbData = 0;
  int32_t    nTbData = taosArrayGetSize(pMemTable->aTbData);
  int32_t    iBlockIdx = 0;
  int32_t    nBlockIdx = taosArrayGetSize(pCommitter->aBlockIdx);
  STbData   *pTbData;
  SBlockIdx *pBlockIdx;

  ASSERT(nTbData > 0);

  pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
  pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->aBlockIdx, iBlockIdx) : NULL;
  while (pTbData || pBlockIdx) {
    // c < 0: only memory has this table; c > 0: only disk has it; c == 0: both
    int32_t c;
    if (pTbData == NULL) {
      c = 1;
    } else if (pBlockIdx == NULL) {
      c = -1;
    } else {
      c = tTABLEIDCmprFn(pTbData, pBlockIdx);
    }

    STbData   *pMem = (c <= 0) ? pTbData : NULL;
    SBlockIdx *pDisk = (c >= 0) ? pBlockIdx : NULL;

    code = tsdbCommitTableData(pCommitter, pMem, pDisk);
    if (code) goto _err;

    // advance whichever side(s) were consumed
    if (pDisk) {
      iBlockIdx++;
      pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->aBlockIdx, iBlockIdx) : NULL;
    }
    if (pMem) {
      iTbData++;
      pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData) : NULL;
    }
  }

  // commit file data end
  code = tsdbCommitFileDataEnd(pCommitter);
  if (code) goto _err;

  return code;

_err:
  tsdbError("vgId:%d commit file data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  // Either handle may be NULL here (start failed before opening it, there was
  // no reader for this file set, or the end step already closed it), so guard
  // the closes the same way tsdbCommitFileDataEnd() guards the reader.
  if (pCommitter->pReader) {
    tsdbDataFReaderClose(&pCommitter->pReader);
  }
  if (pCommitter->pWriter) {
    tsdbDataFWriterClose(&pCommitter->pWriter, 0);
  }
  return code;
}

H
Hongze Cheng 已提交
956 957
// ----------------------------------------------------------------------------
// Begin a commit: zero the committer, turn the active memtable into the
// immutable one, snapshot the configuration values the commit needs, and
// begin a file system transaction with tsdbFSBegin().
//
// Preconditions (asserted): pTsdb->mem is set and pTsdb->imem is NULL.
// Returns 0 on success, otherwise the code from tsdbFSBegin() (also logged).
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
  int32_t code = 0;

  memset(pCommitter, 0, sizeof(*pCommitter));
  ASSERT(pTsdb->mem && pTsdb->imem == NULL);

  // Swap mem -> imem under the same write lock tsdbEndCommit() takes when it
  // clears imem, so the two pointers are never updated unprotected.
  taosThreadRwlockWrlock(&pTsdb->rwLock);
  pTsdb->imem = pTsdb->mem;
  pTsdb->mem = NULL;
  taosThreadRwlockUnlock(&pTsdb->rwLock);

  pCommitter->pTsdb = pTsdb;
  pCommitter->commitID = pTsdb->pVnode->state.commitID;
  // NOTE(review): keepCfg.days is stored into a field named `minutes` --
  // presumably the config value is already expressed in minutes; confirm.
  pCommitter->minutes = pTsdb->keepCfg.days;
  pCommitter->precision = pTsdb->keepCfg.precision;
  pCommitter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
  pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
  pCommitter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;

  code = tsdbFSBegin(pTsdb->pFS);
  if (code) goto _err;

  return code;

_err:
  tsdbError("vgId:%d tsdb start commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
986 987 988
// Allocate the working state used while committing row data: the old and new
// block index arrays and the old and new block data buffers.
//
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if an array cannot be
// created, or the code from tBlockDataInit(). Nothing is released here on
// failure; whatever was already allocated stays attached to the committer.
static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
  int32_t code;

  pCommitter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
  if (pCommitter->aBlockIdx == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pCommitter->aBlockIdxN = taosArrayInit(0, sizeof(SBlockIdx));
  if (pCommitter->aBlockIdxN == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  code = tBlockDataInit(&pCommitter->oBlockData);
  if (code != 0) {
    return code;
  }

  return tBlockDataInit(&pCommitter->nBlockData);
}

// Release everything the data commit keeps on the committer: both block index
// arrays, both block maps, both block data buffers, and the two cached
// schemas (skmTable / skmRow).
static void tsdbCommitDataEnd(SCommitter *pCommitter) {
  // block index arrays (old, new)
  taosArrayDestroy(pCommitter->aBlockIdx);
  taosArrayDestroy(pCommitter->aBlockIdxN);

  // block maps (old, new)
  tMapDataClear(&pCommitter->oBlockMap);
  tMapDataClear(&pCommitter->nBlockMap);

  // block data buffers (old, new)
  tBlockDataClear(&pCommitter->oBlockData, 1);
  tBlockDataClear(&pCommitter->nBlockData, 1);

  // cached schemas
  tTSchemaDestroy(pCommitter->skmTable.pTSchema);
  tTSchemaDestroy(pCommitter->skmRow.pTSchema);
}

H
Hongze Cheng 已提交
1022 1023 1024 1025
// Commit the row data of the immutable memtable to data files.
//
// Does nothing except log when the memtable holds no rows. Otherwise it sets
// up the commit state, then repeatedly derives the file id and key range
// from pCommitter->nextKey and commits that file set, until nextKey reaches
// TSKEY_MAX. nextKey is not modified in this function after its initial
// assignment; the loop relies on the callee chain to advance it.
//
// The commit state is always released via tsdbCommitDataEnd() once
// tsdbCommitDataStart() has been attempted. Returns 0 on success or the
// first non-zero code (also logged).
static int32_t tsdbCommitData(SCommitter *pCommitter) {
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
  int32_t    code = 0;

  // check
  if (pMemTable->nRow != 0) {
    // start ====================
    code = tsdbCommitDataStart(pCommitter);

    // impl ====================
    if (code == 0) {
      for (pCommitter->nextKey = pMemTable->minKey; pCommitter->nextKey < TSKEY_MAX;) {
        pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision);
        tsdbFidKeyRange(pCommitter->commitFid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey,
                        &pCommitter->maxKey);

        code = tsdbCommitFileData(pCommitter);
        if (code != 0) {
          break;
        }
      }
    }

    // end ====================
    tsdbCommitDataEnd(pCommitter);

    if (code != 0) {
      tsdbError("vgId:%d commit data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
      return code;
    }
  }

  tsdbDebug("vgId:%d commit data done, nRow:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nRow);
  return code;
}
H
Hongze Cheng 已提交
1056

H
Hongze Cheng 已提交
1057 1058 1059 1060
// Commit the delete records of the immutable memtable.
//
// Does nothing except log when the memtable holds no deletes. Otherwise it
// walks the memtable's per-table data (aTbData) and the committer's delete
// index entries (aDelIdx) in parallel, in the order defined by
// tTABLEIDCmprFn, and calls tsdbCommitTableDel() once per table with the
// memory side, the disk side, or both.
//
// Returns 0 on success or the first non-zero code (also logged).
static int32_t tsdbCommitDel(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;

  if (pMemTable->nDel == 0) goto _exit;

  // start
  code = tsdbCommitDelStart(pCommitter);
  if (code) goto _err;

  // impl
  int32_t  iDelIdx = 0;
  int32_t  nDelIdx = taosArrayGetSize(pCommitter->aDelIdx);
  int32_t  iTbData = 0;
  int32_t  nTbData = taosArrayGetSize(pMemTable->aTbData);
  STbData *pTbData;
  SDelIdx *pDelIdx;

  ASSERT(nTbData > 0);

  pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
  pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL;

  while (pTbData != NULL || pDelIdx != NULL) {
    // order < 0: table only in memory; order > 0: only on disk; 0: in both
    int32_t order;
    if (pTbData == NULL) {
      order = 1;
    } else if (pDelIdx == NULL) {
      order = -1;
    } else {
      order = tTABLEIDCmprFn(pTbData, pDelIdx);
    }

    STbData *pMemSide = (order <= 0) ? pTbData : NULL;
    SDelIdx *pDiskSide = (order >= 0) ? pDelIdx : NULL;

    code = tsdbCommitTableDel(pCommitter, pMemSide, pDiskSide);
    if (code) goto _err;

    // step past whichever side(s) were just committed
    if (pMemSide != NULL) {
      iTbData++;
      pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData) : NULL;
    }
    if (pDiskSide != NULL) {
      iDelIdx++;
      pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL;
    }
  }

  // end
  code = tsdbCommitDelEnd(pCommitter);
  if (code) goto _err;

_exit:
  tsdbDebug("vgId:%d commit del done, nDel:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nDel);
  return code;

_err:
  tsdbError("vgId:%d commit del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

// Placeholder for committing cache state; not implemented yet.
// Always returns 0 and does not touch pCommitter.
static int32_t tsdbCommitCache(SCommitter *pCommitter) {
  // TODO
  (void)pCommitter;
  return 0;
}
H
Hongze Cheng 已提交
1150 1151

// Finish a commit.
//
// eno == 0 commits the file system transaction (tsdbFSCommit); any other
// value rolls it back (tsdbFSRollback). Regardless of that call's result,
// the immutable memtable pointer is cleared under the write lock and the
// memtable is unreferenced.
//
// Returns the code from the commit/rollback call; a non-zero code is logged
// as an error, a zero code as an info message.
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;

  if (eno == 0) {
    code = tsdbFSCommit(pTsdb->pFS);
  } else {
    code = tsdbFSRollback(pTsdb->pFS);
  }

  // detach the immutable memtable before dropping our reference to it
  taosThreadRwlockWrlock(&pTsdb->rwLock);
  pTsdb->imem = NULL;
  taosThreadRwlockUnlock(&pTsdb->rwLock);

  tsdbUnrefMemTable(pMemTable);

  // report a failed commit/rollback instead of logging it as a normal end
  if (code) {
    tsdbError("vgId:%d tsdb end commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
    return code;
  }

  tsdbInfo("vgId:%d tsdb end commit", TD_VID(pTsdb->pVnode));
  return code;
}