tdbPager.c 29.7 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
H
refact  
Hongze Cheng 已提交
14 15
 */

H
Hongze Cheng 已提交
16
#include "tdbInt.h"
17
/*
H
Hongze Cheng 已提交
18
#pragma pack(push, 1)
wafwerar's avatar
wafwerar 已提交
19
typedef struct {
H
Hongze Cheng 已提交
20 21 22 23 24
  u8    hdrString[16];
  u16   pageSize;
  SPgno freePage;
  u32   nFreePages;
  u8    reserved[102];
H
Hongze Cheng 已提交
25
} SFileHdr;
wafwerar's avatar
wafwerar 已提交
26
#pragma pack(pop)
H
Hongze Cheng 已提交
27

H
Hongze Cheng 已提交
28
TDB_STATIC_ASSERT(sizeof(SFileHdr) == 128, "Size of file header is not correct");
29
*/
30
/*
 * Open-addressing hash set of non-zero size_t values.
 * A slot in `items` that holds 0 is treated as empty.
 */
struct hashset_st {
  size_t  nbits;       /* capacity is 1 << nbits */
  size_t  mask;        /* capacity - 1; ANDed with a hash to get a slot index */
  size_t  capacity;    /* number of slots in `items` */
  size_t *items;       /* slot array; 0 marks an empty slot */
  size_t  nitems;      /* number of occupied slots */
  double  load_factor; /* the table grows once nitems >= capacity * load_factor */
};

/* Multiplier applied to a value to select its first probe slot. */
static const unsigned int prime = 39;
/* Increment added to the slot index between successive probes. */
static const unsigned int prime2 = 5009;

/*
 * Allocate an empty hash set with 16 slots and a 0.75 load factor.
 *
 * Returns the new set, or NULL when either allocation fails.
 */
hashset_t hashset_create(void) {
  hashset_t pSet = tdbOsCalloc(1, sizeof(struct hashset_st));
  if (pSet == NULL) {
    return NULL;
  }

  pSet->nbits = 4;
  pSet->capacity = (size_t)(1 << pSet->nbits);

  pSet->items = tdbOsCalloc(pSet->capacity, sizeof(size_t));
  if (pSet->items == NULL) {
    tdbOsFree(pSet);
    return NULL;
  }

  pSet->mask = pSet->capacity - 1;
  pSet->nitems = 0;
  pSet->load_factor = 0.75;
  return pSet;
}

/* Release a set and its slot array; a NULL set is ignored. */
void hashset_destroy(hashset_t set) {
  if (set == NULL) {
    return;
  }

  tdbOsFree(set->items);
  tdbOsFree(set);
}

/*
 * Insert `item` (taken as an integer key) into the slot array without
 * growing the table.
 *
 * Returns 1 when the key was stored, 0 when it was already present and
 * -1 when the key is 0 (0 is reserved to mark empty slots).
 */
int hashset_add_member(hashset_t set, void *item) {
  const size_t key = (size_t)item;
  if (key == 0) {
    return -1;
  }

  /* Probe from the home slot until the key or an empty slot is found. */
  size_t slot = set->mask & (prime * key);
  while (set->items[slot] != 0) {
    if (set->items[slot] == key) {
      return 0;
    }
    slot = set->mask & (slot + prime2);
  }

  set->items[slot] = key;
  set->nitems += 1;
  return 1;
}

/*
 * Insert `item` into the set, doubling the table once the load factor is
 * reached.
 *
 * Returns the result of hashset_add_member() (1 stored, 0 already present,
 * -1 for a zero key), or -1 when the larger table cannot be allocated.
 * On allocation failure the set keeps its previous table and stays usable;
 * growth is attempted again on the next call.
 */
int hashset_add(hashset_t set, void *item) {
  int ret = hashset_add_member(set, item);

  size_t old_capacity = set->capacity;
  if (set->nitems >= (double)old_capacity * set->load_factor) {
    /* Shift in size_t so the capacity cannot overflow an int. */
    size_t new_nbits = set->nbits + 1;
    size_t new_capacity = (size_t)1 << new_nbits;

    /* Allocate before touching the set so a failure leaves it intact. */
    size_t *new_items = tdbOsCalloc(new_capacity, sizeof(size_t));
    if (new_items == NULL) {
      return -1;
    }

    size_t *old_items = set->items;
    set->nbits = new_nbits;
    set->capacity = new_capacity;
    set->mask = new_capacity - 1;
    set->items = new_items;
    set->nitems = 0;

    /* Re-hash every occupied slot of the old table into the new one. */
    for (size_t i = 0; i < old_capacity; ++i) {
      if (old_items[i] != 0) {
        hashset_add_member(set, (void *)old_items[i]);
      }
    }
    tdbOsFree(old_items);
  }

  return ret;
}

/*
 * Remove `item` from the set.
 *
 * Returns 1 when the key was found and removed, 0 when it was not present.
 *
 * Lookups stop at the first empty slot, so simply zeroing the removed slot
 * would hide every key stored further along the same probe run. After the
 * slot is cleared, the remainder of the run is therefore taken out and
 * re-inserted so each key is reachable from its home slot again.
 */
int hashset_remove(hashset_t set, void *item) {
  size_t value = (size_t)item;
  size_t h;

  /* 0 marks an empty slot and can never be stored. */
  if (value == 0) {
    return 0;
  }

  for (h = set->mask & (prime * value); set->items[h] != 0; h = set->mask & (h + prime2)) {
    if (set->items[h] == value) {
      break;
    }
  }

  if (set->items[h] == 0) {
    return 0;
  }

  set->items[h] = 0;
  --set->nitems;

  /* Re-seat the keys that follow the freed slot in the probe run. */
  for (h = set->mask & (h + prime2); set->items[h] != 0; h = set->mask & (h + prime2)) {
    size_t moved = set->items[h];

    set->items[h] = 0;
    --set->nitems;
    hashset_add_member(set, (void *)moved);
  }

  return 1;
}

/*
 * Test whether `item` (taken as an integer key) is stored in the set.
 *
 * Returns 1 when found, 0 otherwise. Probing stops at the first empty slot.
 */
int hashset_contains(hashset_t set, void *item) {
  const size_t key = (size_t)item;
  size_t       slot = set->mask & (prime * key);

  while (set->items[slot] != 0) {
    if (set->items[slot] == key) {
      return 1;
    }
    slot = set->mask & (slot + prime2);
  }

  return 0;
}

H
Hongze Cheng 已提交
140 141
#define TDB_PAGE_INITIALIZED(pPage) ((pPage)->pPager != NULL)

H
Hongze Cheng 已提交
142 143
static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *, int), void *arg,
                            u8 loadPage);
