tsdbCommit.c 11.0 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "dev.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
// extern dependencies
H
Hongze Cheng 已提交
19 20 21 22 23 24 25 26 27 28 29
typedef struct {
  STsdb *pTsdb;
  // config
  int32_t minutes;
  int8_t  precision;
  int32_t minRow;
  int32_t maxRow;
  int8_t  cmprAlg;
  int8_t  sttTrigger;
  SArray *aTbDataP;
  // context
H
Hongze Cheng 已提交
30 31 32 33 34 35
  TSKEY   nextKey;
  int32_t fid;
  int32_t expLevel;
  TSKEY   minKey;
  TSKEY   maxKey;
  // writer
H
Hongze Cheng 已提交
36
  SArray             *aFileOp;
H
Hongze Cheng 已提交
37
  struct SSttFWriter *pWriter;
H
Hongze Cheng 已提交
38
} SCommitter;
H
Hongze Cheng 已提交
39

H
Hongze Cheng 已提交
40
/*
 * Open the STT file writer for the file set currently being committed
 * (pCommitter->fid) and store it in pCommitter->pWriter.
 *
 * Returns the code produced by tsdbSttFWriterOpen(); a non-zero code is
 * logged together with the failing line and the fid before returning.
 */
static int32_t open_committer_writer(SCommitter *pCommitter) {
  int32_t code;
  int32_t lino;
  STsdb  *pTsdb = pCommitter->pTsdb;

  struct SSttFWriterConf conf = {
      .pTsdb = pTsdb,
      .pSkmTb = NULL,
      .pSkmRow = NULL,
      .maxRow = pCommitter->maxRow,
      .szPage = pTsdb->pVnode->config.tsdbPageSize,
      .cmprAlg = pCommitter->cmprAlg,
      .aBuf = NULL,
  };

  // Looking up an existing file set is not wired in yet, so pSet is always
  // NULL and a fresh STT file description is built every time.
  // pCommitter->pTsdb->pFS = NULL;
  // taosbsearch(pCommitter->pTsdb->pFS->aFileSet, &pCommitter->fid, tsdbCompareFid, &lino);
  struct SFileSet *pSet = NULL;
  if (pSet != NULL) {
    // TODO
    ASSERT(0);
  } else {
    conf.file = (struct STFile){
        .cid = 1,
        .fid = pCommitter->fid,
        .diskId = (SDiskID){0},
        .type = TSDB_FTYPE_STT,
    };
    tsdbTFileInit(pTsdb, &conf.file);
  }

  code = tsdbSttFWriterOpen(&conf, &pCommitter->pWriter);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s, fid:%d", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code),
              pCommitter->fid);
  }
  return code;
}

H
Hongze Cheng 已提交
81
/*
 * Write one time-series row of table `tbid` into the STT writer of the
 * current file set, opening the writer first if it does not exist yet.
 *
 * Returns 0 on success, otherwise the error code of the step that failed
 * (open_committer_writer() or tsdbSttFWriteTSData()).
 */
static int32_t tsdbCommitWriteTSData(SCommitter *pCommitter, TABLEID *tbid, TSDBROW *pRow) {
  int32_t code = 0;
  int32_t lino = 0;

  // The writer is created lazily so that file sets without rows never open a file.
  if (pCommitter->pWriter == NULL) {
    code = open_committer_writer(pCommitter);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = tsdbSttFWriteTSData(pCommitter->pWriter, tbid, pRow);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), lino, tstrerror(code));
  } else {
    tsdbTrace("vgId:%d %s done, fid:%d suid:%" PRId64 " uid:%" PRId64 " ts:%" PRId64 " version:%" PRId64,
              TD_VID(pCommitter->pTsdb->pVnode), __func__, pCommitter->fid, tbid->suid, tbid->uid, TSDBROW_KEY(pRow).ts,
              TSDBROW_KEY(pRow).version);
  }
  // Propagate the failure: returning 0 here would make the caller believe
  // the row was persisted even though the write (or the open) failed.
  return code;
}

H
Hongze Cheng 已提交
104 105 106 107 108 109 110
/*
 * Placeholder for writing one delete record (table suid/uid, version and the
 * [sKey, eKey] range) into the current file set. Not implemented yet: no
 * argument is used and the function always returns 0.
 */
static int32_t tsdbCommitWriteDelData(SCommitter *pCommitter, int64_t suid, int64_t uid, int64_t version, int64_t sKey,
                                      int64_t eKey) {
  // TODO
  return 0;
}

