tdbPager.c 23.4 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16 17
#include "tdbInt.h"

H
Hongze Cheng 已提交
18
#pragma pack(push, 1)
wafwerar's avatar
wafwerar 已提交
19
typedef struct {
H
Hongze Cheng 已提交
20 21 22 23 24
  u8    hdrString[16];
  u16   pageSize;
  SPgno freePage;
  u32   nFreePages;
  u8    reserved[102];
H
Hongze Cheng 已提交
25
} SFileHdr;
wafwerar's avatar
wafwerar 已提交
26
#pragma pack(pop)
H
Hongze Cheng 已提交
27

H
Hongze Cheng 已提交
28 29
TDB_STATIC_ASSERT(sizeof(SFileHdr) == 128, "Size of file header is not correct");

30
/*
 * Open-addressing hash set of non-zero size_t values (the pager stores page numbers in it).
 * A slot holding 0 is empty, so 0 itself can never be a member.
 */
struct hashset_st {
  size_t  nbits;        // log2(capacity)
  size_t  mask;         // capacity - 1, used to wrap probe positions into the table
  size_t  capacity;     // number of slots in items
  size_t *items;        // slot array; 0 marks an empty slot
  size_t  nitems;       // number of occupied slots
  double  load_factor;  // hashset_add grows the table once nitems reaches capacity * load_factor
};

// `prime` scatters a value to its first slot; `prime2` is the probe step. prime2 is odd, so
// with a power-of-two capacity the probe sequence visits every slot before repeating.
static const unsigned int prime = 39;
static const unsigned int prime2 = 5009;

/*
 * Allocate an empty hash set with 16 slots and a load factor of 0.75.
 * Returns NULL when either allocation fails (nothing is leaked in that case).
 */
hashset_t hashset_create(void) {
  hashset_t hs = tdbOsCalloc(1, sizeof(struct hashset_st));
  if (hs == NULL) return NULL;

  hs->nbits = 4;
  hs->capacity = (size_t)(1 << hs->nbits);
  hs->mask = hs->capacity - 1;
  hs->nitems = 0;
  hs->load_factor = 0.75;

  hs->items = tdbOsCalloc(hs->capacity, sizeof(size_t));
  if (hs->items == NULL) {
    tdbOsFree(hs);
    return NULL;
  }

  return hs;
}

/* Free a hash set together with its slot array. A NULL set is ignored. */
void hashset_destroy(hashset_t set) {
  if (set == NULL) {
    return;
  }

  tdbOsFree(set->items);
  tdbOsFree(set);
}

/*
 * Insert `item` into the set without resizing.
 * Returns 1 if it was inserted, 0 if it was already present, and -1 if it is 0 (0 marks an
 * empty slot and cannot be stored).
 * The table must have at least one empty slot, otherwise probing never terminates;
 * hashset_add maintains that by growing the table.
 */
int hashset_add_member(hashset_t set, void *item) {
  size_t value = (size_t)item;
  size_t h;

  if (value == 0) {
    return -1;
  }

  // Walk the probe sequence until an empty slot or the value itself is found.
  for (h = set->mask & (prime * value); set->items[h] != 0; h = set->mask & (h + prime2)) {
    if (set->items[h] == value) {
      return 0;
    }
  }

  set->items[h] = value;
  ++set->nitems;
  return 1;
}

/*
 * Insert `item` and grow the table (doubling its capacity) once nitems reaches
 * capacity * load_factor.
 * Returns the hashset_add_member result (1 inserted, 0 already present, -1 item is 0),
 * or -1 when growing the table fails. In that case the set keeps its current table and
 * remains fully usable.
 */
int hashset_add(hashset_t set, void *item) {
  int ret = hashset_add_member(set, item);

  size_t old_capacity = set->capacity;
  if (set->nitems >= (double)old_capacity * set->load_factor) {
    // Allocate the bigger table before touching the set, so that a failed allocation does
    // not leave the set with items == NULL and a mask/capacity that no longer match.
    size_t  new_capacity = (size_t)1 << (set->nbits + 1);
    size_t *new_items = tdbOsCalloc(new_capacity, sizeof(size_t));
    if (new_items == NULL) {
      return -1;
    }

    size_t *old_items = set->items;
    ++set->nbits;
    set->capacity = new_capacity;
    set->mask = new_capacity - 1;
    set->items = new_items;

    set->nitems = 0;
    for (size_t i = 0; i < old_capacity; ++i) {
      // empty slots hold 0, which hashset_add_member rejects, so they are skipped
      hashset_add_member(set, (void *)old_items[i]);
    }
    tdbOsFree(old_items);
  }

  return ret;
}

/*
 * Remove `item` from the set. Returns 1 if it was present, 0 otherwise.
 *
 * Lookups stop at the first empty slot. Simply zeroing the removed slot would therefore
 * hide every value that was placed further along the same probe run. After clearing the
 * slot, the rest of that run (up to the next empty slot) is re-inserted so each value ends
 * up reachable from its starting position again.
 */
