tdbPager.c 28.7 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
H
refact  
Hongze Cheng 已提交
14 15
 */

H
Hongze Cheng 已提交
16
#include "tdbInt.h"
17
/*
H
Hongze Cheng 已提交
18
#pragma pack(push, 1)
wafwerar's avatar
wafwerar 已提交
19
typedef struct {
H
Hongze Cheng 已提交
20 21 22 23 24
  u8    hdrString[16];
  u16   pageSize;
  SPgno freePage;
  u32   nFreePages;
  u8    reserved[102];
H
Hongze Cheng 已提交
25
} SFileHdr;
wafwerar's avatar
wafwerar 已提交
26
#pragma pack(pop)
H
Hongze Cheng 已提交
27

H
Hongze Cheng 已提交
28
TDB_STATIC_ASSERT(sizeof(SFileHdr) == 128, "Size of file header is not correct");
29
*/
30
/*
 * Open-addressing hash set of non-zero, pointer-sized values.
 * A slot holding 0 is empty, so the value 0 can never be stored.
 */
struct hashset_st {
  size_t  nbits;       /* log2(capacity) */
  size_t  mask;        /* capacity - 1; wraps probe positions into the table */
  size_t  capacity;    /* number of slots in items[]; always a power of two */
  size_t *items;       /* slot array, 0 = empty */
  size_t  nitems;      /* number of occupied slots */
  double  load_factor; /* the table grows once nitems reaches capacity * load_factor */
};

/* Multiplier used to derive a value's home slot: home = mask & (prime * value). */
static const unsigned int prime = 39;
/* Probe stride. It is odd, so with a power-of-two capacity the probe sequence visits every slot. */
static const unsigned int prime2 = 5009;

/*
 * Allocate an empty hash set with 16 slots and a 0.75 load factor.
 * Returns NULL if either allocation fails; nothing is leaked in that case.
 */
hashset_t hashset_create(void) {
  hashset_t s = tdbOsCalloc(1, sizeof(struct hashset_st));
  if (s == NULL) return NULL;

  s->nbits = 4;
  s->capacity = (size_t)(1 << s->nbits);

  s->items = tdbOsCalloc(s->capacity, sizeof(size_t));
  if (s->items == NULL) {
    tdbOsFree(s);
    return NULL;
  }

  s->mask = s->capacity - 1;
  s->nitems = 0;
  s->load_factor = 0.75;
  return s;
}

/* Free a set created by hashset_create(); passing NULL is a no-op. */
void hashset_destroy(hashset_t set) {
  if (set == NULL) return;

  tdbOsFree(set->items);
  tdbOsFree(set);
}

/*
 * Insert `item` into the table without growing it.
 *
 * Returns 1 if inserted, 0 if it was already present, -1 for the value 0 (reserved to mark empty
 * slots). The caller must guarantee a free slot exists; otherwise the probe loop never ends.
 */
int hashset_add_member(hashset_t set, void *item) {
  const size_t key = (size_t)item;
  if (key == 0) {
    return -1;
  }

  size_t slot = set->mask & (prime * key);
  while (set->items[slot] != 0) {
    if (set->items[slot] == key) {
      return 0;
    }
    slot = set->mask & (slot + prime2);
  }

  set->items[slot] = key;
  set->nitems++;
  return 1;
}

/*
 * Insert `item`, then double the table once the load factor has been reached.
 *
 * Returns the hashset_add_member() result: 1 if inserted, 0 if already present, -1 for the reserved
 * value 0. Returns -1 if the table needed to grow but the allocation failed. In that case the item
 * has still been inserted and the set keeps its previous, fully valid table, so a later call can
 * retry the growth.
 */
int hashset_add(hashset_t set, void *item) {
  int ret = hashset_add_member(set, item);

  size_t old_capacity = set->capacity;
  if (set->nitems < (double)old_capacity * set->load_factor) {
    return ret;
  }

  // Allocate the bigger table before touching the set, so an allocation failure leaves it intact.
  // Shift a size_t: `1 << n` on an int is undefined once n reaches the int width.
  size_t  new_nbits = set->nbits + 1;
  size_t  new_capacity = (size_t)1 << new_nbits;
  size_t *new_items = tdbOsCalloc(new_capacity, sizeof(size_t));
  if (new_items == NULL) {
    return -1;
  }

  size_t *old_items = set->items;
  set->nbits = new_nbits;
  set->capacity = new_capacity;
  set->mask = new_capacity - 1;
  set->items = new_items;
  set->nitems = 0;

  // Re-insert every occupied slot of the old table (0 marks an empty slot).
  for (size_t i = 0; i < old_capacity; ++i) {
    if (old_items[i] != 0) {
      hashset_add_member(set, (void *)old_items[i]);
    }
  }
  tdbOsFree(old_items);

  return ret;
}

/*
 * Remove `item` from the set. Returns 1 if it was present, 0 otherwise.
 *
 * Lookups (hashset_contains, hashset_add_member) walk the probe sequence and stop at the first
 * empty slot. Simply zeroing the removed slot would therefore cut the chain for any entry that
 * collided past it: that entry would become invisible to lookups and could be inserted twice.
 * After emptying the slot, every entry that follows in the same probe run is re-inserted. Each one
 * lands at or before its old position, so the run is repaired and the walk still ends at the run's
 * original terminating empty slot.
 */
int hashset_remove(hashset_t set, void *item) {
  size_t value = (size_t)item;
  if (value == 0) {
    return 0;  // 0 marks empty slots and is never stored
  }

  size_t h = set->mask & (prime * value);
  while (set->items[h] != 0 && set->items[h] != value) {
    h = set->mask & (h + prime2);
  }
  if (set->items[h] == 0) {
    return 0;
  }

  set->items[h] = 0;
  --set->nitems;

  // Re-home the rest of the probe run so no entry is left behind an empty slot.
  for (size_t j = set->mask & (h + prime2); set->items[j] != 0; j = set->mask & (j + prime2)) {
    size_t moved = set->items[j];
    set->items[j] = 0;
    --set->nitems;
    hashset_add_member(set, (void *)moved);  // increments nitems again
  }

  return 1;
}

/* Return 1 if `item` is stored in the set, 0 otherwise. */
int hashset_contains(hashset_t set, void *item) {
  const size_t key = (size_t)item;
  size_t       slot = set->mask & (prime * key);

  while (set->items[slot] != 0) {
    if (set->items[slot] == key) {
      return 1;
    }
    slot = set->mask & (slot + prime2);
  }
  return 0;
}

H
Hongze Cheng 已提交
140 141
#define TDB_PAGE_INITIALIZED(pPage) ((pPage)->pPager != NULL)