H
Hongze Cheng 已提交
111
/*
 * Commit the rows of the immutable memtable that fall into the current file
 * set, i.e. whose timestamp lies in [minKey, maxKey].
 *
 * Every table in aTbDataP is iterated starting at minKey. The first row found
 * past maxKey ends the scan of that table and lowers pCommitter->nextKey, so
 * that a later pass can pick up the following file set.
 *
 * Returns 0 on success or the first error returned by tsdbCommitWriteTSData().
 */
static int32_t commit_timeseries_data(SCommitter *pCommitter) {
  int32_t code = 0;
  int32_t lino;

  int64_t    nRow = 0;
  SMemTable *pMem = pCommitter->pTsdb->imem;
  TSDBKEY    from = {.ts = pCommitter->minKey, .version = VERSION_MIN};

  if (pMem->nRow == 0) {  // no time-series data to commit
    goto _exit;
  }

  for (int32_t iTbData = 0; iTbData < taosArrayGetSize(pCommitter->aTbDataP); iTbData++) {
    STbData    *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData);
    STbDataIter iter;

    tsdbTbDataIterOpen(pTbData, &from, 0, &iter);

    TSDBROW *pRow = tsdbTbDataIterGet(&iter);
    while (pRow != NULL) {
      TSDBKEY rowKey = TSDBROW_KEY(pRow);

      if (rowKey.ts > pCommitter->maxKey) {
        // Row belongs to a later file set: remember the earliest such key.
        pCommitter->nextKey = TMIN(pCommitter->nextKey, rowKey.ts);
        break;
      }

      nRow++;

      code = tsdbCommitWriteTSData(pCommitter, (TABLEID *)pTbData, pRow);
      TSDB_CHECK_CODE(code, lino, _exit);

      tsdbTbDataIterNext(&iter);
      pRow = tsdbTbDataIterGet(&iter);
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s done, fid:%d nRow:%" PRId64, TD_VID(pCommitter->pTsdb->pVnode), __func__, pCommitter->fid,
              nRow);
  }
  return code;
}

H
Hongze Cheng 已提交
154
/*
 * Commit the delete records of the immutable memtable that overlap the key
 * range [minKey, maxKey] of the current file set.
 *
 * Records that end before minKey are skipped. Records that start after
 * maxKey are skipped as well, but lower pCommitter->nextKey so that a later
 * pass can handle them. Clamping of sKey/eKey to the file-set range is still
 * a TODO at the call site below.
 *
 * NOTE: the function is still guarded by ASSERTS(0, ...) because
 * tsdbCommitWriteDelData() is only a stub.
 *
 * Returns 0 on success or the first error from tsdbCommitWriteDelData().
 */
static int32_t commit_delete_data(SCommitter *pCommitter) {
  int32_t code = 0;
  int32_t lino = 0;

  ASSERTS(0, "not implemented yet");

  int64_t    nDel = 0;  // delete records written for this file set
  SMemTable *pMem = pCommitter->pTsdb->imem;

  if (pMem->nDel == 0) {  // no del data
    goto _exit;
  }

  for (int32_t iTbData = 0; iTbData < taosArrayGetSize(pCommitter->aTbDataP); iTbData++) {
    STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData);

    for (SDelData *pDelData = pTbData->pHead; pDelData; pDelData = pDelData->pNext) {
      if (pDelData->eKey < pCommitter->minKey) continue;
      if (pDelData->sKey > pCommitter->maxKey) {
        pCommitter->nextKey = TMIN(pCommitter->nextKey, pDelData->sKey);
        continue;
      }

      code = tsdbCommitWriteDelData(pCommitter, pTbData->suid, pTbData->uid, pDelData->version,
                                    pDelData->sKey /* TODO */, pDelData->eKey /* TODO */);
      TSDB_CHECK_CODE(code, lino, _exit);

      nDel++;
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), lino, tstrerror(code));
  } else {
    // Report what was committed for this fid (as commit_timeseries_data does
    // with nRow), not the memtable-wide total.
    tsdbDebug("vgId:%d %s done, fid:%d nDel:%" PRId64, TD_VID(pCommitter->pTsdb->pVnode), __func__, pCommitter->fid,
              nDel);
  }
  return code;
}

H
Hongze Cheng 已提交
193
/*
 * Prepare the committer for the next file set: derive the fid from nextKey,
 * compute that fid's key range and level, and reset nextKey to TSKEY_MAX so
 * the commit_* helpers can record the earliest key beyond this file set.
 *
 * Always returns 0.
 */