int hashset_remove(hashset_t set, void *item) {
  size_t value = (size_t)item;

  for (size_t h = set->mask & (prime * value); set->items[h] != 0; h = set->mask & (h + prime2)) {
    if (set->items[h] != value) {
      continue;
    }

    set->items[h] = 0;
    --set->nitems;

    // Re-home the values that follow in the probe run.
    for (size_t j = set->mask & (h + prime2); set->items[j] != 0; j = set->mask & (j + prime2)) {
      size_t moved = set->items[j];
      set->items[j] = 0;
      --set->nitems;
      hashset_add_member(set, (void *)moved);
    }
    return 1;
  }

  return 0;
}

/*
 * Return 1 if `item` is a member of the set, 0 otherwise.
 * The probe stops at the first empty slot, following the same sequence as hashset_add_member.
 */
int hashset_contains(hashset_t set, void *item) {
  size_t value = (size_t)item;

  for (size_t h = set->mask & (prime * value); set->items[h] != 0; h = set->mask & (h + prime2)) {
    if (set->items[h] == value) {
      return 1;
    }
  }

  return 0;
}

H
Hongze Cheng 已提交
140 141
#define TDB_PAGE_INITIALIZED(pPage) ((pPage)->pPager != NULL)

H
Hongze Cheng 已提交
142 143
static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *, int), void *arg,
                            u8 loadPage);
H
Hongze Cheng 已提交
144 145
static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage);
static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage);
H
Hongze Cheng 已提交
146

H
Hongze Cheng 已提交
147 148 149
/*
 * Comparator for the pager's dirty-page red-black tree: orders pages by page number.
 * The tree links pages through their embedded `node` member, so the owning SPage is
 * recovered by subtracting offsetof(SPage, node).
 */
static FORCE_INLINE int32_t pageCmpFn(const SRBTreeNode *lhs, const SRBTreeNode *rhs) {
  SPage *pPageL = (SPage *)(((uint8_t *)lhs) - offsetof(SPage, node));
  SPage *pPageR = (SPage *)(((uint8_t *)rhs) - offsetof(SPage, node));

  SPgno pgnoL = TDB_PAGE_PGNO(pPageL);
  SPgno pgnoR = TDB_PAGE_PGNO(pPageR);

  if (pgnoL < pgnoR) {
    return -1;
  } else if (pgnoL > pgnoR) {
    return 1;
  } else {
    return 0;
  }
}

H
refact  
Hongze Cheng 已提交
163
/*
 * Open (creating it if needed) the database file `fileName` and allocate a pager for it.
 * The SPager, the db file name and the journal file name ("<fileName>-journal") live in one
 * allocation.
 * On success *ppPager receives the pager and 0 is returned. On failure *ppPager stays NULL,
 * the file descriptor and memory acquired so far are released, and -1 is returned.
 */
int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager) {
  uint8_t *pPtr;
  SPager  *pPager;
  int      fsize;
  int      zsize;
  int      ret;

  *ppPager = NULL;

  fsize = strlen(fileName);
  zsize = sizeof(*pPager)  /* SPager */
          + fsize + 1      /* dbFileName */
          + fsize + 8 + 1; /* jFileName */
  pPtr = (uint8_t *)tdbOsCalloc(1, zsize);
  if (pPtr == NULL) {
    return -1;
  }

  pPager = (SPager *)pPtr;
  pPtr += sizeof(*pPager);
  // pPager->dbFileName
  pPager->dbFileName = (char *)pPtr;
  memcpy(pPager->dbFileName, fileName, fsize);
  pPager->dbFileName[fsize] = '\0';
  pPtr += fsize + 1;
  // pPager->jFileName
  pPager->jFileName = (char *)pPtr;
  memcpy(pPager->jFileName, fileName, fsize);
  memcpy(pPager->jFileName + fsize, "-journal", 8);
  pPager->jFileName[fsize + 8] = '\0';
  // pPager->pCache
  pPager->pCache = pCache;

  pPager->fd = tdbOsOpen(pPager->dbFileName, TDB_O_CREAT | TDB_O_RDWR, 0755);
  if (TDB_FD_INVALID(pPager->fd)) {
    // nothing but the allocation to undo yet
    tdbOsFree(pPager);
    return -1;
  }

  ret = tdbGnrtFileID(pPager->fd, pPager->fid, false);
  if (ret < 0) {
    goto _err;
  }

  pPager->pageSize = tdbPCacheGetPageSize(pCache);
  // pPager->dbOrigSize: size of the existing file, as computed by tdbGetFileSize
  ret = tdbGetFileSize(pPager->fd, pPager->pageSize, &(pPager->dbOrigSize));
  if (ret < 0) {
    goto _err;
  }
  pPager->dbFileSize = pPager->dbOrigSize;

  tdbTrace("pager/open reset dirty tree: %p", &pPager->rbt);
  tRBTreeCreate(&pPager->rbt, pageCmpFn);

  *ppPager = pPager;
  return 0;

_err:
  tdbOsClose(pPager->fd);
  tdbOsFree(pPager);
  return -1;
}