H
Hongze Cheng 已提交
142 143
static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *, int), void *arg,
                            u8 loadPage);
H
Hongze Cheng 已提交
144
static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage);
145
static int tdbPagerPWritePageToDB(SPager *pPager, SPage *pPage);
H
Hongze Cheng 已提交
146

H
Hongze Cheng 已提交
147 148 149
/*
 * Comparator for the pager's dirty-page red-black tree: orders pages by page number.
 * The tree node is the `node` member embedded in SPage, so the page is recovered via offsetof.
 * Returns -1, 0 or 1.
 */
static FORCE_INLINE int32_t pageCmpFn(const SRBTreeNode *lhs, const SRBTreeNode *rhs) {
  SPage *pLeft = (SPage *)((uint8_t *)lhs - offsetof(SPage, node));
  SPage *pRight = (SPage *)((uint8_t *)rhs - offsetof(SPage, node));

  SPgno a = TDB_PAGE_PGNO(pLeft);
  SPgno b = TDB_PAGE_PGNO(pRight);

  return (a > b) - (a < b);
}

H
refact  
Hongze Cheng 已提交
163
/*
 * Create a pager for the db file `fileName`, using page cache `pCache`.
 *
 * The SPager and its two file-name strings come from one allocation, so a single tdbOsFree()
 * releases all of them:
 *   - dbFileName = fileName
 *   - jFileName  = fileName + "-journal"
 * The db file is opened read-write and created if it does not exist.
 *
 * On success stores the pager in *ppPager and returns 0. On failure returns -1 and leaves
 * *ppPager NULL, after closing the file and freeing the allocation made so far.
 */
int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager) {
  uint8_t *pPtr;
  SPager  *pPager;
  int      fsize;
  int      zsize;
  int      ret;

  *ppPager = NULL;

  fsize = strlen(fileName);
  zsize = sizeof(*pPager)  /* SPager */
          + fsize + 1      /* dbFileName */
          + fsize + 8 + 1; /* jFileName: fileName + "-journal" */
  pPtr = (uint8_t *)tdbOsCalloc(1, zsize);
  if (pPtr == NULL) {
    return -1;
  }

  pPager = (SPager *)pPtr;
  pPtr += sizeof(*pPager);
  // pPager->dbFileName
  pPager->dbFileName = (char *)pPtr;
  memcpy(pPager->dbFileName, fileName, fsize);
  pPager->dbFileName[fsize] = '\0';
  pPtr += fsize + 1;
  // pPager->jFileName
  pPager->jFileName = (char *)pPtr;
  memcpy(pPager->jFileName, fileName, fsize);
  memcpy(pPager->jFileName + fsize, "-journal", 8);
  pPager->jFileName[fsize + 8] = '\0';
  // pPager->pCache
  pPager->pCache = pCache;

  pPager->fd = tdbOsOpen(pPager->dbFileName, TDB_O_CREAT | TDB_O_RDWR, 0755);
  if (TDB_FD_INVALID(pPager->fd)) {
    tdbOsFree(pPager);
    return -1;
  }

  ret = tdbGnrtFileID(pPager->fd, pPager->fid, false);
  if (ret < 0) {
    goto _err;
  }

  pPager->pageSize = tdbPCacheGetPageSize(pCache);
  // Current db file size as reported by tdbGetFileSize in pageSize units (presumably whole pages).
  // A failure here must not be ignored: the pager would treat an existing file as empty.
  ret = tdbGetFileSize(pPager->fd, pPager->pageSize, &(pPager->dbOrigSize));
  if (ret < 0) {
    goto _err;
  }
  pPager->dbFileSize = pPager->dbOrigSize;

  tdbTrace("pager/open reset dirty tree: %p", &pPager->rbt);
  tRBTreeCreate(&pPager->rbt, pageCmpFn);

  *ppPager = pPager;
  return 0;

_err:
  tdbOsClose(pPager->fd);
  tdbOsFree(pPager);
  return -1;
}

H
refact  
Hongze Cheng 已提交
220
/*
 * Close the pager's db file and free the pager. A NULL pager is accepted.
 * Always returns 0.
 */
int tdbPagerClose(SPager *pPager) {
  if (pPager == NULL) {
    return 0;
  }

  tdbOsClose(pPager->fd);
  tdbOsFree(pPager);
  return 0;
}
H
Hongze Cheng 已提交
232

H
refact  
Hongze Cheng 已提交
233
/*
 * Mark `pPage` as dirty in the active transaction. Pages that are already dirty return 0 at once.
 *
 * Journaling happens first. A page that existed in the db file when the transaction's baseline was
 * taken (pgno <= dbOrigSize) has its current content appended to the journal, unless the txn's
 * jPageSet shows it was already journaled. When jPageSet is NULL the page is journaled on every
 * such call.
 *
 * Then the page gets one extra reference so the cache keeps it while dirty, is flagged isDirty and
 * is inserted into the pager's dirty-page tree.
 *
 * Journaling before marking the page means a journal failure leaves the page untouched: not
 * dirty, not referenced, not in the tree. A later call will retry the journal write instead of
 * returning early on isDirty.
 *
 * Returns 0 on success, -1 if writing the journal failed (terrno set).
 */
int tdbPagerWrite(SPager *pPager, SPage *pPage) {
  SPgno pgno;

  if (pPage->isDirty) return 0;

  pgno = TDB_PAGE_PGNO(pPage);

  // Write page to journal if necessary
  if (pgno <= pPager->dbOrigSize &&
      (pPager->pActiveTxn->jPageSet == NULL ||
       !hashset_contains(pPager->pActiveTxn->jPageSet, (void *)((long)pgno)))) {
    if (tdbPagerWritePageToJournal(pPager, pPage) < 0) {
      tdbError("failed to write page to journal since %s", tstrerror(terrno));
      return -1;
    }

    if (pPager->pActiveTxn->jPageSet) {
      hashset_add(pPager->pActiveTxn->jPageSet, (void *)((long)pgno));
    }
  }

  // ref page one more time so the page will not be released while it is dirty
  tdbRefPage(pPage);
  tdbTrace("pager/mdirty page %p/%d/%d", pPage, pgno, pPage->id);

  // Set page as dirty
  pPage->isDirty = 1;

  tdbTrace("tdb/pager-write: put page: %p %d to dirty tree: %p", pPage, pgno, &pPager->rbt);
  tRBTreePut(&pPager->rbt, (SRBTreeNode *)pPage);

  return 0;
}