H
Hongze Cheng 已提交
144
static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage);
145
static int tdbPagerPWritePageToDB(SPager *pPager, SPage *pPage);
H
Hongze Cheng 已提交
146

H
Hongze Cheng 已提交
147 148 149
/*
 * Ordering callback for the dirty-page tree: compares two tree nodes by
 * the page number of the SPage that embeds them.
 *
 * Returns -1, 0 or 1 when lhs is less than, equal to or greater than rhs.
 */
static FORCE_INLINE int32_t pageCmpFn(const SRBTreeNode *lhs, const SRBTreeNode *rhs) {
  /* Step back from the embedded node to the page that contains it. */
  SPage *pLeft = (SPage *)(((uint8_t *)lhs) - offsetof(SPage, node));
  SPage *pRight = (SPage *)(((uint8_t *)rhs) - offsetof(SPage, node));

  SPgno left = TDB_PAGE_PGNO(pLeft);
  SPgno right = TDB_PAGE_PGNO(pRight);

  if (left == right) {
    return 0;
  }
  return (left < right) ? -1 : 1;
}

H
refact  
Hongze Cheng 已提交
163
/*
 * Create a pager for the database file `fileName`, backed by `pCache`.
 *
 * The pager, its database file name and its journal file name
 * ("<fileName>-journal") live in one allocation, so tdbPagerClose() can
 * release everything with a single free.
 *
 * On success *ppPager receives the pager and 0 is returned. On failure
 * *ppPager is NULL, -1 is returned, and the allocation and any file
 * opened here are released.
 */
int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager) {
  uint8_t *pPtr;
  SPager  *pPager;
  int      fsize;
  int      zsize;
  int      ret;

  *ppPager = NULL;

  fsize = strlen(fileName);
  zsize = sizeof(*pPager)  /* SPager */
          + fsize + 1      /* dbFileName */
          + fsize + 8 + 1; /* jFileName */
  pPtr = (uint8_t *)tdbOsCalloc(1, zsize);
  if (pPtr == NULL) {
    return -1;
  }

  pPager = (SPager *)pPtr;
  pPtr += sizeof(*pPager);
  // pPager->dbFileName
  pPager->dbFileName = (char *)pPtr;
  memcpy(pPager->dbFileName, fileName, fsize);
  pPager->dbFileName[fsize] = '\0';
  pPtr += fsize + 1;
  // pPager->jFileName
  pPager->jFileName = (char *)pPtr;
  memcpy(pPager->jFileName, fileName, fsize);
  memcpy(pPager->jFileName + fsize, "-journal", 8);
  pPager->jFileName[fsize + 8] = '\0';
  // pPager->pCache
  pPager->pCache = pCache;

  pPager->fd = tdbOsOpen(pPager->dbFileName, TDB_O_CREAT | TDB_O_RDWR, 0755);
  if (TDB_FD_INVALID(pPager->fd)) {
    tdbOsFree(pPager);
    return -1;
  }

  ret = tdbGnrtFileID(pPager->fd, pPager->fid, false);
  if (ret < 0) {
    tdbOsClose(pPager->fd);
    tdbOsFree(pPager);
    return -1;
  }

  pPager->pageSize = tdbPCacheGetPageSize(pCache);
  // pPager->dbOrigSize
  ret = tdbGetFileSize(pPager->fd, pPager->pageSize, &(pPager->dbOrigSize));
  if (ret < 0) {
    // without a valid size the pager cannot tell which pages already exist
    tdbOsClose(pPager->fd);
    tdbOsFree(pPager);
    return -1;
  }
  pPager->dbFileSize = pPager->dbOrigSize;

  tdbTrace("pager/open reset dirty tree: %p", &pPager->rbt);
  tRBTreeCreate(&pPager->rbt, pageCmpFn);

  *ppPager = pPager;
  return 0;
}

H
refact  
Hongze Cheng 已提交
220
/*
 * Close the database file and free the pager. A NULL pager is ignored.
 * Always returns 0.
 */
int tdbPagerClose(SPager *pPager) {
  if (pPager == NULL) {
    return 0;
  }

  tdbOsClose(pPager->fd);
  tdbOsFree(pPager);
  return 0;
}
H
Hongze Cheng 已提交
232

H
refact  
Hongze Cheng 已提交
233
/*
 * Mark `pPage` dirty before it is modified.
 *
 * A page that is already dirty is left alone. Otherwise the page gets an
 * extra reference, is flagged dirty and is put into the pager's dirty
 * tree. If its page number is within dbOrigSize and the active
 * transaction has not journaled it yet, the current page content is
 * appended to the journal and the page number is recorded in the
 * transaction's journaled-page set (when that set exists).
 *
 * Returns 0 on success, -1 when the journal write fails.
 */
int tdbPagerWrite(SPager *pPager, SPage *pPage) {
  if (pPage->isDirty) {
    return 0;
  }

  // take one more reference so the page will not be released while dirty
  tdbRefPage(pPage);
  tdbTrace("pager/mdirty page %p/%d/%d", pPage, TDB_PAGE_PGNO(pPage), pPage->id);

  pPage->isDirty = 1;

  tdbTrace("tdb/pager-write: put page: %p %d to dirty tree: %p", pPage, TDB_PAGE_PGNO(pPage), &pPager->rbt);
  tRBTreePut(&pPager->rbt, (SRBTreeNode *)pPage);

  // pages beyond the original file size are not journaled
  if (TDB_PAGE_PGNO(pPage) > pPager->dbOrigSize) {
    return 0;
  }

  // already journaled by the active transaction
  if (pPager->pActiveTxn->jPageSet != NULL &&
      hashset_contains(pPager->pActiveTxn->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage)))) {
    return 0;
  }

  if (tdbPagerWritePageToJournal(pPager, pPage) < 0) {
    tdbError("failed to write page to journal since %s", tstrerror(terrno));
    return -1;
  }

  if (pPager->pActiveTxn->jPageSet) {
    hashset_add(pPager->pActiveTxn->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage)));
  }

  return 0;
}

H
Hongze Cheng 已提交
267
/*
 * Start a transaction on the pager.
 *
 * Opens (creating if needed) the journal file "<jFileName>.<txnId>",
 * creates the transaction's journaled-page set and makes `pTxn` the
 * pager's active transaction.
 *
 * Returns 0 on success. Returns -1 with terrno set when the journal file
 * name does not fit in TDB_FILENAME_LEN or the file cannot be opened.
 */