H
refact  
Hongze Cheng 已提交
220
/*
 * Close the pager's files and free the pager. A NULL pager is accepted.
 * If a transaction is still open, its journal descriptor and journal page set are released
 * as well. The journal file itself is left on disk.
 * Always returns 0.
 */
int tdbPagerClose(SPager *pPager) {
  if (pPager) {
    if (pPager->inTran) {
      tdbOsClose(pPager->jfd);
      // The set is created by tdbPagerBegin and otherwise only freed when the transaction
      // commits or aborts; without this it leaks.
      hashset_destroy(pPager->jPageSet);
      pPager->jPageSet = NULL;
    }
    tdbOsClose(pPager->fd);
    tdbOsFree(pPager);
  }

  return 0;
}
230
/*
231
int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate, SBTree *pBt) {
H
Hongze Cheng 已提交
232 233 234 235
  SPgno  pgno;
  SPage *pPage;
  int    ret;

H
Hongze Cheng 已提交
236 237 238
  if (pPager->dbOrigSize > 0) {
    pgno = 1;
  } else {
H
Hongze Cheng 已提交
239 240 241
    pgno = 0;
  }

H
Hongze Cheng 已提交
242
  {
243 244
    // TODO: try to search the main DB to get the page number
    // pgno = 0;
H
Hongze Cheng 已提交
245 246
  }

247 248 249 250
  if (pgno == 0 && toCreate) {
    // allocate a new child page
    TXN txn;
    tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0);
H
Hongze Cheng 已提交
251

252
    pPager->inTran = 1;
H
refact  
Hongze Cheng 已提交
253

254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270
    SBtreeInitPageArg zArg;
    zArg.flags = 0x1 | 0x2;  // root leaf node;
    zArg.pBt = pBt;
    ret = tdbPagerFetchPage(pPager, &pgno, &pPage, tdbBtreeInitPage, &zArg, &txn);
    if (ret < 0) {
      return -1;
    }

    //    ret = tdbPagerAllocPage(pPager, &pPage, &pgno);
    // if (ret < 0) {
    //  return -1;
    //}

    // TODO: Need to zero the page

    ret = tdbPagerWrite(pPager, pPage);
    if (ret < 0) {
271
      tdbError("failed to write page since %s", terrstr());
272 273
      return -1;
    }
H
Hongze Cheng 已提交
274

275 276 277 278
    tdbTxnClose(&txn);
  }

  *ppgno = pgno;
H
Hongze Cheng 已提交
279 280
  return 0;
}
281
*/
H
refact  
Hongze Cheng 已提交
282
/*
 * Mark pPage dirty in the current transaction (the pager must already be in a transaction).
 *
 * The first time a page becomes dirty it gets an extra reference, so the cache does not
 * release it, and it is inserted into the pager's dirty tree. A page that already exists in
 * the database file (pgno <= dbOrigSize) has its current content written to the journal,
 * at most once per transaction; jPageSet records which page numbers are already journaled.
 * Returns 0 on success, -1 if writing the journal fails.
 */
int tdbPagerWrite(SPager *pPager, SPage *pPage) {
  int ret;

  ASSERT(pPager->inTran);

  if (pPage->isDirty) return 0;

  // ref page one more time so the page will not be released
  tdbRefPage(pPage);
  tdbDebug("pager/mdirty page %p/%d/%d", pPage, TDB_PAGE_PGNO(pPage), pPage->id);

  // Set page as dirty
  pPage->isDirty = 1;

  tdbTrace("put page: %p %d to dirty tree: %p", pPage, TDB_PAGE_PGNO(pPage), &pPager->rbt);
  tRBTreePut(&pPager->rbt, (SRBTreeNode *)pPage);

  // Write page to journal if necessary (only pages that exist on disk have an image to save)
  void *pgnoKey = (void *)((long)TDB_PAGE_PGNO(pPage));
  if (TDB_PAGE_PGNO(pPage) <= pPager->dbOrigSize &&
      (pPager->jPageSet == NULL || !hashset_contains(pPager->jPageSet, pgnoKey))) {
    ret = tdbPagerWritePageToJournal(pPager, pPage);
    if (ret < 0) {
      tdbError("failed to write page to journal since %s", tstrerror(terrno));
      return -1;
    }

    if (pPager->jPageSet) {
      hashset_add(pPager->jPageSet, pgnoKey);
    }
  }

  return 0;
}

H
Hongze Cheng 已提交
340
/*
 * Start a write transaction. Opens (creating if needed) the journal file and creates the set
 * that makes tdbPagerWrite journal each page at most once. Does nothing if a transaction is
 * already open.
 * Returns 0 on success, -1 on failure; on failure the pager stays outside a transaction.
 */