H
Hongze Cheng 已提交
267
/*
 * Start transaction `pTxn` on this pager. It:
 *   - opens (creating if needed) the per-transaction journal file "<jFileName>.<txnId>";
 *   - creates the set of page numbers already journaled in this txn;
 *   - makes `pTxn` the pager's active transaction.
 *
 * Returns 0 on success, -1 on failure with terrno set. A journal file name that does not fit in
 * TDB_FILENAME_LEN is reported as an error instead of overflowing the buffer.
 */
int tdbPagerBegin(SPager *pPager, TXN *pTxn) {
  char jTxnFileName[TDB_FILENAME_LEN];
  int  n;

  // Open the journal
  n = snprintf(jTxnFileName, sizeof(jTxnFileName), "%s.%" PRId64, pPager->jFileName, pTxn->txnId);
  if (n < 0 || (size_t)n >= sizeof(jTxnFileName)) {
    tdbError("journal file name too long. jFileName:%s, txnId:%" PRId64, pPager->jFileName, pTxn->txnId);
    terrno = TAOS_SYSTEM_ERROR(ENAMETOOLONG);
    return -1;
  }

  pTxn->jfd = tdbOsOpen(jTxnFileName, TDB_O_CREAT | TDB_O_RDWR, 0755);
  if (TDB_FD_INVALID(pTxn->jfd)) {
    int err = errno;  // save it before logging can clobber it
    tdbError("failed to open file due to %s. jFileName:%s", strerror(err), jTxnFileName);
    terrno = TAOS_SYSTEM_ERROR(err);
    return -1;
  }

  // May be NULL if the allocation fails; the rest of the pager checks for that and then simply
  // journals pages without deduplication.
  pTxn->jPageSet = hashset_create();

  pPager->pActiveTxn = pTxn;

  tdbDebug("pager/begin: %p, %d/%d, txnId:%" PRId64, pPager, pPager->dbOrigSize, pPager->dbFileSize, pTxn->txnId);

  // TODO: write the size of the file
  return 0;
}

H
Hongze Cheng 已提交
296
/*
 * Commit the dirty pages of transaction `pTxn` to the db file.
 *
 * 1. fsync the journal, so it is durable before any db page is overwritten.
 * 2. Write every page in the dirty-page tree to the db file. Pages with overflow cells are rejected.
 * 3. Adopt dbFileSize as the new dbOrigSize.
 * 4. Mark each page clean, drop it from the tree and from the txn's jPageSet, and release it to
 *    the page cache.
 * 5. Re-initialize the tree and fsync the db file.
 *
 * Returns 0 on success, -1 at the first failure; state is left as-is at that point.
 */