static int32_t start_commit_file_set(SCommitter *pCommitter) {
  STsdb *pTsdb = pCommitter->pTsdb;

  // nextKey must be consumed before it is reset below.
  pCommitter->fid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision);
  tsdbFidKeyRange(pCommitter->fid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey,
                  &pCommitter->maxKey);
  pCommitter->expLevel = tsdbFidLevel(pCommitter->fid, &pTsdb->keepCfg, taosGetTimestampSec());
  pCommitter->nextKey = TSKEY_MAX;

  tsdbDebug("vgId:%d %s done, fid:%d minKey:%" PRId64 " maxKey:%" PRId64 " expLevel:%d", TD_VID(pTsdb->pVnode),
            __func__, pCommitter->fid, pCommitter->minKey, pCommitter->maxKey, pCommitter->expLevel);
  return 0;
}

H
Hongze Cheng 已提交
206
/*
 * Finish the commit of the current file set. The actual work is still a
 * TODO, so at present the function only returns 0; the error path is kept
 * in place for when the body is filled in.
 */
static int32_t end_commit_file_set(SCommitter *pCommitter) {
  int32_t lino = 0;
  int32_t code = 0;

  // TODO

_exit:
  if (code != 0) {
    tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
219
/*
 * Commit one file set: start it, write its time-series rows, write its
 * delete records, then end it. The first failing step aborts the sequence.
 *
 * Returns 0 on success or the error code of the failing step, which is
 * logged with the line at which it was detected.
 */
static int32_t commit_next_file_set(SCommitter *pCommitter) {
  int32_t lino = 0;
  int32_t code = 0;

  /* 1. fset commit start */
  code = start_commit_file_set(pCommitter);
  TSDB_CHECK_CODE(code, lino, _exit);

  /* 2. commit fset: rows first, then delete records */
  code = commit_timeseries_data(pCommitter);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = commit_delete_data(pCommitter);
  TSDB_CHECK_CODE(code, lino, _exit);

  /* 3. fset commit end */
  code = end_commit_file_set(pCommitter);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
246
/*
 * Initialise `pCommitter` for committing pTsdb->imem.
 *
 * The structure is zeroed, the configuration is copied from `pTsdb` and
 * `pInfo`, the table-data array of the immutable memtable and the file
 * operation array are created, and nextKey is seeded with the memtable's
 * minimum key so the commit loop has a starting file set.
 *
 * Returns 0 on success or TSDB_CODE_OUT_OF_MEMORY when one of the arrays
 * cannot be created. On failure nothing allocated here is left behind in
 * `pCommitter`.
 */
static int32_t open_committer(STsdb *pTsdb, SCommitInfo *pInfo, SCommitter *pCommitter) {
  int32_t code = 0;
  int32_t lino = 0;

  // set config
  memset(pCommitter, 0, sizeof(SCommitter));
  pCommitter->pTsdb = pTsdb;
  pCommitter->minutes = pTsdb->keepCfg.days;
  pCommitter->precision = pTsdb->keepCfg.precision;
  pCommitter->minRow = pInfo->info.config.tsdbCfg.minRows;
  pCommitter->maxRow = pInfo->info.config.tsdbCfg.maxRows;
  pCommitter->cmprAlg = pInfo->info.config.tsdbCfg.compression;
  pCommitter->sttTrigger = 0;  // TODO

  pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem);
  if (pCommitter->aTbDataP == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
  pCommitter->aFileOp = taosArrayInit(10, sizeof(struct SFileOp));
  if (pCommitter->aFileOp == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // start loop
  pCommitter->nextKey = pTsdb->imem->minKey;  // TODO

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));

    // tsdbCommitBegin() bails out without calling close_committer() when
    // opening fails, so a half-built committer must be undone here. Only
    // aTbDataP can be live at this point: aFileOp is the last allocation.
    if (pCommitter->aTbDataP != NULL) {
      taosArrayDestroy(pCommitter->aTbDataP);
      pCommitter->aTbDataP = NULL;
    }
  } else {
    tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
  }
  return code;
}

H
Hongze Cheng 已提交
283
/*
 * Finish a commit pass. `eno` is the result of the commit loop as passed in
 * by the caller; it is not consulted yet because the file-system step below
 * is still commented out. As written the function always returns 0.
 */
static int32_t close_committer(SCommitter *pCommiter, int32_t eno) {
  int32_t lino;
  int32_t code = 0;

  // code = tsdbFSBegin(pCommiter->pTsdb, pCommiter->aFileOp);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  return code;
}

/*
 * Hand the active memtable over to the commit path: under the tsdb write
 * lock, pTsdb->mem becomes pTsdb->imem and pTsdb->mem is cleared. The
 * previous imem is asserted to be NULL.
 *
 * Always returns 0.
 */
int32_t tsdbPreCommit(STsdb *pTsdb) {
  taosThreadRwlockWrlock(&pTsdb->rwLock);

  ASSERT(pTsdb->imem == NULL);
  SMemTable *pFrozen = pTsdb->mem;
  pTsdb->mem = NULL;
  pTsdb->imem = pFrozen;

  taosThreadRwlockUnlock(&pTsdb->rwLock);
  return 0;
}

/*
 * Commit the immutable memtable (pTsdb->imem).
 *
 * An empty memtable (no rows and no delete records) is simply detached from
 * the tsdb under the write lock and unreferenced. Otherwise a committer is
 * opened and file sets are committed one after another until no key remains
 * (nextKey == TSKEY_MAX) or a step fails; the committer is then closed.
 *
 * A NULL `pTsdb` is accepted and treated as "nothing to do".
 *
 * Returns 0 on success. On failure the first error encountered is returned:
 * an error from the commit loop is not masked by a successful close.
 */
int32_t tsdbCommitBegin(STsdb *pTsdb, SCommitInfo *pInfo) {
  if (!pTsdb) return 0;

  int32_t    code = 0;
  int32_t    lino = 0;
  SMemTable *pMem = pTsdb->imem;

  // Snapshot the counters up front: in the empty case pMem is unreferenced
  // below and may no longer be valid when the summary is logged.
  int64_t nRow = pMem->nRow;
  int64_t nDel = pMem->nDel;

  if (nRow == 0 && nDel == 0) {
    taosThreadRwlockWrlock(&pTsdb->rwLock);
    pTsdb->imem = NULL;
    taosThreadRwlockUnlock(&pTsdb->rwLock);
    tsdbUnrefMemTable(pMem, NULL, true);
  } else {
    SCommitter committer;
    int32_t    closeCode;

    code = open_committer(pTsdb, pInfo, &committer);
    TSDB_CHECK_CODE(code, lino, _exit);

    while (committer.nextKey != TSKEY_MAX) {
      code = commit_next_file_set(&committer);
      if (code) break;
    }

    // Always close the committer, but keep the loop's error if there was
    // one; only a clean loop lets the close result decide the outcome.
    closeCode = close_committer(&committer, code);
    if (code == 0) {
      code = closeCode;
    }
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbInfo("vgId:%d %s done, nRow:%" PRId64 " nDel:%" PRId64, TD_VID(pTsdb->pVnode), __func__, nRow, nDel);
  }
  return code;
}

H
Hongze Cheng 已提交
340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385
// int32_t tsdbCommitCommit(STsdb *pTsdb) {
//   int32_t    code = 0;
//   int32_t    lino = 0;
//   SMemTable *pMemTable = pTsdb->imem;

//   // lock
//   taosThreadRwlockWrlock(&pTsdb->rwLock);

//   code = tsdbFSCommit(pTsdb);
//   if (code) {
//     taosThreadRwlockUnlock(&pTsdb->rwLock);
//     TSDB_CHECK_CODE(code, lino, _exit);
//   }

//   pTsdb->imem = NULL;

//   // unlock
//   taosThreadRwlockUnlock(&pTsdb->rwLock);
//   if (pMemTable) {
//     tsdbUnrefMemTable(pMemTable, NULL, true);
//   }

// _exit:
//   if (code) {
//     tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
//   } else {
//     tsdbInfo("vgId:%d, tsdb finish commit", TD_VID(pTsdb->pVnode));
//   }
//   return code;
// }

// int32_t tsdbCommitAbort(STsdb *pTsdb) {
//   int32_t code = 0;
//   int32_t lino = 0;

//   code = tsdbFSRollback(pTsdb);
//   TSDB_CHECK_CODE(code, lino, _exit);

// _exit:
//   if (code) {
//     tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
//   } else {
//     tsdbInfo("vgId:%d, tsdb rollback commit", TD_VID(pTsdb->pVnode));
//   }
//   return code;
// }