int tdbPagerBegin(SPager *pPager, TXN *pTxn) {
  // Open the journal
  char jTxnFileName[TDB_FILENAME_LEN];
  int  nName = snprintf(jTxnFileName, sizeof(jTxnFileName), "%s.%" PRId64, pPager->jFileName, pTxn->txnId);
  if (nName < 0 || (size_t)nName >= sizeof(jTxnFileName)) {
    // a truncated name would refer to a different file
    tdbError("failed to open file due to %s. jFileName:%s", strerror(ENAMETOOLONG), pPager->jFileName);
    terrno = TAOS_SYSTEM_ERROR(ENAMETOOLONG);
    return -1;
  }

  pTxn->jfd = tdbOsOpen(jTxnFileName, TDB_O_CREAT | TDB_O_RDWR, 0755);
  if (TDB_FD_INVALID(pTxn->jfd)) {
    tdbError("failed to open file due to %s. jFileName:%s", strerror(errno), pPager->jFileName);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  // may be NULL on allocation failure; the users in this file that
  // NULL-check it (tdbPagerWrite, tdbPagerCommit) then skip the set
  pTxn->jPageSet = hashset_create();

  pPager->pActiveTxn = pTxn;

  tdbDebug("pager/begin: %p, %d/%d, txnId:%" PRId64, pPager, pPager->dbOrigSize, pPager->dbFileSize, pTxn->txnId);

  // TODO: write the size of the file
  return 0;
}
295 296 297 298 299 300 301 302 303 304
/*
int tdbPagerCancelDirty(SPager *pPager, SPage *pPage, TXN *pTxn) {
  SRBTreeNode *pNode = tRBTreeGet(&pPager->rbt, (SRBTreeNode *)pPage);
  if (pNode) {
    pPage->isDirty = 0;

    tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
    if (pTxn->jPageSet) {
      hashset_remove(pTxn->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage)));
    }
H
more  
Hongze Cheng 已提交
305

306 307 308 309 310 311
    tdbPCacheRelease(pPager->pCache, pPage, pTxn);
  }

  return 0;
}
*/
H
Hongze Cheng 已提交
312
/*
 * Commit the transaction: sync the journal, write every page in the dirty
 * tree to the database file, then clear the dirty state, release the
 * pages and sync the database file.
 *
 * Returns 0 on success, -1 on any sync or write failure, or when a dirty
 * page still has overflow cells (nOverflow != 0).
 */
int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
  SRBTreeIter  it;
  SRBTreeNode *pCur;
  SPage       *pDirty;

  // sync the journal file
  if (tdbOsFSync(pTxn->jfd) < 0) {
    tdbError("failed to fsync: %s. jFileName:%s, %" PRId64, strerror(errno), pPager->jFileName, pTxn->txnId);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  // pass 1: write the dirty pages to the db file
  it = tRBTreeIterCreate(&pPager->rbt, 1);
  for (pCur = tRBTreeIterNext(&it); pCur != NULL; pCur = tRBTreeIterNext(&it)) {
    pDirty = (SPage *)pCur;

    if (pDirty->nOverflow != 0) {
      tdbError("tdb/pager-commit: %p, pPage: %p, ovfl: %d, commit page failed.", pPager, pDirty, pDirty->nOverflow);
      return -1;
    }

    if (tdbPagerPWritePageToDB(pPager, pDirty) < 0) {
      tdbError("failed to write page to db since %s", tstrerror(terrno));
      return -1;
    }
  }

  tdbDebug("pager/commit: %p, %d/%d, txnId:%" PRId64, pPager, pPager->dbOrigSize, pPager->dbFileSize, pTxn->txnId);

  pPager->dbOrigSize = pPager->dbFileSize;

  // pass 2: clear the dirty state and release the pages
  it = tRBTreeIterCreate(&pPager->rbt, 1);
  for (pCur = tRBTreeIterNext(&it); pCur != NULL; pCur = tRBTreeIterNext(&it)) {
    pDirty = (SPage *)pCur;

    pDirty->isDirty = 0;

    tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pDirty);
    if (pTxn->jPageSet) {
      hashset_remove(pTxn->jPageSet, (void *)((long)TDB_PAGE_PGNO(pDirty)));
    }

    tdbTrace("tdb/pager-commit: remove page: %p %d from dirty tree: %p", pDirty, TDB_PAGE_PGNO(pDirty), &pPager->rbt);

    tdbPCacheRelease(pPager->pCache, pDirty, pTxn);
  }

  tdbTrace("tdb/pager-commit reset dirty tree: %p", &pPager->rbt);
  tRBTreeCreate(&pPager->rbt, pageCmpFn);

  // sync the db file
  if (tdbOsFSync(pPager->fd) < 0) {
    tdbError("failed to fsync fd due to %s. file:%s", strerror(errno), pPager->dbFileName);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  return 0;
}

/*
 * Finish a committed transaction by closing and deleting its journal file
 * "<jFileName>.<txnId>".
 *
 * Returns 0 on success (a journal file that is already gone is not an
 * error). Returns -1 with terrno set when closing or removing fails, or
 * when the journal file name does not fit in TDB_FILENAME_LEN.
 */
int tdbPagerPostCommit(SPager *pPager, TXN *pTxn) {
  char jTxnFileName[TDB_FILENAME_LEN];
  int  nName = snprintf(jTxnFileName, sizeof(jTxnFileName), "%s.%" PRId64, pPager->jFileName, pTxn->txnId);

  // remove the journal file
  if (tdbOsClose(pTxn->jfd) < 0) {
    tdbError("failed to close jfd: %s. file:%s, %" PRId64, strerror(errno), pPager->jFileName, pTxn->txnId);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  // checked only after the close so the journal fd is not left open;
  // a truncated name must not be passed to remove
  if (nName < 0 || (size_t)nName >= sizeof(jTxnFileName)) {
    tdbError("failed to remove file due to %s. file:%s", strerror(ENAMETOOLONG), pPager->jFileName);
    terrno = TAOS_SYSTEM_ERROR(ENAMETOOLONG);
    return -1;
  }

  if (tdbOsRemove(jTxnFileName) < 0 && errno != ENOENT) {
    tdbError("failed to remove file due to %s. file:%s", strerror(errno), jTxnFileName);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  tdbDebug("pager/post-commit:%p, %d/%d", pPager, pPager->dbOrigSize, pPager->dbFileSize);

  return 0;
}

400 401
/*
 * Write the non-local dirty pages to the database file without syncing
 * it: sync the journal, write every dirty page whose isLocal flag is
 * clear, raise dbOrigSize to the highest page number written, then clear
 * the dirty flag of those pages, drop them from the dirty tree and
 * release them. Pages with isLocal set stay in the dirty tree.
 *
 * Returns 0 on success, -1 when the journal sync or a page write fails.
 */
int tdbPagerPrepareAsyncCommit(SPager *pPager, TXN *pTxn) {
  SPgno        highest = pPager->dbOrigSize;
  SRBTreeIter  it;
  SRBTreeNode *pCur;
  SPage       *pDirty;

  // sync the journal file
  if (tdbOsFSync(pTxn->jfd) < 0) {
    tdbError("failed to fsync jfd: %s. jfile:%s, %" PRId64, strerror(errno), pPager->jFileName, pTxn->txnId);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  // pass 1: write the dirty pages to the db file
  it = tRBTreeIterCreate(&pPager->rbt, 1);
  for (pCur = tRBTreeIterNext(&it); pCur != NULL; pCur = tRBTreeIterNext(&it)) {
    pDirty = (SPage *)pCur;
    if (pDirty->isLocal) {
      continue;
    }

    SPgno pgno = TDB_PAGE_PGNO(pDirty);
    if (highest < pgno) {
      highest = pgno;
    }

    if (tdbPagerPWritePageToDB(pPager, pDirty) < 0) {
      tdbError("failed to write page to db since %s", tstrerror(terrno));
      return -1;
    }
  }

  tdbTrace("tdbttl commit:%p, %d/%d", pPager, pPager->dbOrigSize, pPager->dbFileSize);
  pPager->dbOrigSize = highest;

  // pass 2: release the pages that were written
  it = tRBTreeIterCreate(&pPager->rbt, 1);
  for (pCur = tRBTreeIterNext(&it); pCur != NULL; pCur = tRBTreeIterNext(&it)) {
    pDirty = (SPage *)pCur;
    if (pDirty->isLocal) {
      continue;
    }

    pDirty->isDirty = 0;
    tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pDirty);
    tdbPCacheRelease(pPager->pCache, pDirty, pTxn);
  }

  return 0;
}

460 461 462 463 464 465 466
// recovery dirty pages
/*
 * Roll back a transaction.
 *
 * Steps:
 *   1. sync the journal and rewind it;
 *   2. for each journal record (page number followed by one page of data),
 *      invalidate the cached page and write the saved data back to the
 *      database file, then sync the database file;
 *   3. clear, drop and release every page in the dirty tree;
 *   4. close and delete the journal file.
 *
 * Returns 0 on success or when the transaction has no journal fd, -1 on
 * failure.
 */
int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
  SPage *pPage;
  SPgno  journalSize = 0;
  int    ret;

  if (pTxn->jfd == 0) {
    // txn is committed
    return 0;
  }

  // sync the journal file
  ret = tdbOsFSync(pTxn->jfd);
  if (ret < 0) {
    tdbError("failed to fsync jfd: %s. jfile:%s, %" PRId64, strerror(errno), pPager->jFileName, pTxn->txnId);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  tdb_fd_t jfd = pTxn->jfd;

  ret = tdbGetFileSize(jfd, pPager->pageSize, &journalSize);
  if (ret < 0) {
    return -1;
  }

  if (tdbOsLSeek(jfd, 0L, SEEK_SET) < 0) {
    tdbError("failed to lseek jfd due to %s. file:%s, offset:0", strerror(errno), pPager->dbFileName);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  u8 *pageBuf = tdbOsCalloc(1, pPager->pageSize);
  if (pageBuf == NULL) {
    return -1;
  }

  tdbDebug("pager/abort: %p, %d/%d, txnId:%" PRId64, pPager, pPager->dbOrigSize, pPager->dbFileSize, pTxn->txnId);

  // journalSize is the journal length in whole pages; each record also
  // carries its page number, so this bound can exceed the real record
  // count and the loop must also stop when the journal runs out.
  for (int pgIndex = 0; pgIndex < journalSize; ++pgIndex) {
    // read pgno & the page from journal
    SPgno pgno = 0;

    ret = tdbOsRead(jfd, &pgno, sizeof(pgno));
    if (ret < 0) {
      tdbOsFree(pageBuf);
      return -1;
    }
    // NOTE(review): assumes tdbOsRead returns the byte count read, like
    // tdbOsPRead is used elsewhere in this file -- confirm.
    if (ret < (int)sizeof(pgno)) {
      // end of journal: no further record to restore
      break;
    }

    tdbTrace("pager/abort: restore pgno:%d,", pgno);

    tdbPCacheInvalidatePage(pPager->pCache, pPager, pgno);

    ret = tdbOsRead(jfd, pageBuf, pPager->pageSize);
    if (ret < 0) {
      tdbOsFree(pageBuf);
      return -1;
    }
    if (ret < pPager->pageSize) {
      // incomplete trailing record: pageBuf does not hold a full page
      // image, so it must not be written over the database page
      break;
    }

    // widen before multiplying so large files do not overflow 32 bits
    i64 offset = (i64)pPager->pageSize * (pgno - 1);
    if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) {
      tdbError("failed to lseek fd due to %s. file:%s, offset:%" PRId64, strerror(errno), pPager->dbFileName, offset);
      terrno = TAOS_SYSTEM_ERROR(errno);
      tdbOsFree(pageBuf);
      return -1;
    }

    ret = tdbOsWrite(pPager->fd, pageBuf, pPager->pageSize);
    if (ret < 0) {
      tdbError("failed to write buf due to %s. file: %s, bufsize:%d", strerror(errno), pPager->dbFileName,
               pPager->pageSize);
      terrno = TAOS_SYSTEM_ERROR(errno);
      tdbOsFree(pageBuf);
      return -1;
    }
  }

  if (tdbOsFSync(pPager->fd) < 0) {
    tdbError("failed to fsync fd due to %s. dbfile:%s", strerror(errno), pPager->dbFileName);
    terrno = TAOS_SYSTEM_ERROR(errno);
    tdbOsFree(pageBuf);
    return -1;
  }

  tdbOsFree(pageBuf);

  // 3, release the dirty pages
  SRBTreeIter  iter = tRBTreeIterCreate(&pPager->rbt, 1);
  SRBTreeNode *pNode = NULL;
  while ((pNode = tRBTreeIterNext(&iter)) != NULL) {
    pPage = (SPage *)pNode;
    SPgno pgno = TDB_PAGE_PGNO(pPage);

    tdbTrace("pager/abort: drop dirty pgno:%d,", pgno);

    pPage->isDirty = 0;

    tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
    // the set is NULL when hashset_create() failed in tdbPagerBegin
    if (pTxn->jPageSet) {
      hashset_remove(pTxn->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage)));
    }
    tdbPCacheMarkFree(pPager->pCache, pPage);
    tdbPCacheRelease(pPager->pCache, pPage, pTxn);
  }

  tdbTrace("pager/abort: reset dirty tree: %p", &pPager->rbt);
  tRBTreeCreate(&pPager->rbt, pageCmpFn);

  // 4, remove the journal file
  if (tdbOsClose(pTxn->jfd) < 0) {
    tdbError("failed to close jfd: %s. file:%s, %" PRId64, strerror(errno), pPager->jFileName, pTxn->txnId);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  char jTxnFileName[TDB_FILENAME_LEN];
  int  nName = snprintf(jTxnFileName, sizeof(jTxnFileName), "%s.%" PRId64, pPager->jFileName, pTxn->txnId);
  if (nName < 0 || (size_t)nName >= sizeof(jTxnFileName)) {
    // a truncated name must not be passed to remove
    tdbError("failed to remove file due to %s. file:%s", strerror(ENAMETOOLONG), pPager->jFileName);
    terrno = TAOS_SYSTEM_ERROR(ENAMETOOLONG);
    return -1;
  }

  if (tdbOsRemove(jTxnFileName) < 0 && errno != ENOENT) {
    tdbError("failed to remove file due to %s. file:%s", strerror(errno), jTxnFileName);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  return 0;
}

588 589
/*
 * Flush at most one dirty page: the first page in the dirty tree whose
 * reference count is not greater than 1.
 *
 * That page is written to the database file, dbOrigSize is raised to its
 * page number when that is larger, and the page is cleared, dropped from
 * the dirty tree and released. Pages with more references are skipped.
 *
 * Returns 0 on success (including when nothing was flushed), -1 when the
 * page write fails.
 */
int tdbPagerFlushPage(SPager *pPager, TXN *pTxn) {
  SPgno        maxPgno = pPager->dbOrigSize;
  SRBTreeIter  it = tRBTreeIterCreate(&pPager->rbt, 1);
  SRBTreeNode *pCur;

  for (pCur = tRBTreeIterNext(&it); pCur != NULL; pCur = tRBTreeIterNext(&it)) {
    SPage *pDirty = (SPage *)pCur;

    i32 nRef = tdbGetPageRef(pDirty);
    if (nRef > 1) {
      continue;
    }

    SPgno pgno = TDB_PAGE_PGNO(pDirty);
    if (maxPgno < pgno) {
      maxPgno = pgno;
    }

    if (tdbPagerPWritePageToDB(pPager, pDirty) < 0) {
      tdbError("failed to write page to db since %s", tstrerror(terrno));
      return -1;
    }

    tdbTrace("tdb/flush:%p, pgno:%d, %d/%d/%d", pPager, pgno, pPager->dbOrigSize, pPager->dbFileSize, maxPgno);
    pPager->dbOrigSize = maxPgno;

    pDirty->isDirty = 0;

    tdbTrace("pager/flush drop page: %p, pgno:%d, from dirty tree: %p", pDirty, TDB_PAGE_PGNO(pDirty), &pPager->rbt);
    tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pDirty);
    tdbPCacheRelease(pPager->pCache, pDirty, pTxn);

    // only one page is flushed per call
    break;
  }

  tdbDebug("pager/flush: %p, %d/%d, txnId:%" PRId64, pPager, pPager->dbOrigSize, pPager->dbFileSize, pTxn->txnId);

  return 0;
}

651 652
static int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno, TXN *pTxn);

H
Hongze Cheng 已提交
653
/*
 * Fetch the page numbered *ppgno, or allocate a new page when *ppgno is 0.
 *
 * The page container comes from the page cache; while the cache returns
 * NULL, one dirty page is flushed and the fetch is retried. A page that
 * is not yet initialized is set up through `initPage` (its content is
 * read from the file only for pages that were not just allocated).
 *
 * On success stores the page number in *ppgno and the page in *ppPage and
 * returns 0. Returns -1 when allocation or initialization fails or the
 * fetched page belongs to a different pager.
 */
int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *, int), void *arg,
                      TXN *pTxn) {
  SPage *pFetched;
  SPgid  pgid;
  int    ret;
  SPgno  pgno = *ppgno;
  u8     loadPage = (pgno == 0) ? 0 : 1;

  // alloc new page
  if (pgno == 0) {
    ret = tdbPagerAllocPage(pPager, &pgno, pTxn);
    if (ret < 0) {
      tdbError("tdb/pager: %p, ret: %d pgno: %" PRIu32 ", alloc page failed.", pPager, ret, pgno);
      return -1;
    }

    if (pgno == 0) {
      tdbError("tdb/pager: %p, ret: %d pgno: %" PRIu32 ", alloc page failed.", pPager, ret, pgno);
      return -1;
    }
  }

  // fetch a page container, flushing a dirty page whenever none is returned
  memcpy(&pgid, pPager->fid, TDB_FILE_ID_LEN);
  pgid.pgno = pgno;
  for (;;) {
    pFetched = tdbPCacheFetch(pPager->pCache, &pgid, pTxn);
    if (pFetched != NULL) {
      break;
    }
    tdbPagerFlushPage(pPager, pTxn);
  }

  tdbTrace("tdbttl fetch pager:%p", pFetched->pPager);

  // init page if need
  if (!TDB_PAGE_INITIALIZED(pFetched)) {
    ret = tdbPagerInitPage(pPager, pFetched, initPage, arg, loadPage);
    if (ret < 0) {
      tdbError("tdb/pager: %p, pPage: %p, init page failed.", pPager, pFetched);
      return -1;
    }
  }

  if (!TDB_PAGE_INITIALIZED(pFetched)) {
    tdbError("tdb/pager: %p, pPage: %p, fetch page uninited.", pPager, pFetched);
    return -1;
  }
  if (pFetched->pPager != pPager) {
    tdbError("tdb/pager: %p/%p, fetch page failed.", pPager, pFetched->pPager);
    return -1;
  }

  *ppgno = pgno;
  *ppPage = pFetched;
  return 0;
}