int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
  SRBTreeIter  iter;
  SRBTreeNode *pNode = NULL;

  // sync the journal file
  if (tdbOsFSync(pTxn->jfd) < 0) {
    tdbError("failed to fsync: %s. jFileName:%s, %" PRId64, strerror(errno), pPager->jFileName, pTxn->txnId);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  // pass 1: write every dirty page to the db file
  iter = tRBTreeIterCreate(&pPager->rbt, 1);
  for (pNode = tRBTreeIterNext(&iter); pNode != NULL; pNode = tRBTreeIterNext(&iter)) {
    SPage *pDirty = (SPage *)pNode;

    if (pDirty->nOverflow != 0) {
      tdbError("tdb/pager-commit: %p, pPage: %p, ovfl: %d, commit page failed.", pPager, pDirty, pDirty->nOverflow);
      return -1;
    }

    if (!TDB_PAGE_TOTAL_CELLS(pDirty) && TDB_PAGE_PGNO(pDirty) > 1) {
      tdbDebug("pager/commit: %p, %d/%d, txnId:%" PRId64, pPager, pPager->dbOrigSize, pPager->dbFileSize, pTxn->txnId);
    }

    if (tdbPagerPWritePageToDB(pPager, pDirty) < 0) {
      tdbError("failed to write page to db since %s", tstrerror(terrno));
      return -1;
    }
  }

  tdbDebug("pager/commit: %p, %d/%d, txnId:%" PRId64, pPager, pPager->dbOrigSize, pPager->dbFileSize, pTxn->txnId);

  // the file size grown by this txn becomes the new baseline
  pPager->dbOrigSize = pPager->dbFileSize;

  // pass 2: mark the pages clean and hand them back to the cache
  iter = tRBTreeIterCreate(&pPager->rbt, 1);
  for (pNode = tRBTreeIterNext(&iter); pNode != NULL; pNode = tRBTreeIterNext(&iter)) {
    SPage *pDone = (SPage *)pNode;

    pDone->isDirty = 0;

    tRBTreeDrop(&pPager->rbt, pNode);
    if (pTxn->jPageSet) {
      hashset_remove(pTxn->jPageSet, (void *)((long)TDB_PAGE_PGNO(pDone)));
    }
    tdbPCacheRelease(pPager->pCache, pDone, pTxn);
  }

  tdbTrace("pager/commit reset dirty tree: %p", &pPager->rbt);
  tRBTreeCreate(&pPager->rbt, pageCmpFn);

  // sync the db file
  if (tdbOsFSync(pPager->fd) < 0) {
    tdbError("failed to fsync fd due to %s. file:%s", strerror(errno), pPager->dbFileName);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  return 0;
}

/*
 * Finish a commit: close the transaction's journal file and delete it from disk.
 * A journal file that is already gone (ENOENT) is not an error.
 *
 * Returns 0 on success, -1 on failure with terrno set. A journal file name that does not fit in
 * TDB_FILENAME_LEN is reported as an error instead of overflowing the buffer.
 */
int tdbPagerPostCommit(SPager *pPager, TXN *pTxn) {
  char jTxnFileName[TDB_FILENAME_LEN];
  int  n;

  // close the journal file
  if (tdbOsClose(pTxn->jfd) < 0) {
    int err = errno;  // save it before logging can clobber it
    tdbError("failed to close jfd: %s. file:%s, %" PRId64, strerror(err), pPager->jFileName, pTxn->txnId);
    terrno = TAOS_SYSTEM_ERROR(err);
    return -1;
  }

  // remove the journal file
  n = snprintf(jTxnFileName, sizeof(jTxnFileName), "%s.%" PRId64, pPager->jFileName, pTxn->txnId);
  if (n < 0 || (size_t)n >= sizeof(jTxnFileName)) {
    tdbError("journal file name too long. jFileName:%s, txnId:%" PRId64, pPager->jFileName, pTxn->txnId);
    terrno = TAOS_SYSTEM_ERROR(ENAMETOOLONG);
    return -1;
  }

  if (tdbOsRemove(jTxnFileName) < 0 && errno != ENOENT) {
    int err = errno;
    tdbError("failed to remove file due to %s. file:%s", strerror(err), jTxnFileName);
    terrno = TAOS_SYSTEM_ERROR(err);
    return -1;
  }

  tdbDebug("pager/post-commit:%p, %d/%d", pPager, pPager->dbOrigSize, pPager->dbFileSize);

  return 0;
}

385 386
/*
 * First half of an asynchronous commit.
 *
 * fsyncs the journal, then writes every dirty page that is not `isLocal` to the db file. dbOrigSize
 * is set to the larger of its old value and the highest page number written. Finally those pages
 * are marked clean, dropped from the dirty tree and released to the cache.
 *
 * Local pages stay in the dirty tree. Unlike tdbPagerCommit, the tree is not re-initialized and the
 * db file is not fsynced here.
 *
 * Returns 0 on success, -1 on failure.
 */
int tdbPagerPrepareAsyncCommit(SPager *pPager, TXN *pTxn) {
  SRBTreeIter  iter;
  SRBTreeNode *pNode = NULL;
  SPgno        maxPgno = pPager->dbOrigSize;

  // sync the journal file
  if (tdbOsFSync(pTxn->jfd) < 0) {
    tdbError("failed to fsync jfd: %s. jfile:%s, %" PRId64, strerror(errno), pPager->jFileName, pTxn->txnId);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  // write the non-local dirty pages, tracking the highest page number
  iter = tRBTreeIterCreate(&pPager->rbt, 1);
  while ((pNode = tRBTreeIterNext(&iter)) != NULL) {
    SPage *pPage = (SPage *)pNode;
    if (pPage->isLocal) continue;

    SPgno pgno = TDB_PAGE_PGNO(pPage);
    maxPgno = (pgno > maxPgno) ? pgno : maxPgno;

    if (tdbPagerPWritePageToDB(pPager, pPage) < 0) {
      tdbError("failed to write page to db since %s", tstrerror(terrno));
      return -1;
    }
  }

  tdbTrace("tdbttl commit:%p, %d/%d", pPager, pPager->dbOrigSize, pPager->dbFileSize);
  pPager->dbOrigSize = maxPgno;

  // mark the written pages clean and release them
  iter = tRBTreeIterCreate(&pPager->rbt, 1);
  while ((pNode = tRBTreeIterNext(&iter)) != NULL) {
    SPage *pPage = (SPage *)pNode;
    if (pPage->isLocal) continue;

    pPage->isDirty = 0;
    tRBTreeDrop(&pPager->rbt, pNode);
    tdbPCacheRelease(pPager->pCache, pPage, pTxn);
  }

  return 0;
}

445 446 447 448 449 450 451
// recovery dirty pages
int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
  SPage *pPage;
  int    pgIdx;
  SPgno  journalSize = 0;
  int    ret;

452 453 454 455 456
  if (pTxn->jfd == 0) {
    // txn is commited
    return 0;
  }

M
Minglei Jin 已提交
457 458
  // sync the journal file
  ret = tdbOsFSync(pTxn->jfd);
459
  if (ret < 0) {
M
Minglei Jin 已提交
460
    tdbError("failed to fsync jfd: %s. jfile:%s, %" PRId64, strerror(errno), pPager->jFileName, pTxn->txnId);
461 462
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
463 464
  }

M
Minglei Jin 已提交
465
  tdb_fd_t jfd = pTxn->jfd;
466 467 468 469 470 471

  ret = tdbGetFileSize(jfd, pPager->pageSize, &journalSize);
  if (ret < 0) {
    return -1;
  }

472 473 474 475 476 477
  if (tdbOsLSeek(jfd, 0L, SEEK_SET) < 0) {
    tdbError("failed to lseek jfd due to %s. file:%s, offset:0", strerror(errno), pPager->dbFileName);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

M
Minglei Jin 已提交
478
  u8 *pageBuf = tdbOsCalloc(1, pPager->pageSize);
479 480 481
  if (pageBuf == NULL) {
    return -1;
  }
482

483
  tdbDebug("pager/abort: %p, %d/%d, txnId:%" PRId64, pPager, pPager->dbOrigSize, pPager->dbFileSize, pTxn->txnId);
484

485
  for (int pgIndex = 0; pgIndex < journalSize; ++pgIndex) {
486 487 488 489 490
    // read pgno & the page from journal
    SPgno pgno;

    int ret = tdbOsRead(jfd, &pgno, sizeof(pgno));
    if (ret < 0) {
491
      tdbOsFree(pageBuf);
492 493 494
      return -1;
    }

495 496 497
    tdbTrace("pager/abort: restore pgno:%d,", pgno);

    tdbPCacheInvalidatePage(pPager->pCache, pPager, pgno);
498

499 500
    ret = tdbOsRead(jfd, pageBuf, pPager->pageSize);
    if (ret < 0) {
501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518
      tdbOsFree(pageBuf);
      return -1;
    }

    i64 offset = pPager->pageSize * (pgno - 1);
    if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) {
      tdbError("failed to lseek fd due to %s. file:%s, offset:%" PRId64, strerror(errno), pPager->dbFileName, offset);
      terrno = TAOS_SYSTEM_ERROR(errno);
      tdbOsFree(pageBuf);
      return -1;
    }

    ret = tdbOsWrite(pPager->fd, pageBuf, pPager->pageSize);
    if (ret < 0) {
      tdbError("failed to write buf due to %s. file: %s, bufsize:%d", strerror(errno), pPager->dbFileName,
               pPager->pageSize);
      terrno = TAOS_SYSTEM_ERROR(errno);
      tdbOsFree(pageBuf);
519 520 521
      return -1;
    }
  }
522 523 524 525 526 527 528 529 530 531

  if (tdbOsFSync(pPager->fd) < 0) {
    tdbError("failed to fsync fd due to %s. dbfile:%s", strerror(errno), pPager->dbFileName);
    terrno = TAOS_SYSTEM_ERROR(errno);
    tdbOsFree(pageBuf);
    return -1;
  }

  tdbOsFree(pageBuf);

532
  // 3, release the dirty pages
533 534 535 536
  SRBTreeIter  iter = tRBTreeIterCreate(&pPager->rbt, 1);
  SRBTreeNode *pNode = NULL;
  while ((pNode = tRBTreeIterNext(&iter)) != NULL) {
    pPage = (SPage *)pNode;
537 538 539
    SPgno pgno = TDB_PAGE_PGNO(pPage);

    tdbTrace("pager/abort: drop dirty pgno:%d,", pgno);
540 541 542

    pPage->isDirty = 0;

543
    tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
M
Minglei Jin 已提交
544
    hashset_remove(pTxn->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage)));
M
Minglei Jin 已提交
545
    tdbPCacheMarkFree(pPager->pCache, pPage);
546 547 548
    tdbPCacheRelease(pPager->pCache, pPage, pTxn);
  }