int tdbPagerBegin(SPager *pPager, TXN *pTxn) {
  if (pPager->inTran) {
    return 0;
  }

  // Open the journal
  pPager->jfd = tdbOsOpen(pPager->jFileName, TDB_O_CREAT | TDB_O_RDWR, 0755);
  if (TDB_FD_INVALID(pPager->jfd)) {
    tdbError("failed to open file due to %s. jFileName:%s", strerror(errno), pPager->jFileName);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  // Without this set, tdbPagerWrite journals a page again every time it becomes dirty (e.g.
  // after tdbPagerFlushPage wrote it out). Replaying the journal in order would then leave
  // the later copy in the database instead of the original, so refuse to start.
  pPager->jPageSet = hashset_create();
  if (pPager->jPageSet == NULL) {
    tdbError("failed to create journal page set. jFileName:%s", pPager->jFileName);
    tdbOsClose(pPager->jfd);
    return -1;
  }

  // TODO: write the size of the file

  pPager->inTran = 1;

  return 0;
}

H
Hongze Cheng 已提交
361
/*
 * Commit the dirty pages of the current transaction:
 *   1. fsync the journal so the saved original page images are durable;
 *   2. write every page in the dirty tree to the database file;
 *   3. set dbOrigSize to dbFileSize, then mark each dirty page clean, drop it from the dirty
 *      tree and jPageSet, and hand it back to the page cache;
 *   4. fsync the database file.
 * Closing and deleting the journal is done by tdbPagerPostCommit.
 * Returns 0 on success, -1 on failure.
 */
int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
  SPage *pPage;
  int    ret;

  // sync the journal file
  ret = tdbOsFSync(pPager->jfd);
  if (ret < 0) {
    tdbError("failed to fsync jfd due to %s. jFileName:%s", strerror(errno), pPager->jFileName);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  // loop to write the dirty pages to file
  SRBTreeIter  iter = tRBTreeIterCreate(&pPager->rbt, 1);
  SRBTreeNode *pNode = NULL;
  while ((pNode = tRBTreeIterNext(&iter)) != NULL) {
    pPage = (SPage *)pNode;

    ASSERT(pPage->nOverflow == 0);
    ret = tdbPagerWritePageToDB(pPager, pPage);
    if (ret < 0) {
      tdbError("failed to write page to db since %s", tstrerror(terrno));
      return -1;
    }
  }

  tdbTrace("tdbttl commit:%p, %d/%d", pPager, pPager->dbOrigSize, pPager->dbFileSize);
  pPager->dbOrigSize = pPager->dbFileSize;

  // release the page
  iter = tRBTreeIterCreate(&pPager->rbt, 1);
  while ((pNode = tRBTreeIterNext(&iter)) != NULL) {
    pPage = (SPage *)pNode;

    pPage->isDirty = 0;

    tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
    if (pPager->jPageSet) {
      hashset_remove(pPager->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage)));
    }
    tdbPCacheRelease(pPager->pCache, pPage, pTxn);
  }

  tdbTrace("pager/commit reset dirty tree: %p", &pPager->rbt);
  tRBTreeCreate(&pPager->rbt, pageCmpFn);

  // sync the db file
  if (tdbOsFSync(pPager->fd) < 0) {
    tdbError("failed to fsync fd due to %s. file:%s", strerror(errno), pPager->dbFileName);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  return 0;
}

/*
 * Finish a commit. Closes and deletes the journal file (a missing file is not an error),
 * frees the journal page set and leaves the pager outside a transaction.
 * Returns 0 on success, -1 on failure.
 */