H
Hongze Cheng 已提交
713 714 715 716 717
/* Hand a page obtained from tdbPagerFetchPage() back to the page cache. */
void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn) {
  tdbPCacheRelease(pPager->pCache, pPage, pTxn);
}
H
Hongze Cheng 已提交
718

719 720 721
/*
 * Record a page as free by inserting its page number as a key into the
 * environment's free-page table, then mark the page as uninitialized by
 * clearing its pager pointer.
 *
 * Returns the result of tdbTbInsert() on success, -1 when the insert
 * fails.
 */
int tdbPagerInsertFreePage(SPager *pPager, SPage *pPage, TXN *pTxn) {
  SPgno pgno = TDB_PAGE_PGNO(pPage);

  tdbTrace("tdb/insert-free-page: tbc recycle page: %d.", pgno);

  int code = tdbTbInsert(pPager->pEnv->pFreeDb, &pgno, sizeof(pgno), NULL, 0, pTxn);
  if (code < 0) {
    tdbError("tdb/insert-free-page: tb insert failed with ret: %d.", code);
    return -1;
  }

  pPage->pPager = NULL;
  return code;
}

736
/*
 * Take the first entry out of the free-page table.
 *
 * When an entry exists, its key (a page number) is stored in *pPgno and
 * the entry is deleted. When there is no free-page table, the cursor
 * cannot be opened or positioned, or no entry can be read, *pPgno is
 * left untouched.
 *
 * Always returns 0; callers detect "nothing found" through *pPgno.
 */