549
  tdbTrace("pager/abort: reset dirty tree: %p", &pPager->rbt);
550 551
  tRBTreeCreate(&pPager->rbt, pageCmpFn);

552
  // 4, remove the journal file
M
Minglei Jin 已提交
553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568
  if (tdbOsClose(pTxn->jfd) < 0) {
    tdbError("failed to close jfd: %s. file:%s, %" PRId64, strerror(errno), pPager->jFileName, pTxn->txnId);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  char jTxnFileName[TDB_FILENAME_LEN];
  sprintf(jTxnFileName, "%s.%" PRId64, pPager->jFileName, pTxn->txnId);

  if (tdbOsRemove(jTxnFileName) < 0 && errno != ENOENT) {
    tdbError("failed to remove file due to %s. file:%s", strerror(errno), jTxnFileName);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  // pPager->inTran = 0;
H
Hongze Cheng 已提交
569

H
more  
Hongze Cheng 已提交
570 571 572
  return 0;
}

573 574
/*
 * Evict at most one dirty page, so the page cache can hand out a container again.
 *
 * Walks the dirty-page tree and takes the first page with a reference count of at most 1, i.e.
 * referenced by nobody beyond the extra reference tdbPagerWrite keeps on dirty pages. That page is:
 *   - written to the db file;
 *   - covered by dbOrigSize (raised to at least its page number);
 *   - marked clean, removed from the tree and released.
 *
 * Returns 0, including when no page could be flushed. Returns -1 if the write failed.
 */
int tdbPagerFlushPage(SPager *pPager, TXN *pTxn) {
  SPgno        maxPgno = pPager->dbOrigSize;
  SRBTreeNode *pNode = NULL;
  SRBTreeIter  iter = tRBTreeIterCreate(&pPager->rbt, 1);

  while ((pNode = tRBTreeIterNext(&iter)) != NULL) {
    SPage *pPage = (SPage *)pNode;

    // skip pages that are still in use by someone else
    i32 nRef = tdbGetPageRef(pPage);
    if (nRef > 1) continue;

    SPgno pgno = TDB_PAGE_PGNO(pPage);
    if (pgno > maxPgno) maxPgno = pgno;

    if (tdbPagerPWritePageToDB(pPager, pPage) < 0) {
      tdbError("failed to write page to db since %s", tstrerror(terrno));
      return -1;
    }

    tdbTrace("tdb/flush:%p, pgno:%d, %d/%d/%d", pPager, pgno, pPager->dbOrigSize, pPager->dbFileSize, maxPgno);
    pPager->dbOrigSize = maxPgno;

    pPage->isDirty = 0;

    tdbTrace("pager/flush drop page: %p, pgno:%d, from dirty tree: %p", pPage, TDB_PAGE_PGNO(pPage), &pPager->rbt);
    tRBTreeDrop(&pPager->rbt, pNode);
    tdbPCacheRelease(pPager->pCache, pPage, pTxn);

    break;  // one page per call
  }

  tdbDebug("pager/flush: %p, %d/%d, txnId:%" PRId64, pPager, pPager->dbOrigSize, pPager->dbFileSize, pTxn->txnId);

  return 0;
}

H
Hongze Cheng 已提交
636
/*
 * Get a pinned page from the page cache.
 *
 * If *ppgno is 0, a new page number is allocated (free list first, otherwise by extending the file).
 * The new page is initialized without reading from disk. Otherwise the page is loaded from the db
 * file on first use. While tdbPCacheFetch() returns NULL, dirty pages are flushed one at a time and
 * the fetch is retried.
 *
 * On success stores the page number in *ppgno and the page in *ppPage and returns 0.
 * Returns -1 on failure.
 */
int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *, int), void *arg,
                      TXN *pTxn) {
  SPage *pPage = NULL;
  SPgid  pgid;
  int    ret = 0;
  SPgno  pgno = *ppgno;
  u8     loadPage = 1;

  // alloc new page: a brand-new page has nothing on disk to load
  if (pgno == 0) {
    loadPage = 0;
    ret = tdbPagerAllocPage(pPager, &pgno);
    if (ret < 0) {
      tdbError("tdb/pager: %p, ret: %d pgno: %" PRIu32 ", alloc page failed.", pPager, ret, pgno);
      return -1;
    }
  }

  if (pgno == 0) {
    tdbError("tdb/pager: %p, ret: %d pgno: %" PRIu32 ", alloc page failed.", pPager, ret, pgno);
    return -1;
  }

  // fetch a page container keyed by (file id, pgno)
  memcpy(&pgid, pPager->fid, TDB_FILE_ID_LEN);
  pgid.pgno = pgno;
  for (;;) {
    pPage = tdbPCacheFetch(pPager->pCache, &pgid, pTxn);
    if (pPage != NULL) break;
    tdbPagerFlushPage(pPager, pTxn);
  }

  tdbTrace("tdbttl fetch pager:%p", pPage->pPager);

  // init page if needed
  if (!TDB_PAGE_INITIALIZED(pPage)) {
    if (tdbPagerInitPage(pPager, pPage, initPage, arg, loadPage) < 0) {
      tdbError("tdb/pager: %p, pPage: %p, init page failed.", pPager, pPage);
      return -1;
    }
  }

  // sanity checks: the page must now be initialized and belong to this pager
  if (!TDB_PAGE_INITIALIZED(pPage)) {
    tdbError("tdb/pager: %p, pPage: %p, fetch page uninited.", pPager, pPage);
    return -1;
  }
  if (pPage->pPager != pPager) {
    tdbError("tdb/pager: %p/%p, fetch page failed.", pPager, pPage->pPager);
    return -1;
  }

  *ppgno = pgno;
  *ppPage = pPage;
  return 0;
}

H
Hongze Cheng 已提交
696 697 698 699 700
/* Hand a page obtained from tdbPagerFetchPage() back to the pager's page cache. */
void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn) {
  SPCache *pCache = pPager->pCache;
  tdbPCacheRelease(pCache, pPage, pTxn);
}
H
Hongze Cheng 已提交
701

702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751
/*
 * Record `pgno` as free by inserting it as a key (with an empty value) into the env's free-page
 * table. Returns -1 on failure, otherwise tdbTbInsert()'s result.
 */