int tdbPagerPostCommit(SPager *pPager, TXN *pTxn) {
  // remove the journal file
  if (tdbOsClose(pPager->jfd) < 0) {
    tdbError("failed to close jfd due to %s. file:%s", strerror(errno), pPager->jFileName);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  if (tdbOsRemove(pPager->jFileName) < 0 && errno != ENOENT) {
    tdbError("failed to remove file due to %s. file:%s", strerror(errno), pPager->jFileName);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  // hashset_destroy accepts NULL; clear the field so no dangling pointer outlives the set
  hashset_destroy(pPager->jPageSet);
  pPager->jPageSet = NULL;

  pPager->inTran = 0;

  return 0;
}

439 440 441 442 443 444 445 446
/*
 * First phase of an asynchronous commit:
 *   1. fsync the journal;
 *   2. write every dirty page that is not marked isLocal to the database file;
 *   3. set dbOrigSize to dbFileSize, then mark those pages clean, drop them from the dirty
 *      tree and hand them back to the page cache.
 * isLocal pages stay dirty in the tree. Unlike tdbPagerCommit, this neither recreates the
 * dirty tree, nor removes pages from jPageSet, nor fsyncs the database file.
 * Returns 0 on success, -1 on failure.
 */
int tdbPagerPrepareAsyncCommit(SPager *pPager, TXN *pTxn) {
  SPage *pPage;
  int    ret;

  // sync the journal file
  ret = tdbOsFSync(pPager->jfd);
  if (ret < 0) {
    tdbError("failed to fsync jfd due to %s. jFileName:%s", strerror(errno), pPager->jFileName);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  // loop to write the dirty pages to file
  SRBTreeIter  iter = tRBTreeIterCreate(&pPager->rbt, 1);
  SRBTreeNode *pNode = NULL;
  while ((pNode = tRBTreeIterNext(&iter)) != NULL) {
    pPage = (SPage *)pNode;
    if (pPage->isLocal) continue;
    ret = tdbPagerWritePageToDB(pPager, pPage);
    if (ret < 0) {
      tdbError("failed to write page to db since %s", tstrerror(terrno));
      return -1;
    }
  }

  tdbTrace("tdbttl commit:%p, %d/%d", pPager, pPager->dbOrigSize, pPager->dbFileSize);
  pPager->dbOrigSize = pPager->dbFileSize;

  // release the page
  iter = tRBTreeIterCreate(&pPager->rbt, 1);
  while ((pNode = tRBTreeIterNext(&iter)) != NULL) {
    pPage = (SPage *)pNode;
    if (pPage->isLocal) continue;
    pPage->isDirty = 0;

    tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
    tdbPCacheRelease(pPager->pCache, pPage, pTxn);
  }

  return 0;
}

491 492 493 494 495 496 497 498 499 500
/*
 * Roll back the current transaction:
 *   0. fsync the journal;
 *   1. copy every journaled original page image back into the database file, then fsync it;
 *   2. mark all dirty pages clean, drop them from the dirty tree and jPageSet, and hand them
 *      back to the page cache;
 *   3. close and delete the journal and free the journal page set.
 * Returns 0 on success, -1 on failure.
 */
int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
  SPage *pPage;
  int    ret;
  i64    nRead;

  // 0, sync the journal file
  ret = tdbOsFSync(pPager->jfd);
  if (ret < 0) {
    tdbError("failed to fsync jfd due to %s. file:%s", strerror(errno), pPager->jFileName);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  tdb_fd_t jfd = pPager->jfd;

  // 1, replay the journal. tdbPagerWrite appended entries through this same descriptor, so
  // its offset sits at the end of the file; rewind before reading.
  if (tdbOsLSeek(jfd, 0, SEEK_SET) < 0) {
    tdbError("failed to lseek jfd due to %s. file:%s", strerror(errno), pPager->jFileName);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  u8 *pageBuf = tdbOsCalloc(1, pPager->pageSize);
  if (pageBuf == NULL) {
    return -1;
  }

  // Each entry is an SPgno followed by one page image. Read until end of file. An incomplete
  // or invalid (pgno 0) trailing entry ends the replay instead of writing garbage.
  for (;;) {
    SPgno pgno = 0;

    nRead = tdbOsRead(jfd, &pgno, sizeof(pgno));
    if (nRead < 0) {
      tdbOsFree(pageBuf);
      return -1;
    }
    if (nRead < (i64)sizeof(pgno) || pgno == 0) {
      break;
    }

    nRead = tdbOsRead(jfd, pageBuf, pPager->pageSize);
    if (nRead < 0) {
      tdbOsFree(pageBuf);
      return -1;
    }
    if (nRead < pPager->pageSize) {
      break;
    }

    // compute in 64 bits: pageSize * (pgno - 1) can exceed 32 bits for large files
    i64 offset = (i64)pPager->pageSize * (pgno - 1);
    if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) {
      tdbError("failed to lseek fd due to %s. file:%s, offset:%" PRId64, strerror(errno), pPager->dbFileName, offset);
      terrno = TAOS_SYSTEM_ERROR(errno);
      tdbOsFree(pageBuf);
      return -1;
    }

    ret = tdbOsWrite(pPager->fd, pageBuf, pPager->pageSize);
    if (ret < 0) {
      tdbError("failed to write buf due to %s. file: %s, bufsize:%d", strerror(errno), pPager->dbFileName,
               pPager->pageSize);
      terrno = TAOS_SYSTEM_ERROR(errno);
      tdbOsFree(pageBuf);
      return -1;
    }
  }

  if (tdbOsFSync(pPager->fd) < 0) {
    tdbError("failed to fsync fd due to %s. dbfile:%s", strerror(errno), pPager->dbFileName);
    terrno = TAOS_SYSTEM_ERROR(errno);
    tdbOsFree(pageBuf);
    return -1;
  }

  tdbOsFree(pageBuf);

  // 2, release the dirty pages
  SRBTreeIter  iter = tRBTreeIterCreate(&pPager->rbt, 1);
  SRBTreeNode *pNode = NULL;
  while ((pNode = tRBTreeIterNext(&iter)) != NULL) {
    pPage = (SPage *)pNode;

    pPage->isDirty = 0;

    tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
    if (pPager->jPageSet) {
      hashset_remove(pPager->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage)));
    }
    tdbPCacheRelease(pPager->pCache, pPage, pTxn);
  }

  tdbTrace("reset dirty tree: %p", &pPager->rbt);
  tRBTreeCreate(&pPager->rbt, pageCmpFn);

  // 3, remove the journal file (best effort: the database file is already restored)
  tdbOsClose(pPager->jfd);
  (void)tdbOsRemove(pPager->jFileName);
  hashset_destroy(pPager->jPageSet);
  pPager->jPageSet = NULL;

  pPager->inTran = 0;

  return 0;
}

587 588
/*
 * Write out at most one dirty page to make room in the page cache.
 * The first page in dirty-tree order whose reference count is at most 1 (presumably only the
 * reference taken by tdbPagerWrite) is written to the database file. It is then marked
 * clean, dropped from the dirty tree and handed back to the cache. dbOrigSize is raised to
 * cover that page number, so later tdbPagerWrite calls journal the page.
 * Returns 0, including when no page qualifies, or -1 if the write fails.
 */
int tdbPagerFlushPage(SPager *pPager, TXN *pTxn) {
  SPage *pPage;
  i32    nRef;
  SPgno  maxPgno = pPager->dbOrigSize;
  int    ret;

  SRBTreeIter  iter = tRBTreeIterCreate(&pPager->rbt, 1);
  SRBTreeNode *pNode = NULL;
  while ((pNode = tRBTreeIterNext(&iter)) != NULL) {
    pPage = (SPage *)pNode;
    nRef = tdbGetPageRef(pPage);
    if (nRef > 1) {
      // still in use elsewhere, try the next dirty page
      continue;
    }

    SPgno pgno = TDB_PAGE_PGNO(pPage);
    if (pgno > maxPgno) {
      maxPgno = pgno;
    }

    ret = tdbPagerWritePageToDB(pPager, pPage);
    if (ret < 0) {
      tdbError("failed to write page to db since %s", tstrerror(terrno));
      return -1;
    }

    tdbTrace("tdb/flush:%p, %d/%d/%d", pPager, pPager->dbOrigSize, pPager->dbFileSize, maxPgno);
    pPager->dbOrigSize = maxPgno;

    pPage->isDirty = 0;

    tdbTrace("pager/flush drop page: %p %d from dirty tree: %p", pPage, TDB_PAGE_PGNO(pPage), &pPager->rbt);
    tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
    tdbPCacheRelease(pPager->pCache, pPage, pTxn);

    // one page is enough to free a cache slot
    break;
  }

  return 0;
}

H
Hongze Cheng 已提交
647
/*
 * Get the page *ppgno from the page cache, initializing it on first use.
 * If *ppgno is 0, a new page number is allocated first, and the page is initialized without
 * reading it from disk.
 * While the cache returns no container, dirty pages are flushed one at a time to make room.
 * On success *ppgno and *ppPage are set and 0 is returned; on failure -1 is returned.
 */
int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *, int), void *arg,
                      TXN *pTxn) {
  SPage *pPage;
  SPgid  pgid;
  int    ret;
  SPgno  pgno;
  u8     loadPage;

  pgno = *ppgno;
  loadPage = 1;

  // alloc new page
  if (pgno == 0) {
    loadPage = 0;
    ret = tdbPagerAllocPage(pPager, &pgno);
    if (ret < 0) {
      ASSERT(0);
      return -1;
    }
  }

  ASSERT(pgno > 0);

  // fetch a page container
  memcpy(&pgid, pPager->fid, TDB_FILE_ID_LEN);
  pgid.pgno = pgno;
  while ((pPage = tdbPCacheFetch(pPager->pCache, &pgid, pTxn)) == NULL) {
    // Give up if flushing fails instead of retrying the same failure forever.
    ret = tdbPagerFlushPage(pPager, pTxn);
    if (ret < 0) {
      return -1;
    }
  }

  tdbTrace("tdbttl fetch pager:%p", pPage->pPager);
  // init page if need
  if (!TDB_PAGE_INITIALIZED(pPage)) {
    ret = tdbPagerInitPage(pPager, pPage, initPage, arg, loadPage);
    if (ret < 0) {
      ASSERT(0);
      return -1;
    }
  }

  ASSERT(TDB_PAGE_INITIALIZED(pPage));
  ASSERT(pPage->pPager == pPager);

  *ppgno = pgno;
  *ppPage = pPage;
  return 0;
}