static int tdbPagerRemoveFreePage(SPager *pPager, SPgno *pPgno, TXN *pTxn) {
  TBC  *pCursor;
  void *pKey = NULL;
  int   nKey = 0;
  int   rc;

  if (pPager->pEnv->pFreeDb == NULL) {
    return 0;
  }

  if (tdbTbcOpen(pPager->pEnv->pFreeDb, &pCursor, pTxn) < 0) {
    return 0;
  }

  rc = tdbTbcMoveToFirst(pCursor);
  if (rc != 0) {
    tdbError("tdb/remove-free-page: moveto first failed with ret: %d.", rc);
  } else if (tdbTbcGet(pCursor, (const void **)&pKey, &nKey, NULL, NULL) >= 0) {
    *pPgno = *(SPgno *)pKey;
    tdbTrace("tdb/remove-free-page: tbc get page: %d.", *pPgno);

    rc = tdbTbcDelete(pCursor);
    if (rc < 0) {
      tdbError("tdb/remove-free-page: tbc delete failed with ret: %d.", rc);
    }
  }

  tdbTbcClose(pCursor);
  return 0;
}

779
/*
 * Try to reuse a page from the free-page table. *ppgno is set only when
 * a free page was found; the result is that of tdbPagerRemoveFreePage().
 */