int tdbPagerInsertFreePage(SPager *pPager, SPgno pgno, TXN *pTxn) {
  int code = tdbTbInsert(pPager->pEnv->pFreeDb, &pgno, sizeof(pgno), NULL, 0, pTxn);
  return (code < 0) ? -1 : code;
}

/*
 * Pop the first page number from the env's free-page table into *pPgno.
 *
 * Best effort: always returns 0. *pPgno is written only when a page number was both read and
 * removed from the free table; otherwise it is left untouched. tdbPagerAllocPage pre-sets it to 0
 * and then falls back to extending the file.
 *
 * If the delete fails the entry is still in the free table. Handing it out anyway could allocate
 * the same page twice, so it is not returned.
 */
static int tdbPagerRemoveFreePage(SPager *pPager, SPgno *pPgno) {
  int   code;
  TBC  *pCur;
  void *pKey = NULL;
  int   nKey = 0;
  SPgno pgno;

  if (!pPager->pEnv->pFreeDb) {
    return 0;
  }

  code = tdbTbcOpen(pPager->pEnv->pFreeDb, &pCur, NULL);
  if (code < 0) {
    return 0;
  }

  code = tdbTbcMoveToFirst(pCur);
  if (code) {
    goto _exit;  // empty table or error
  }

  code = tdbTbcGet(pCur, (const void **)&pKey, &nKey, NULL, NULL);
  if (code < 0 || pKey == NULL || nKey != (int)sizeof(SPgno)) {
    goto _exit;
  }

  // copy instead of dereferencing through a cast: do not assume the key buffer is SPgno-aligned
  memcpy(&pgno, pKey, sizeof(pgno));

  code = tdbTbcDelete(pCur);
  if (code < 0) {
    goto _exit;
  }

  *pPgno = pgno;

_exit:
  tdbTbcClose(pCur);
  return 0;
}

H
more  
Hongze Cheng 已提交
752 753
/*
 * Try to reuse a page number from the env's free-page table.
 * *ppgno is left unchanged when none is available.
 */
static int tdbPagerAllocFreePage(SPager *pPager, SPgno *ppgno) {
  int rc = tdbPagerRemoveFreePage(pPager, ppgno);
  return rc;
}

/* Grow the db file by one page (in the pager's bookkeeping) and return the new page number. */
static int tdbPagerAllocNewPage(SPager *pPager, SPgno *ppgno) {
  pPager->dbFileSize += 1;
  *ppgno = pPager->dbFileSize;
  return 0;
}

H
Hongze Cheng 已提交
762
/*
 * Allocate a page number into *ppgno.
 * The free-page table is tried first; otherwise the file is extended by one page.
 * Returns 0 on success, -1 on failure (with *ppgno left at 0 when no page was obtained).
 */
int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno) {
  *ppgno = 0;

  // reuse a freed page when possible
  if (tdbPagerAllocFreePage(pPager, ppgno) < 0) {
    return -1;
  }
  if (*ppgno != 0) {
    return 0;
  }

  // otherwise append a new page to the file
  if (tdbPagerAllocNewPage(pPager, ppgno) < 0) {
    return -1;
  }
  if (*ppgno == 0) {
    tdbError("tdb/pager:%p, alloc new page failed.", pPager);
    return -1;
  }

  return 0;
}

H
Hongze Cheng 已提交
788 789 790 791 792
/*
 * Initialize a cached page on first use.
 *
 * The thread that acquires TDB_TRY_LOCK_PAGE does the work:
 *   - if `loadPage` is set and the page lies within dbOrigSize, it reads the page content from the
 *     db file;
 *   - it calls initPage(pPage, arg, init), with init = 1 when content was read and 0 otherwise;
 *   - it sets pPage->pPager, which marks the page initialized.
 *
 * A thread that finds the lock busy spins until the page becomes initialized, calling sched_yield()
 * after every 1000 checks.
 *
 * Returns 0 on success, -1 if reading, initPage or locking fails.
 */
static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *, int), void *arg,
                            u8 loadPage) {
  i64   nRead = 0;
  SPgno pgno = 0;
  int   lcode = TDB_TRY_LOCK_PAGE(pPage);

  if (lcode == P_LOCK_BUSY) {
    // another thread owns the page lock and is initializing it: wait for it
    int nSpins = 0;
    while (!TDB_PAGE_INITIALIZED(pPage)) {
      if (++nSpins > 1000) {
        sched_yield();
        nSpins = 0;
      }
    }
    return 0;
  }

  if (lcode != P_LOCK_SUCC) {
    tdbError("tdb/pager:%p, pgno:%d, nRead:%" PRId64 "pgSize:%" PRId32 " lock page failed.", pPager, pgno, nRead,
             pPage->pageSize);
    return -1;
  }

  // we hold the page lock from here on
  if (TDB_PAGE_INITIALIZED(pPage)) {
    TDB_UNLOCK_PAGE(pPage);
    return 0;
  }

  pgno = TDB_PAGE_PGNO(pPage);

  tdbTrace("tdb/pager:%p, pgno:%d, loadPage:%d, size:%d", pPager, pgno, loadPage, pPager->dbOrigSize);

  // only pages that already exist in the db file can be loaded from it
  int init = (loadPage && pgno <= pPager->dbOrigSize) ? 1 : 0;
  if (init) {
    nRead = tdbOsPRead(pPager->fd, pPage->pData, pPage->pageSize, ((i64)pPage->pageSize) * (pgno - 1));
    tdbTrace("tdb/pager:%p, pgno:%d, nRead:%" PRId64, pPager, pgno, nRead);
    if (nRead < pPage->pageSize) {
      tdbError("tdb/pager:%p, pgno:%d, nRead:%" PRId64 "pgSize:%" PRId32, pPager, pgno, nRead, pPage->pageSize);
      TDB_UNLOCK_PAGE(pPage);
      return -1;
    }
  }

  if ((*initPage)(pPage, arg, init) < 0) {
    tdbError("tdb/pager:%p, pgno:%d, nRead:%" PRId64 "pgSize:%" PRId32 " init page failed.", pPager, pgno, nRead,
             pPage->pageSize);
    TDB_UNLOCK_PAGE(pPage);
    return -1;
  }

  pPage->pPager = pPager;

  TDB_UNLOCK_PAGE(pPage);
  return 0;
}

// ---------------------------- Journal manipulation
/*
 * Append one record for `pPage` to the active transaction's journal: the page number, followed by
 * pageSize bytes of the page's current content.
 * Returns 0 on success. On a write failure it logs, sets terrno from errno and returns -1.
 */