H
Hongze Cheng 已提交
698 699 700 701 702
/* Release pPage back to the pager's page cache. */
void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn) {
  SPCache *pCache = pPager->pCache;

  tdbPCacheRelease(pCache, pPage, pTxn);
}
H
Hongze Cheng 已提交
703

H
more  
Hongze Cheng 已提交
704 705 706 707 708 709 710 711 712 713
/*
 * Allocate a page number from the free list.
 * Not implemented: it always succeeds without touching *ppgno, so tdbPagerAllocPage (which
 * zeroes *ppgno beforehand) falls back to extending the file.
 */
static int tdbPagerAllocFreePage(SPager *pPager, SPgno *ppgno) {
  (void)pPager;
  (void)ppgno;
  // TODO: Allocate a page from the free list
  return 0;
}

/* Allocate a page number by growing the file: bumps dbFileSize and hands out the new value. */
static int tdbPagerAllocNewPage(SPager *pPager, SPgno *ppgno) {
  pPager->dbFileSize += 1;
  *ppgno = pPager->dbFileSize;
  return 0;
}

H
Hongze Cheng 已提交
714
/*
 * Choose the page number for a new page: first from the free list, otherwise by extending
 * the database file.
 * Returns 0 with a non-zero *ppgno, or -1 on failure.
 */