static int tdbPagerAllocFreePage(SPager *pPager, SPgno *ppgno, TXN *pTxn) {
  int rc = tdbPagerRemoveFreePage(pPager, ppgno, pTxn);
  return rc;
}

/*
 * Allocate a page by growing the database: dbFileSize is incremented and
 * the new value becomes the page number. Always returns 0.
 */
static int tdbPagerAllocNewPage(SPager *pPager, SPgno *ppgno) {
  pPager->dbFileSize += 1;
  *ppgno = pPager->dbFileSize;
  return 0;
}

790
/*
 * Allocate a page number: reuse one from the free-page table when
 * possible, otherwise extend the database by one page.
 *
 * Returns 0 with *ppgno set to a non-zero page number, or -1 on failure.
 */
static int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno, TXN *pTxn) {
  *ppgno = 0;

  // first choice: a page from the free list of the pager
  if (tdbPagerAllocFreePage(pPager, ppgno, pTxn) < 0) {
    return -1;
  }
  if (*ppgno != 0) {
    return 0;
  }

  // otherwise extend the pager
  if (tdbPagerAllocNewPage(pPager, ppgno) < 0) {
    return -1;
  }
  if (*ppgno == 0) {
    tdbError("tdb/pager:%p, alloc new page failed.", pPager);
    return -1;
  }

  return 0;
}

H
Hongze Cheng 已提交
816 817 818 819 820
/*
 * Initialize a page fetched from the cache.
 *
 * The caller that obtains the page lock does the work: when `loadPage` is
 * set and the page number is within dbOrigSize, the page content is read
 * from the database file and `initPage` is called with init = 1;
 * otherwise `initPage` is called with init = 0. The page is then bound to
 * this pager. A caller that finds the lock busy waits, yielding the CPU
 * every 1000 polls, until the page appears initialized.
 *
 * Returns 0 on success, -1 on a short read, an `initPage` failure or an
 * unexpected lock result.
 */
static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *, int), void *arg,
                            u8 loadPage) {
  i64   nRead = 0;
  SPgno pgno = 0;
  int   fromDisk = 0;

  const int lcode = TDB_TRY_LOCK_PAGE(pPage);

  if (lcode == P_LOCK_BUSY) {
    // another caller holds the lock; wait for the page to be initialized
    int spins = 0;
    while (!TDB_PAGE_INITIALIZED(pPage)) {
      if (++spins > 1000) {
        sched_yield();
        spins = 0;
      }
    }
    return 0;
  }

  if (lcode != P_LOCK_SUCC) {
    tdbError("tdb/pager:%p, pgno:%d, nRead:%" PRId64 "pgSize:%" PRId32 " lock page failed.", pPager, pgno, nRead,
             pPage->pageSize);
    return -1;
  }

  // lock held: the page may have been initialized in the meantime
  if (TDB_PAGE_INITIALIZED(pPage)) {
    TDB_UNLOCK_PAGE(pPage);
    return 0;
  }

  pgno = TDB_PAGE_PGNO(pPage);

  tdbTrace("tdb/pager:%p, pgno:%d, loadPage:%d, size:%d", pPager, pgno, loadPage, pPager->dbOrigSize);
  if (loadPage && pgno <= pPager->dbOrigSize) {
    fromDisk = 1;

    nRead = tdbOsPRead(pPager->fd, pPage->pData, pPage->pageSize, ((i64)pPage->pageSize) * (pgno - 1));
    tdbTrace("tdb/pager:%p, pgno:%d, nRead:%" PRId64, pPager, pgno, nRead);
    if (nRead < pPage->pageSize) {
      tdbError("tdb/pager:%p, pgno:%d, nRead:%" PRId64 "pgSize:%" PRId32, pPager, pgno, nRead, pPage->pageSize);
      TDB_UNLOCK_PAGE(pPage);
      return -1;
    }
  }

  if ((*initPage)(pPage, arg, fromDisk) < 0) {
    tdbError("tdb/pager:%p, pgno:%d, nRead:%" PRId64 "pgSize:%" PRId32 " init page failed.", pPager, pgno, nRead,
             pPage->pageSize);
    TDB_UNLOCK_PAGE(pPage);
    return -1;
  }

  // a non-NULL pager pointer is what marks the page as initialized
  pPage->pPager = pPager;

  TDB_UNLOCK_PAGE(pPage);
  return 0;
}

// ---------------------------- Journal manipulation
/*
 * Append one record to the active transaction's journal: the page number
 * followed by the page's current data.
 *
 * Returns 0 on success, -1 with terrno set when either write fails.
 */