static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage) {
  SPgno pgno = TDB_PAGE_PGNO(pPage);
  TXN  *pTxn = pPager->pActiveTxn;

  if (tdbOsWrite(pTxn->jfd, &pgno, sizeof(pgno)) < 0) {
    tdbError("failed to write pgno due to %s. file:%s, pgno:%u, txnId:%" PRId64, strerror(errno), pPager->jFileName,
             pgno, pTxn->txnId);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  if (tdbOsWrite(pTxn->jfd, pPage->pData, pPage->pageSize) < 0) {
    tdbError("failed to write page data due to %s. file:%s, pageSize:%d, txnId:%" PRId64, strerror(errno),
             pPager->jFileName, pPage->pageSize, pTxn->txnId);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  return 0;
}
876
/*
H
Hongze Cheng 已提交
877 878 879 880
static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage) {
  i64 offset;
  int ret;

881
  offset = (i64)pPage->pageSize * (TDB_PAGE_PGNO(pPage) - 1);
H
Hongze Cheng 已提交
882
  if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) {
883
    tdbError("failed to lseek due to %s. file:%s, offset:%" PRId64, strerror(errno), pPager->dbFileName, offset);
884
    terrno = TAOS_SYSTEM_ERROR(errno);
H
Hongze Cheng 已提交
885 886 887
    return -1;
  }

H
Hongze Cheng 已提交
888
  ret = tdbOsWrite(pPager->fd, pPage->pData, pPage->pageSize);
H
Hongze Cheng 已提交
889
  if (ret < 0) {
M
Minglei Jin 已提交
890
    tdbError("failed to write page data due to %s. file:%s, pageSize:%d", strerror(errno), pPager->dbFileName,
M
Minglei Jin 已提交
891
             pPage->pageSize);
892
    terrno = TAOS_SYSTEM_ERROR(errno);
H
Hongze Cheng 已提交
893 894 895
    return -1;
  }

896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912
  return 0;
}
*/
/*
 * Write pPage's image to its slot in the database file with a positional
 * write. Page numbers are 1-based: page N is stored at byte offset
 * (N - 1) * pageSize.
 *
 * Returns 0 on success; on failure logs the error, sets terrno and returns -1.
 */
static int tdbPagerPWritePageToDB(SPager *pPager, SPage *pPage) {
  // Widen to i64 before multiplying so offsets past 4 GiB do not wrap.
  i64 offset = (i64)pPage->pageSize * (TDB_PAGE_PGNO(pPage) - 1);

  if (tdbOsPWrite(pPager->fd, pPage->pData, pPage->pageSize, offset) < 0) {
    // Capture errno before logging: the logging call may overwrite it.
    int err = errno;
    tdbError("failed to pwrite page data due to %s. file:%s, pageSize:%d", strerror(err), pPager->dbFileName,
             pPage->pageSize);
    terrno = TAOS_SYSTEM_ERROR(err);
    return -1;
  }

  return 0;
}

/*
 * Replay one journal file onto the database file, then delete the journal.
 *
 * The journal is a sequence of records, each the page number (SPgno) followed
 * by a page image of pPager->pageSize bytes. Every complete record is written
 * back to offset (pgno - 1) * pageSize of pPager->fd. The database file is then
 * fsync'ed, and the journal is closed and removed (ENOENT on removal is not an
 * error).
 *
 * Returns 0 on success, and also when the journal cannot be opened (nothing to
 * restore). Returns -1 on failure. The journal descriptor and the page buffer
 * are released on every path.
 */
static int tdbPagerRestore(SPager *pPager, const char *jFileName) {
  const int szRecord = pPager->pageSize + (int)sizeof(SPgno);
  SPgno     nRecords = 0;
  u8       *pageBuf = NULL;

  tdb_fd_t jfd = tdbOsOpen(jFileName, TDB_O_RDWR, 0755);
  if (jfd == NULL) {
    return 0;
  }

  // Count whole records (pgno + page), not pages. Dividing by pageSize alone
  // over-counts once the pgno headers add up to a page, and the loop would then
  // read past EOF. NOTE(review): this assumes tdbGetFileSize() yields
  // floor(fileBytes / szPage), so a trailing partial record is skipped.
  if (tdbGetFileSize(jfd, szRecord, &nRecords) < 0) {
    goto _err;
  }

  if (tdbOsLSeek(jfd, 0L, SEEK_SET) < 0) {
    int err = errno;
    tdbError("failed to lseek jfd due to %s. file:%s, offset:0", strerror(err), jFileName);
    terrno = TAOS_SYSTEM_ERROR(err);
    goto _err;
  }

  pageBuf = tdbOsCalloc(1, pPager->pageSize);
  if (pageBuf == NULL) {
    goto _err;
  }

  tdbDebug("pager/restore: %p, %d/%d, txnId:%s", pPager, pPager->dbOrigSize, pPager->dbFileSize, jFileName);

  for (SPgno iRecord = 0; iRecord < nRecords; ++iRecord) {
    SPgno pgno = 0;

    // Only complete records are counted, so anything short of a full read is
    // an error; never replay a stale or partially filled buffer.
    if (tdbOsRead(jfd, &pgno, sizeof(pgno)) != (i64)sizeof(pgno)) {
      tdbError("failed to read pgno from journal. file:%s, record:%u", jFileName, iRecord);
      goto _err;
    }

    if (pgno == 0) {
      // Page numbers are 1-based; 0 would wrap (pgno - 1) to a bogus offset.
      tdbError("invalid pgno 0 in journal. file:%s, record:%u", jFileName, iRecord);
      goto _err;
    }

    tdbTrace("pager/restore: restore pgno:%d,", pgno);

    if (tdbOsRead(jfd, pageBuf, pPager->pageSize) != (i64)pPager->pageSize) {
      tdbError("failed to read page from journal. file:%s, pgno:%u", jFileName, pgno);
      goto _err;
    }

    // Widen before multiplying. Otherwise pageSize * (pgno - 1) is evaluated
    // in 32-bit unsigned arithmetic and wraps for offsets past 4 GiB.
    i64 offset = (i64)pPager->pageSize * (pgno - 1);
    if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) {
      int err = errno;
      tdbError("failed to lseek fd due to %s. file:%s, offset:%" PRId64, strerror(err), pPager->dbFileName, offset);
      terrno = TAOS_SYSTEM_ERROR(err);
      goto _err;
    }

    if (tdbOsWrite(pPager->fd, pageBuf, pPager->pageSize) < 0) {
      int err = errno;
      tdbError("failed to write buf due to %s. file: %s, bufsize:%d", strerror(err), pPager->dbFileName,
               pPager->pageSize);
      terrno = TAOS_SYSTEM_ERROR(err);
      goto _err;
    }
  }

  if (tdbOsFSync(pPager->fd) < 0) {
    int err = errno;
    tdbError("failed to fsync fd due to %s. dbfile:%s", strerror(err), pPager->dbFileName);
    terrno = TAOS_SYSTEM_ERROR(err);
    goto _err;
  }

  tdbOsFree(pageBuf);
  pageBuf = NULL;

  if (tdbOsClose(jfd) < 0) {
    int err = errno;
    tdbError("failed to close jfd due to %s. jFileName:%s", strerror(err), jFileName);
    terrno = TAOS_SYSTEM_ERROR(err);
    return -1;
  }

  if (tdbOsRemove(jFileName) < 0 && errno != ENOENT) {
    int err = errno;
    tdbError("failed to remove file due to %s. jFileName:%s", strerror(err), jFileName);
    terrno = TAOS_SYSTEM_ERROR(err);
    return -1;
  }

  return 0;

_err:
  // terrno (when set) is already recorded, so errors from this cleanup are ignored.
  if (pageBuf != NULL) {
    tdbOsFree(pageBuf);
  }
  (void)tdbOsClose(jfd);
  return -1;
}

/*
 * Sort comparator that orders int64_t transaction ids from largest (newest)
 * to smallest.
 *
 * Returns a negative value when *pLeft sorts first, a positive value when
 * *pRight sorts first, and 0 for equal ids. The 0 case keeps the ordering
 * consistent when a sort compares an element with itself or with a duplicate,
 * as qsort-style comparators require.
 */
static int32_t txnIdCompareDesc(const void *pLeft, const void *pRight) {
  int64_t lhs = *(const int64_t *)pLeft;
  int64_t rhs = *(const int64_t *)pRight;
  return (lhs < rhs) - (lhs > rhs);
}

/*
 * Restore every "<TDB_MAINDB_NAME>-journal.<txnId>" file found in the
 * environment's directory onto the main database file.
 *
 * Journals are replayed in descending txnId order (txnIdCompareDesc), one at a
 * time, through tdbPagerRestore(). Directory entries with the journal prefix
 * but no parsable txnId suffix are skipped.
 *
 * Returns 0 on success and -1 on the first failure. The directory handle and
 * the txnId list are released on every path.
 */
int tdbPagerRestoreJournals(SPager *pPager) {
  static const char jPrefix[] = TDB_MAINDB_NAME "-journal";

  int            code = -1;
  SArray        *pTxnList = NULL;
  tdbDirEntryPtr pDirEntry;
  tdbDirPtr      pDir = taosOpenDir(pPager->pEnv->dbName);
  if (pDir == NULL) {
    tdbError("failed to open %s since %s", pPager->pEnv->dbName, strerror(errno));
    return -1;
  }

  pTxnList = taosArrayInit(16, sizeof(int64_t));
  if (pTxnList == NULL) {
    goto _exit;
  }

  while ((pDirEntry = tdbReadDir(pDir)) != NULL) {
    char   *name = tdbDirEntryBaseName(tdbGetDirEntryName(pDirEntry));
    int64_t txnId = -1;

    if (strncmp(jPrefix, name, sizeof(jPrefix) - 1) != 0) {
      continue;
    }
    // Without a ".<txnId>" suffix there is no journal path to rebuild below.
    if (sscanf(name, TDB_MAINDB_NAME "-journal.%" SCNd64, &txnId) != 1) {
      continue;
    }
    if (taosArrayPush(pTxnList, &txnId) == NULL) {
      goto _exit;
    }
  }

  taosArraySort(pTxnList, txnIdCompareDesc);

  for (size_t i = 0; i < TARRAY_SIZE(pTxnList); ++i) {
    int64_t *pTxnId = taosArrayGet(pTxnList, i);
    char     jname[TD_PATH_MAX] = {0};

    // Bounded build of "<dbName>/<TDB_MAINDB_NAME>-journal.<txnId>".
    int n = snprintf(jname, sizeof(jname), "%s/" TDB_MAINDB_NAME "-journal.%" PRId64, pPager->pEnv->dbName,
                     *pTxnId);
    if (n < 0 || (size_t)n >= sizeof(jname)) {
      tdbError("journal path too long. dir:%s, txnId:%" PRId64, pPager->pEnv->dbName, *pTxnId);
      goto _exit;
    }

    if (tdbPagerRestore(pPager, jname) < 0) {
      // Log before any cleanup call can overwrite errno.
      tdbError("failed to restore file due to %s. jFileName:%s", strerror(errno), jname);
      goto _exit;
    }
  }

  code = 0;

_exit:
  if (pTxnList != NULL) {
    taosArrayDestroy(pTxnList);
  }
  tdbCloseDir(&pDir);
  return code;
}

/*
 * Delete every file in the environment's directory whose name starts with
 * "<TDB_MAINDB_NAME>-journal", without replaying it. A journal that is already
 * gone (ENOENT) is not an error.
 *
 * Returns 0 on success. Returns -1 when the directory cannot be opened or a
 * journal path is too long. Returns -1 and sets terrno when a removal fails.
 */
int tdbPagerRollback(SPager *pPager) {
  static const char jPrefix[] = TDB_MAINDB_NAME "-journal";

  tdbDirEntryPtr pDirEntry;
  tdbDirPtr      pDir = taosOpenDir(pPager->pEnv->dbName);
  if (pDir == NULL) {
    tdbError("failed to open %s since %s", pPager->pEnv->dbName, strerror(errno));
    return -1;
  }

  while ((pDirEntry = tdbReadDir(pDir)) != NULL) {
    char *name = tdbDirEntryBaseName(tdbGetDirEntryName(pDirEntry));

    if (strncmp(jPrefix, name, sizeof(jPrefix) - 1) != 0) {
      continue;
    }

    char jname[TD_PATH_MAX] = {0};
    int  n = snprintf(jname, sizeof(jname), "%s/%s", pPager->pEnv->dbName, name);
    if (n < 0 || (size_t)n >= sizeof(jname)) {
      tdbError("journal path too long. dir:%s, name:%s", pPager->pEnv->dbName, name);
      tdbCloseDir(&pDir);
      return -1;
    }

    if (tdbOsRemove(jname) < 0 && errno != ENOENT) {
      // Save errno and log while `name` is still usable. It is derived from
      // pDirEntry, which presumably belongs to pDir and may be freed by
      // tdbCloseDir().
      int err = errno;
      tdbError("failed to remove file due to %s. jFileName:%s", strerror(err), name);
      terrno = TAOS_SYSTEM_ERROR(err);
      tdbCloseDir(&pDir);
      return -1;
    }
  }

  tdbCloseDir(&pDir);

  return 0;
}