int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno) {
  int ret;

  *ppgno = 0;

  // Try to allocate from the free list of the pager
  ret = tdbPagerAllocFreePage(pPager, ppgno);
  if (ret < 0) {
    return -1;
  }

  if (*ppgno != 0) return 0;

  // Allocate the page by extending the pager
  ret = tdbPagerAllocNewPage(pPager, ppgno);
  if (ret < 0) {
    return -1;
  }

  ASSERT(*ppgno != 0);

  return 0;
}

H
Hongze Cheng 已提交
738 739 740 741 742 743 744 745
/*
 * Initialize pPage the first time it is fetched and attach it to pPager.
 *
 * The caller that gets P_LOCK_SUCC from TDB_TRY_LOCK_PAGE does the work. When loadPage is
 * set and the page exists in the database file (pgno <= dbOrigSize), it reads the page from
 * disk and calls initPage with init = 1; otherwise it calls initPage with init = 0.
 * A caller that gets P_LOCK_BUSY spins, yielding every 1000 checks, until the page shows as
 * initialized.
 * Returns 0 on success, -1 on failure.
 */
static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *, int), void *arg,
                            u8 loadPage) {
  int   ret;
  int   lcode;
  int   nLoops;
  i64   nRead;
  SPgno pgno;
  int   init = 0;

  lcode = TDB_TRY_LOCK_PAGE(pPage);
  if (lcode == P_LOCK_SUCC) {
    if (TDB_PAGE_INITIALIZED(pPage)) {
      TDB_UNLOCK_PAGE(pPage);
      return 0;
    }

    pgno = TDB_PAGE_PGNO(pPage);

    tdbTrace("tdbttl init pager:%p, pgno:%d, loadPage:%d, size:%d", pPager, pgno, loadPage, pPager->dbOrigSize);
    if (loadPage && pgno <= pPager->dbOrigSize) {
      init = 1;

      nRead = tdbOsPRead(pPager->fd, pPage->pData, pPage->pageSize, ((i64)pPage->pageSize) * (pgno - 1));
      tdbTrace("tdbttl pager:%p, pgno:%d, nRead:%" PRId64, pPager, pgno, nRead);
      if (nRead < pPage->pageSize) {
        ASSERT(0);
        // Unlock as the initPage failure path below does. Returning with the lock held would
        // presumably make every later TDB_TRY_LOCK_PAGE on this page report busy.
        TDB_UNLOCK_PAGE(pPage);
        return -1;
      }
    } else {
      init = 0;
    }

    ret = (*initPage)(pPage, arg, init);
    if (ret < 0) {
      ASSERT(0);
      TDB_UNLOCK_PAGE(pPage);
      return -1;
    }

    pPage->pPager = pPager;

    TDB_UNLOCK_PAGE(pPage);
  } else if (lcode == P_LOCK_BUSY) {
    // another caller is initializing the page: wait for it
    nLoops = 0;
    for (;;) {
      if (TDB_PAGE_INITIALIZED(pPage)) break;
      nLoops++;
      if (nLoops > 1000) {
        sched_yield();
        nLoops = 0;
      }
    }
  } else {
    ASSERT(0);
    return -1;
  }

  return 0;
}

// ---------------------------- Journal manipulation
/*
 * Write one journal entry for pPage at the journal descriptor's current offset: the page
 * number followed by the page's current data.
 * Returns 0 on success, -1 (with terrno set) on write failure.
 */