static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage) {
  SPgno pgno = TDB_PAGE_PGNO(pPage);

  // record header: the page number
  if (tdbOsWrite(pPager->pActiveTxn->jfd, &pgno, sizeof(pgno)) < 0) {
    tdbError("failed to write pgno due to %s. file:%s, pgno:%u, txnId:%" PRId64, strerror(errno), pPager->jFileName,
             pgno, pPager->pActiveTxn->txnId);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  // record body: the page content
  if (tdbOsWrite(pPager->pActiveTxn->jfd, pPage->pData, pPage->pageSize) < 0) {
    tdbError("failed to write page data due to %s. file:%s, pageSize:%d, txnId:%" PRId64, strerror(errno),
             pPager->jFileName, pPage->pageSize, pPager->pActiveTxn->txnId);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  return 0;
}
904
/*
H
Hongze Cheng 已提交
905 906 907 908
static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage) {
  i64 offset;
  int ret;

909
  offset = (i64)pPage->pageSize * (TDB_PAGE_PGNO(pPage) - 1);
H
Hongze Cheng 已提交
910
  if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) {
911
    tdbError("failed to lseek due to %s. file:%s, offset:%" PRId64, strerror(errno), pPager->dbFileName, offset);
912
    terrno = TAOS_SYSTEM_ERROR(errno);
H
Hongze Cheng 已提交
913 914 915
    return -1;
  }

H
Hongze Cheng 已提交
916
  ret = tdbOsWrite(pPager->fd, pPage->pData, pPage->pageSize);
H
Hongze Cheng 已提交
917
  if (ret < 0) {
M
Minglei Jin 已提交
918
    tdbError("failed to write page data due to %s. file:%s, pageSize:%d", strerror(errno), pPager->dbFileName,
M
Minglei Jin 已提交
919
             pPage->pageSize);
920
    terrno = TAOS_SYSTEM_ERROR(errno);
H
Hongze Cheng 已提交
921 922 923
    return -1;
  }

924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940
  return 0;
}
*/
/*
 * Write one page to the database file at its page-number-derived offset.
 *
 * The byte offset is pageSize * (pgno - 1), computed in 64 bits, and the data
 * is written with a single positional tdbOsPWrite() on pPager->fd.
 *
 * @param pPager pager owning the database file descriptor
 * @param pPage  page whose pData/pageSize are written
 * @return 0 on success; -1 if the write reports an error, in which case
 *         terrno is set from the errno of the failing write
 */
static int tdbPagerPWritePageToDB(SPager *pPager, SPage *pPage) {
  i64 offset;
  int ret;

  offset = (i64)pPage->pageSize * (TDB_PAGE_PGNO(pPage) - 1);

  ret = tdbOsPWrite(pPager->fd, pPage->pData, pPage->pageSize, offset);
  if (ret < 0) {
    int err = errno;  // capture first: the logging call may overwrite errno
    tdbError("failed to pwrite page data due to %s. file:%s, pageSize:%d", strerror(err), pPager->dbFileName,
             pPage->pageSize);
    terrno = TAOS_SYSTEM_ERROR(err);
    return -1;
  }

  return 0;
}

944
/*
 * Replay one journal file into the database file, then delete the journal.
 *
 * Each journal record is read as a page number followed by pPager->pageSize
 * bytes of page data; the data is written back to pPager->fd at offset
 * pageSize * (pgno - 1). After all records are replayed the database file is
 * fsync'ed, the journal is closed and then removed.
 *
 * @param pPager    pager whose database file (pPager->fd) receives the pages
 * @param jFileName path of the journal file to replay
 * @return 0 on success, or when the journal cannot be opened (treated as
 *         nothing to restore); -1 on any failure. On failure the journal
 *         descriptor and the page buffer are released before returning.
 */
static int tdbPagerRestore(SPager *pPager, const char *jFileName) {
  int   ret = 0;
  int   savedErrno = 0;
  SPgno journalSize = 0;
  u8   *pageBuf = NULL;

  tdb_fd_t jfd = tdbOsOpen(jFileName, TDB_O_RDWR, 0755);
  if (jfd == NULL) {
    return 0;
  }

  ret = tdbGetFileSize(jfd, pPager->pageSize, &journalSize);
  if (ret < 0) {
    goto err_close;
  }

  if (tdbOsLSeek(jfd, 0L, SEEK_SET) < 0) {
    int err = errno;  // capture first: the logging call may overwrite errno
    tdbError("failed to lseek jfd due to %s. file:%s, offset:0", strerror(err), jFileName);
    terrno = TAOS_SYSTEM_ERROR(err);
    goto err_close;
  }

  pageBuf = tdbOsCalloc(1, pPager->pageSize);
  if (pageBuf == NULL) {
    goto err_close;
  }

  tdbDebug("pager/restore: %p, %d/%d, txnId:%s", pPager, pPager->dbOrigSize, pPager->dbFileSize, jFileName);

  // NOTE(review): journalSize is whatever tdbGetFileSize() reports for
  // pPager->pageSize, while each record consumed below is
  // sizeof(pgno) + pageSize bytes. Confirm the loop bound and how a short or
  // end-of-file read (ret >= 0) should be handled; only ret < 0 is treated as
  // an error here, as before.
  for (SPgno pgIndex = 0; pgIndex < journalSize; ++pgIndex) {
    // read pgno & the page from journal
    SPgno pgno;

    ret = tdbOsRead(jfd, &pgno, sizeof(pgno));
    if (ret < 0) {
      goto err_free;
    }

    tdbTrace("pager/restore: restore pgno:%d,", pgno);

    ret = tdbOsRead(jfd, pageBuf, pPager->pageSize);
    if (ret < 0) {
      goto err_free;
    }

    // Widen before multiplying so the offset is not truncated to 32 bits.
    i64 offset = (i64)pPager->pageSize * (pgno - 1);
    if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) {
      int err = errno;
      tdbError("failed to lseek fd due to %s. file:%s, offset:%" PRId64, strerror(err), pPager->dbFileName, offset);
      terrno = TAOS_SYSTEM_ERROR(err);
      goto err_free;
    }

    ret = tdbOsWrite(pPager->fd, pageBuf, pPager->pageSize);
    if (ret < 0) {
      int err = errno;
      tdbError("failed to write buf due to %s. file: %s, bufsize:%d", strerror(err), pPager->dbFileName,
               pPager->pageSize);
      terrno = TAOS_SYSTEM_ERROR(err);
      goto err_free;
    }
  }

  if (tdbOsFSync(pPager->fd) < 0) {
    int err = errno;
    tdbError("failed to fsync fd due to %s. dbfile:%s", strerror(err), pPager->dbFileName);
    terrno = TAOS_SYSTEM_ERROR(err);
    goto err_free;
  }

  tdbOsFree(pageBuf);

  if (tdbOsClose(jfd) < 0) {
    int err = errno;
    tdbError("failed to close jfd due to %s. jFileName:%s", strerror(err), jFileName);
    terrno = TAOS_SYSTEM_ERROR(err);
    return -1;
  }

  if (tdbOsRemove(jFileName) < 0 && errno != ENOENT) {
    int err = errno;
    tdbError("failed to remove file due to %s. jFileName:%s", strerror(err), jFileName);
    terrno = TAOS_SYSTEM_ERROR(err);
    return -1;
  }

  return 0;

err_free:
  tdbOsFree(pageBuf);
err_close:
  // Release the journal descriptor without disturbing the errno the caller
  // may report for the original failure.
  savedErrno = errno;
  (void)tdbOsClose(jfd);
  errno = savedErrno;
  return -1;
}
1031

1032 1033 1034 1035 1036 1037
/*
 * Comparator that orders int64_t transaction ids in descending order.
 *
 * @param pLeft  pointer to the first int64_t
 * @param pRight pointer to the second int64_t
 * @return -1 if *pLeft > *pRight, 1 if *pLeft < *pRight, 0 if they are equal
 *         (a consistent ordering requires 0 for equal keys)
 */
static int32_t txnIdCompareDesc(const void *pLeft, const void *pRight) {
  const int64_t lhs = *(const int64_t *)pLeft;
  const int64_t rhs = *(const int64_t *)pRight;

  if (lhs == rhs) {
    return 0;
  }
  return lhs > rhs ? -1 : 1;
}

1038
/*
 * Restore every journal file found in the environment directory.
 *
 * Scans pPager->pEnv->dbName for entries whose base name starts with
 * TDB_MAINDB_NAME "-journal", parses the transaction id from the
 * ".<txnId>" suffix, sorts the ids with txnIdCompareDesc and calls
 * tdbPagerRestore() on each resulting journal path in that order.
 *
 * Entries whose suffix cannot be parsed as a transaction id are skipped.
 *
 * @param pPager pager to restore into
 * @return 0 on success; -1 if the directory cannot be opened, the id list
 *         cannot be allocated, a journal path does not fit in TD_PATH_MAX, or
 *         restoring any journal fails (remaining journals are not processed)
 */
int tdbPagerRestoreJournals(SPager *pPager) {
  // Length of the journal name prefix, excluding the terminating NUL.
  const size_t prefixLen = sizeof(TDB_MAINDB_NAME "-journal") - 1;

  int            code = 0;
  tdbDirEntryPtr pDirEntry;
  tdbDirPtr      pDir = taosOpenDir(pPager->pEnv->dbName);
  if (pDir == NULL) {
    tdbError("failed to open %s since %s", pPager->pEnv->dbName, strerror(errno));
    return -1;
  }

  SArray *pTxnList = taosArrayInit(16, sizeof(int64_t));
  if (pTxnList == NULL) {
    tdbCloseDir(&pDir);
    return -1;
  }

  while ((pDirEntry = tdbReadDir(pDir)) != NULL) {
    char *name = tdbDirEntryBaseName(tdbGetDirEntryName(pDirEntry));
    if (strncmp(TDB_MAINDB_NAME "-journal", name, prefixLen) == 0) {
      int64_t txnId = -1;
      if (sscanf(name, TDB_MAINDB_NAME "-journal.%" SCNd64, &txnId) != 1) {
        // No parsable transaction id in the name: nothing to restore for it.
        continue;
      }
      taosArrayPush(pTxnList, &txnId);
    }
  }

  // The directory handle is only needed for the scan above.
  tdbCloseDir(&pDir);

  taosArraySort(pTxnList, txnIdCompareDesc);

  for (int i = 0; i < TARRAY_SIZE(pTxnList); ++i) {
    int64_t *pTxnId = taosArrayGet(pTxnList, i);
    char     jname[TD_PATH_MAX] = {0};

    int n = snprintf(jname, sizeof(jname), "%s/" TDB_MAINDB_NAME "-journal.%" PRId64, pPager->pEnv->dbName, *pTxnId);
    if (n < 0 || (size_t)n >= sizeof(jname)) {
      tdbError("journal path too long. dir:%s, txnId:%" PRId64, pPager->pEnv->dbName, *pTxnId);
      terrno = TAOS_SYSTEM_ERROR(ENAMETOOLONG);
      code = -1;
      break;
    }

    if (tdbPagerRestore(pPager, jname) < 0) {
      tdbError("failed to restore file due to %s. jFileName:%s", strerror(errno), jname);
      code = -1;
      break;
    }
  }

  // Released on both the success and the failure path.
  taosArrayDestroy(pTxnList);

  return code;
}

1078
/*
 * Discard all journal files in the environment directory.
 *
 * Scans pPager->pEnv->dbName and removes every entry whose base name starts
 * with TDB_MAINDB_NAME "-journal". An entry that has already disappeared
 * (ENOENT) is not treated as an error.
 *
 * @param pPager pager whose environment directory is scanned
 * @return 0 on success; -1 if the directory cannot be opened, a journal path
 *         does not fit in TD_PATH_MAX, or a removal fails (terrno is set in
 *         the latter two cases)
 */
int tdbPagerRollback(SPager *pPager) {
  // Length of the journal name prefix, excluding the terminating NUL.
  const size_t prefixLen = sizeof(TDB_MAINDB_NAME "-journal") - 1;

  tdbDirEntryPtr pDirEntry;
  tdbDirPtr      pDir = taosOpenDir(pPager->pEnv->dbName);
  if (pDir == NULL) {
    tdbError("failed to open %s since %s", pPager->pEnv->dbName, strerror(errno));
    return -1;
  }

  while ((pDirEntry = tdbReadDir(pDir)) != NULL) {
    char *name = tdbDirEntryBaseName(tdbGetDirEntryName(pDirEntry));

    if (strncmp(TDB_MAINDB_NAME "-journal", name, prefixLen) != 0) {
      continue;
    }

    char jname[TD_PATH_MAX] = {0};
    int  n = snprintf(jname, sizeof(jname), "%s/%s", pPager->pEnv->dbName, name);
    if (n < 0 || (size_t)n >= sizeof(jname)) {
      tdbError("journal path too long. dir:%s, name:%s", pPager->pEnv->dbName, name);
      terrno = TAOS_SYSTEM_ERROR(ENAMETOOLONG);
      tdbCloseDir(&pDir);
      return -1;
    }

    if (tdbOsRemove(jname) < 0 && errno != ENOENT) {
      // Log and record the error before closing the directory: `name` comes
      // from the directory entry and closing may also overwrite errno.
      int err = errno;
      tdbError("failed to remove file due to %s. jFileName:%s", strerror(err), name);
      terrno = TAOS_SYSTEM_ERROR(err);
      tdbCloseDir(&pDir);
      return -1;
    }
  }

  tdbCloseDir(&pDir);

  return 0;
}