static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage) {
  int   ret;
  SPgno pgno;

  pgno = TDB_PAGE_PGNO(pPage);

  ret = tdbOsWrite(pPager->jfd, &pgno, sizeof(pgno));
  if (ret < 0) {
    tdbError("failed to write pgno due to %s. file:%s, pgno:%u", strerror(errno), pPager->jFileName, pgno);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  ret = tdbOsWrite(pPager->jfd, pPage->pData, pPage->pageSize);
  if (ret < 0) {
    tdbError("failed to write page data due to %s. file:%s, pageSize:%ld", strerror(errno), pPager->jFileName,
             (long)pPage->pageSize);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  return 0;
}
822 823 824 825 826 827 828 829
/*
struct TdFile {
  TdThreadRwlock rwlock;
  int            refId;
  int            fd;
  FILE          *fp;
} TdFile;
*/
H
Hongze Cheng 已提交
830 831 832 833
/*
 * Write pPage's data to its slot in the database file. Page pgno starts at byte offset
 * pageSize * (pgno - 1), computed in 64 bits.
 * Returns 0 on success, -1 (with terrno set) on failure.
 */
static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage) {
  i64 offset;
  int ret;

  offset = (i64)pPage->pageSize * (TDB_PAGE_PGNO(pPage) - 1);
  if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) {
    tdbError("failed to lseek due to %s. file:%s, offset:%" PRId64, strerror(errno), pPager->dbFileName, offset);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  ret = tdbOsWrite(pPager->fd, pPage->pData, pPage->pageSize);
  if (ret < 0) {
    tdbError("failed to write page data due to %s. file:%s, pageSize:%d", strerror(errno), pPager->dbFileName,
             pPage->pageSize);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  return 0;
}

/*
 * Recover from a journal left on disk. If the journal file can be opened, every journaled
 * page image is copied back into the database file, which is then fsynced, and the journal
 * is closed and deleted.
 * Returns 0 when there is no journal or the restore succeeded, -1 on failure. On failure the
 * journal descriptor and buffer are released and the journal file is kept.
 * pBt is currently unused.
 */
int tdbPagerRestore(SPager *pPager, SBTree *pBt) {
  i64 nRead;
  u8 *pageBuf = NULL;

  tdb_fd_t jfd = tdbOsOpen(pPager->jFileName, TDB_O_RDWR, 0755);
  if (TDB_FD_INVALID(jfd)) {
    // no journal: nothing to restore
    return 0;
  }

  pageBuf = tdbOsCalloc(1, pPager->pageSize);
  if (pageBuf == NULL) {
    goto _err;
  }

  // Each entry is an SPgno followed by one page image. Read until end of file. An incomplete
  // or invalid (pgno 0) trailing entry, such as one left by an interrupted write, ends the
  // replay.
  for (;;) {
    SPgno pgno = 0;

    nRead = tdbOsRead(jfd, &pgno, sizeof(pgno));
    if (nRead < 0) {
      goto _err;
    }
    if (nRead < (i64)sizeof(pgno) || pgno == 0) {
      break;
    }

    nRead = tdbOsRead(jfd, pageBuf, pPager->pageSize);
    if (nRead < 0) {
      goto _err;
    }
    if (nRead < pPager->pageSize) {
      break;
    }

    // compute in 64 bits: pageSize * (pgno - 1) can exceed 32 bits for large files
    i64 offset = (i64)pPager->pageSize * (pgno - 1);
    if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) {
      tdbError("failed to lseek fd due to %s. file:%s, offset:%" PRId64, strerror(errno), pPager->dbFileName, offset);
      terrno = TAOS_SYSTEM_ERROR(errno);
      goto _err;
    }

    if (tdbOsWrite(pPager->fd, pageBuf, pPager->pageSize) < 0) {
      tdbError("failed to write buf due to %s. file: %s, bufsize:%d", strerror(errno), pPager->dbFileName,
               pPager->pageSize);
      terrno = TAOS_SYSTEM_ERROR(errno);
      goto _err;
    }
  }

  if (tdbOsFSync(pPager->fd) < 0) {
    tdbError("failed to fsync fd due to %s. dbfile:%s", strerror(errno), pPager->dbFileName);
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  tdbOsFree(pageBuf);
  pageBuf = NULL;

  if (tdbOsClose(jfd) < 0) {
    tdbError("failed to close jfd due to %s. jFileName:%s", strerror(errno), pPager->jFileName);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  if (tdbOsRemove(pPager->jFileName) < 0 && errno != ENOENT) {
    tdbError("failed to remove file due to %s. jFileName:%s", strerror(errno), pPager->jFileName);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  return 0;

_err:
  if (pageBuf != NULL) {
    tdbOsFree(pageBuf);
  }
  tdbOsClose(jfd);
  return -1;
}
930 931 932 933 934 935 936 937 938 939

/*
 * Delete the journal file without replaying it. A journal that does not exist counts as
 * success. Returns 0 on success, -1 (with terrno set) if removal fails for another reason.
 */
int tdbPagerRollback(SPager *pPager) {
  int removeFailed = tdbOsRemove(pPager->jFileName) < 0;
  if (!removeFailed || errno == ENOENT) {
    return 0;
  }

  tdbError("failed to remove file due to %s. jFileName:%s", strerror(errno), pPager->jFileName);
  terrno = TAOS_SYSTEM_ERROR(errno);
  return -1;
}