tdbPager.c 13.8 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16 17
#include "tdbInt.h"

H
Hongze Cheng 已提交
18
#pragma pack(push, 1)
wafwerar's avatar
wafwerar 已提交
19
typedef struct {
H
Hongze Cheng 已提交
20 21 22 23 24
  u8    hdrString[16];
  u16   pageSize;
  SPgno freePage;
  u32   nFreePages;
  u8    reserved[102];
H
Hongze Cheng 已提交
25
} SFileHdr;
wafwerar's avatar
wafwerar 已提交
26
#pragma pack(pop)
H
Hongze Cheng 已提交
27

H
Hongze Cheng 已提交
28 29
TDB_STATIC_ASSERT(sizeof(SFileHdr) == 128, "Size of file header is not correct");

H
Hongze Cheng 已提交
30 31
#define TDB_PAGE_INITIALIZED(pPage) ((pPage)->pPager != NULL)

H
Hongze Cheng 已提交
32 33
static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *, int), void *arg,
                            u8 loadPage);
H
Hongze Cheng 已提交
34 35
static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage);
static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage);
H
Hongze Cheng 已提交
36

37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
/*
 * Comparison callback passed to tRBTreeCreate(): orders two pages by page number.
 *
 * Each argument is stepped back by sizeof(SRBTreeNode) bytes to obtain the
 * SPage whose page number is compared.
 *
 * Returns -1 when lhs sorts first, 1 when rhs sorts first, 0 when both pages
 * carry the same page number.
 */
static FORCE_INLINE int32_t pageCmpFn(const void *lhs, const void *rhs) {
  SPage *pLeft = (SPage *)(((uint8_t *)lhs) - sizeof(SRBTreeNode));
  SPage *pRight = (SPage *)(((uint8_t *)rhs) - sizeof(SRBTreeNode));

  SPgno leftNo = TDB_PAGE_PGNO(pLeft);
  SPgno rightNo = TDB_PAGE_PGNO(pRight);

  if (leftNo == rightNo) {
    return 0;
  }
  return (leftNo < rightNo) ? -1 : 1;
}

H
refact  
Hongze Cheng 已提交
53
/*
 * Create a pager for the database file `fileName`, backed by page cache `pCache`.
 *
 * The SPager object and both file-name strings (the database file name and the
 * journal name "<fileName>-journal") live in one zero-initialized allocation,
 * so releasing the pager releases the names too.
 *
 * On success returns 0 and stores the new pager in *ppPager.
 * On failure returns -1, leaves *ppPager as NULL, and releases everything that
 * was acquired so far (the allocation and, once opened, the file handle).
 */
int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager) {
  uint8_t *pPtr;
  SPager  *pPager;
  int      fsize;
  int      zsize;
  int      ret;

  *ppPager = NULL;

  fsize = strlen(fileName);
  zsize = sizeof(*pPager)  /* SPager */
          + fsize + 1      /* dbFileName */
          + fsize + 8 + 1; /* jFileName */
  pPtr = (uint8_t *)tdbOsCalloc(1, zsize);
  if (pPtr == NULL) {
    return -1;
  }

  pPager = (SPager *)pPtr;
  pPtr += sizeof(*pPager);
  // pPager->dbFileName
  pPager->dbFileName = (char *)pPtr;
  memcpy(pPager->dbFileName, fileName, fsize);
  pPager->dbFileName[fsize] = '\0';
  pPtr += fsize + 1;
  // pPager->jFileName
  pPager->jFileName = (char *)pPtr;
  memcpy(pPager->jFileName, fileName, fsize);
  memcpy(pPager->jFileName + fsize, "-journal", 8);
  pPager->jFileName[fsize + 8] = '\0';
  // pPager->pCache
  pPager->pCache = pCache;

  // NOTE(review): other functions in this file test a tdb_fd_t against NULL;
  // confirm that "< 0" really detects an open failure for this handle type.
  pPager->fd = tdbOsOpen(pPager->dbFileName, TDB_O_CREAT | TDB_O_RDWR, 0755);
  if (pPager->fd < 0) {
    // nothing but the allocation has been acquired yet
    tdbOsFree(pPager);
    return -1;
  }

  ret = tdbGnrtFileID(pPager->fd, pPager->fid, false);
  if (ret < 0) {
    goto _err;
  }

  pPager->pageSize = tdbPCacheGetPageSize(pCache);
  // pPager->dbOrigSize
  ret = tdbGetFileSize(pPager->fd, pPager->pageSize, &(pPager->dbOrigSize));
  if (ret < 0) {
    // without a valid size the pager cannot tell existing pages from new ones
    goto _err;
  }
  pPager->dbFileSize = pPager->dbOrigSize;

  tRBTreeCreate(&pPager->rbt, pageCmpFn);

  *ppPager = pPager;
  return 0;

_err:
  tdbOsClose(pPager->fd);
  tdbOsFree(pPager);
  return -1;
}

H
refact  
Hongze Cheng 已提交
108
/*
 * Close a pager: closes the journal handle when a transaction is still open,
 * closes the database file handle, and frees the pager object.
 * A NULL pager is accepted and ignored. Always returns 0.
 */
int tdbPagerClose(SPager *pPager) {
  if (pPager == NULL) {
    return 0;
  }

  // the journal handle is only closed here while a transaction is in progress
  if (pPager->inTran) {
    tdbOsClose(pPager->jfd);
  }

  tdbOsClose(pPager->fd);
  tdbOsFree(pPager);
  return 0;
}

119
/*
 * Resolve the root page number of the database stored in this pager.
 *
 * When the file already held pages at open time (dbOrigSize > 0) the root is
 * page 1. Otherwise, if `toCreate` is set, a new page is fetched, initialized
 * through tdbBtreeInitPage with the root and leaf flag bits set, and marked
 * dirty through tdbPagerWrite(); the pager is flagged as being in a transaction.
 *
 * The local transaction opened for the allocation is closed on every path,
 * including the failure paths.
 *
 * Returns 0 and stores the page number in *ppgno (0 when nothing exists and
 * `toCreate` is false); returns -1 when fetching or writing the page fails.
 */
int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate, SBTree *pBt) {
  SPgno  pgno;
  SPage *pPage;
  int    ret;

  pgno = (pPager->dbOrigSize > 0) ? 1 : 0;

  // TODO: try to search the main DB to get the page number

  if (pgno == 0 && toCreate) {
    // allocate a new child page
    TXN txn;
    tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0);

    pPager->inTran = 1;

    SBtreeInitPageArg zArg;
    zArg.flags = 0x1 | 0x2;  // root leaf node;
    zArg.pBt = pBt;
    ret = tdbPagerFetchPage(pPager, &pgno, &pPage, tdbBtreeInitPage, &zArg, &txn);

    // TODO: Need to zero the page

    if (ret >= 0) {
      ret = tdbPagerWrite(pPager, pPage);
    }

    // pair the tdbTxnOpen() above on success and on failure alike
    tdbTxnClose(&txn);

    if (ret < 0) {
      return -1;
    }
  }

  *ppgno = pgno;
  return 0;
}

H
refact  
Hongze Cheng 已提交
169
/*
 * Register `pPage` as modified inside the current transaction.
 *
 * The first call for a clean page:
 *   - takes an extra reference so the page cannot be released while dirty,
 *   - sets the dirty flag,
 *   - inserts the page into the pager's red-black tree of dirty pages,
 *   - writes the page to the journal when its page number lies within the
 *     file size recorded in dbOrigSize.
 *
 * Calls for a page that is already dirty return immediately. The flag must be
 * set here for that guard to work: otherwise every repeated call would take
 * another reference and append the page to the journal again.
 *
 * Requires an open transaction (asserted). Returns 0 on success, -1 when the
 * journal write fails.
 */
int tdbPagerWrite(SPager *pPager, SPage *pPage) {
  int ret;

  ASSERT(pPager->inTran);

  if (pPage->isDirty) return 0;

  // ref page one more time so the page will not be release
  tdbRefPage(pPage);
  tdbDebug("pcache/mdirty page %p/%d/%d", pPage, TDB_PAGE_PGNO(pPage), pPage->id);

  // Set page as dirty
  pPage->isDirty = 1;

  // Track the page in the dirty-page tree walked by tdbPagerCommit()
  tRBTreePut(&pPager->rbt, (SRBTreeNode *)pPage);

  // Write page to journal if neccessary
  if (TDB_PAGE_PGNO(pPage) <= pPager->dbOrigSize) {
    ret = tdbPagerWritePageToJournal(pPager, pPage);
    if (ret < 0) {
      ASSERT(0);
      return -1;
    }
  }

  return 0;
}

H
Hongze Cheng 已提交
221
/*
 * Start a transaction on the pager by opening (creating if needed) the journal
 * file and setting the in-transaction flag. Does nothing when a transaction is
 * already open. `pTxn` is not used by this function.
 *
 * Returns 0 on success or when already in a transaction, -1 when the journal
 * open is reported as failed.
 */
int tdbPagerBegin(SPager *pPager, TXN *pTxn) {
  if (!pPager->inTran) {
    // Open the journal
    pPager->jfd = tdbOsOpen(pPager->jFileName, TDB_O_CREAT | TDB_O_RDWR, 0755);
    if (pPager->jfd < 0) {
      return -1;
    }

    // TODO: write the size of the file

    pPager->inTran = 1;
  }

  return 0;
}

H
Hongze Cheng 已提交
239
/*
 * Commit the current transaction.
 *
 * Steps:
 *   1. sync the journal file,
 *   2. write every page held in the dirty-page tree to the database file,
 *      clear its dirty flag and release it back to the cache,
 *   3. reset the dirty-page tree,
 *   4. record the new file size as the original size, sync the database file,
 *   5. close and remove the journal and leave the transaction.
 *
 * Returns 0 on success. Returns -1 when the journal cannot be synced (nothing
 * has been written to the database at that point) or when writing a page
 * fails; in both cases the journal is left in place.
 */
int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
  SPage *pPage;
  int    ret;

  // sync the journal file
  ret = tdbOsFSync(pPager->jfd);
  if (ret < 0) {
    // Reporting success here would tell the caller the data is committed
    // although no page has reached the database file.
    ASSERT(0);
    return -1;
  }

  // write out and release every dirty page
  SRBTreeIter  iter = tRBTreeIterCreate(&pPager->rbt, 1);
  SRBTreeNode *pNode = NULL;
  while ((pNode = tRBTreeIterNext(&iter)) != NULL) {
    pPage = (SPage *)pNode;
    ret = tdbPagerWritePageToDB(pPager, pPage);
    if (ret < 0) {
      ASSERT(0);
      return -1;
    }

    pPage->isDirty = 0;

    tdbPCacheRelease(pPager->pCache, pPage, pTxn);
  }

  // start over with an empty dirty-page tree
  tRBTreeCreate(&pPager->rbt, pageCmpFn);

  tdbTrace("tdbttl commit:%p, %d", pPager, pPager->dbOrigSize);
  pPager->dbOrigSize = pPager->dbFileSize;

  // sync the db file
  // NOTE(review): the result of this sync is not checked before the journal is removed.
  tdbOsFSync(pPager->fd);

  // remove the journal file
  tdbOsClose(pPager->jfd);
  tdbOsRemove(pPager->jFileName);
  pPager->inTran = 0;

  return 0;
}

/*
 * Abort the current transaction and drop the pages it dirtied.
 *
 * Steps:
 *   0. sync the journal file,
 *   1. open the journal by name and query its size (returns 0 early when the
 *      journal cannot be opened); the temporary handle is closed again,
 *   2. TODO: restore the buffered pages from the journal instead of only
 *      releasing them,
 *   3. clear the dirty flag of every page in the dirty-page tree, release it,
 *      and reset the tree. tdbPagerWrite() records dirty pages in that tree,
 *      so this is the set of pages the transaction holds references on,
 *   4. close and remove the journal and leave the transaction.
 *
 * Returns 0 on success, -1 when the journal cannot be synced or its size
 * cannot be read.
 */
int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
  SPage *pPage;
  SPgno  journalSize = 0;
  int    ret;

  // 0, sync the journal file
  ret = tdbOsFSync(pPager->jfd);
  if (ret < 0) {
    ASSERT(0);
    return -1;
  }

  tdb_fd_t jfd = tdbOsOpen(pPager->jFileName, TDB_O_RDWR, 0755);
  if (jfd == NULL) {
    return 0;
  }

  ret = tdbGetFileSize(jfd, pPager->pageSize, &journalSize);
  // the handle was only needed for the size query; close it on every path
  tdbOsClose(jfd);
  if (ret < 0) {
    return -1;
  }

  // 1, read pages from jounal file
  // 2, write original pages to buffered ones
  // TODO: reset the buffered pages instead of releasing them

  // 3, release the dirty pages
  SRBTreeIter  iter = tRBTreeIterCreate(&pPager->rbt, 1);
  SRBTreeNode *pNode = NULL;
  while ((pNode = tRBTreeIterNext(&iter)) != NULL) {
    pPage = (SPage *)pNode;

    pPage->isDirty = 0;

    tdbPCacheRelease(pPager->pCache, pPage, pTxn);
  }

  tRBTreeCreate(&pPager->rbt, pageCmpFn);

  // 4, remove the journal file
  tdbOsClose(pPager->jfd);
  tdbOsRemove(pPager->jFileName);
  pPager->inTran = 0;

  return 0;
}

H
Hongze Cheng 已提交
365
/*
 * Fetch a page from the pager's cache, allocating a new page number first when
 * the caller passes *ppgno == 0.
 *
 * The page is looked up by the pair (file id of this pager, page number). If
 * the cached page is not yet initialized it is initialized through
 * tdbPagerInitPage() with the caller's `initPage` callback and `arg`; content
 * is requested from the file only for pages that were not freshly allocated.
 *
 * On success returns 0, stores the page in *ppPage and the (possibly newly
 * allocated) page number in *ppgno. Returns -1 on allocation, cache or
 * initialization failure.
 */
int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *, int), void *arg,
                      TXN *pTxn) {
  SPgid  pageId;
  SPage *pFetched;
  SPgno  targetPgno = *ppgno;
  u8     needLoad = (targetPgno != 0) ? 1 : 0;  // a freshly allocated page has nothing to load

  // alloc new page
  if (targetPgno == 0) {
    if (tdbPagerAllocPage(pPager, &targetPgno) < 0) {
      ASSERT(0);
      return -1;
    }
  }

  ASSERT(targetPgno > 0);

  // fetch a page container
  memcpy(&pageId, pPager->fid, TDB_FILE_ID_LEN);
  pageId.pgno = targetPgno;
  pFetched = tdbPCacheFetch(pPager->pCache, &pageId, pTxn);
  if (pFetched == NULL) {
    ASSERT(0);
    return -1;
  }

  tdbTrace("tdbttl fetch pager:%p", pFetched->pPager);

  // init page if need
  if (!TDB_PAGE_INITIALIZED(pFetched)) {
    if (tdbPagerInitPage(pPager, pFetched, initPage, arg, needLoad) < 0) {
      ASSERT(0);
      return -1;
    }
  }

  ASSERT(TDB_PAGE_INITIALIZED(pFetched));
  ASSERT(pFetched->pPager == pPager);

  *ppgno = targetPgno;
  *ppPage = pFetched;
  return 0;
}

H
Hongze Cheng 已提交
418 419 420 421 422
/* Hand a page back to the pager's page cache via tdbPCacheRelease(). */
void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn) {
  SPCache *pCache = pPager->pCache;

  tdbPCacheRelease(pCache, pPage, pTxn);
}
H
Hongze Cheng 已提交
423

H
more  
Hongze Cheng 已提交
424 425 426 427 428 429 430 431 432 433
/*
 * Placeholder for allocating a page from the free list. It currently does
 * nothing: *ppgno is left untouched and 0 is returned.
 */
static int tdbPagerAllocFreePage(SPager *pPager, SPgno *ppgno) {
  // TODO: Allocate a page from the free list
  (void)pPager;
  (void)ppgno;
  return 0;
}

/*
 * Allocate a page by growing the file: increments the pager's dbFileSize by
 * one and stores the new value in *ppgno. Always returns 0.
 */
static int tdbPagerAllocNewPage(SPager *pPager, SPgno *ppgno) {
  pPager->dbFileSize += 1;
  *ppgno = pPager->dbFileSize;
  return 0;
}

H
Hongze Cheng 已提交
434
/*
 * Allocate a page number.
 *
 * The free list is consulted first; when it yields nothing (*ppgno stays 0)
 * the pager is extended by one page instead.
 *
 * Returns 0 with a non-zero page number in *ppgno, or -1 when either
 * allocator reports failure.
 */
int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno) {
  *ppgno = 0;

  // Try to allocate from the free list of the pager
  if (tdbPagerAllocFreePage(pPager, ppgno) < 0) {
    return -1;
  }

  if (*ppgno == 0) {
    // Allocate the page by extending the pager
    if (tdbPagerAllocNewPage(pPager, ppgno) < 0) {
      return -1;
    }

    ASSERT(*ppgno != 0);
  }

  return 0;
}

H
Hongze Cheng 已提交
458 459 460 461 462 463 464 465
/*
 * Initialize a cached page exactly once.
 *
 * The caller that wins the page's try-lock performs the initialization:
 *   - when `loadPage` is set and the page number lies within dbOrigSize, the
 *     page content is read from the database file and `initPage` is called
 *     with init == 1,
 *   - otherwise `initPage` is called with init == 0,
 *   - finally the page is bound to this pager, which is what marks it as
 *     initialized.
 * A caller that finds the lock busy spins (yielding the CPU every 1000
 * iterations) until the page becomes initialized.
 *
 * The page lock is released on every exit path of the locked branch,
 * including the short-read failure.
 *
 * Returns 0 on success, -1 on a short read, an `initPage` failure, or an
 * unexpected lock result.
 */
static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *, int), void *arg,
                            u8 loadPage) {
  int   ret;
  int   lcode;
  int   nLoops;
  i64   nRead;
  SPgno pgno;
  int   init = 0;

  lcode = TDB_TRY_LOCK_PAGE(pPage);
  if (lcode == P_LOCK_SUCC) {
    // re-check under the lock: the page may have been initialized in the meantime
    if (TDB_PAGE_INITIALIZED(pPage)) {
      TDB_UNLOCK_PAGE(pPage);
      return 0;
    }

    pgno = TDB_PAGE_PGNO(pPage);

    tdbTrace("tdbttl init pager:%p, pgno:%d, loadPage:%d, size:%d", pPager, pgno, loadPage, pPager->dbOrigSize);
    if (loadPage && pgno <= pPager->dbOrigSize) {
      init = 1;

      nRead = tdbOsPRead(pPager->fd, pPage->pData, pPage->pageSize, ((i64)pPage->pageSize) * (pgno - 1));
      tdbTrace("tdbttl pager:%p, pgno:%d, nRead:%" PRId64, pPager, pgno, nRead);
      if (nRead < pPage->pageSize) {
        ASSERT(0);
        // do not leave the page locked when bailing out
        TDB_UNLOCK_PAGE(pPage);
        return -1;
      }
    } else {
      init = 0;
    }

    ret = (*initPage)(pPage, arg, init);
    if (ret < 0) {
      ASSERT(0);
      TDB_UNLOCK_PAGE(pPage);
      return -1;
    }

    pPage->pPager = pPager;

    TDB_UNLOCK_PAGE(pPage);
  } else if (lcode == P_LOCK_BUSY) {
    // someone else is initializing the page; wait until it is done
    nLoops = 0;
    for (;;) {
      if (TDB_PAGE_INITIALIZED(pPage)) break;
      nLoops++;
      if (nLoops > 1000) {
        sched_yield();
        nLoops = 0;
      }
    }
  } else {
    ASSERT(0);
    return -1;
  }

  return 0;
}

// ---------------------------- Journal manipulation
/*
 * Append one record to the journal: the page number followed by the page's
 * current data (pageSize bytes).
 * Returns 0 on success, -1 when either write reports an error.
 */
static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage) {
  SPgno pgno = TDB_PAGE_PGNO(pPage);
  int   rc;

  // record header: the page number
  rc = tdbOsWrite(pPager->jfd, &pgno, sizeof(pgno));
  if (rc < 0) {
    return -1;
  }

  // record body: the page content
  rc = tdbOsWrite(pPager->jfd, pPage->pData, pPage->pageSize);
  if (rc < 0) {
    return -1;
  }

  return 0;
}
537 538 539 540 541 542 543 544
/*
struct TdFile {
  TdThreadRwlock rwlock;
  int            refId;
  int            fd;
  FILE          *fp;
} TdFile;
*/
H
Hongze Cheng 已提交
545 546 547 548
/*
 * Write a page's data to its slot in the database file. Page numbers are
 * 1-based, so page N starts at byte offset pageSize * (N - 1).
 * Returns 0 on success, -1 when the seek or the write reports an error.
 */
static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage) {
  i64 pos = (i64)pPage->pageSize * (TDB_PAGE_PGNO(pPage) - 1);
  int rc;

  if (tdbOsLSeek(pPager->fd, pos, SEEK_SET) < 0) {
    ASSERT(0);
    return -1;
  }

  rc = tdbOsWrite(pPager->fd, pPage->pData, pPage->pageSize);
  if (rc < 0) {
    ASSERT(0);
    return -1;
  }

  return 0;
}

/*
 * Replay a leftover journal into the database file.
 *
 * When no journal can be opened there is nothing to restore and 0 is returned.
 * Otherwise each journal record (page number followed by pageSize bytes of
 * page data) is read and the data is written to that page's slot in the
 * database file. After all records are replayed the database file is synced
 * and the journal is closed and removed.
 *
 * A read that delivers fewer bytes than a full record part is treated as the
 * end of the journal, so no record is ever applied from a stale buffer or an
 * unset page number.
 *
 * On any error -1 is returned; the page buffer and the journal handle are
 * released, and the journal file is kept so the restore can be retried.
 * `pBt` is not used by this function.
 */
int tdbPagerRestore(SPager *pPager, SBTree *pBt) {
  int   code = -1;
  SPgno journalSize = 0;
  u8   *pageBuf = NULL;

  tdb_fd_t jfd = tdbOsOpen(pPager->jFileName, TDB_O_RDWR, 0755);
  if (jfd == NULL) {
    return 0;
  }

  if (tdbGetFileSize(jfd, pPager->pageSize, &journalSize) < 0) {
    goto _exit;
  }

  pageBuf = tdbOsCalloc(1, pPager->pageSize);
  if (pageBuf == NULL) {
    goto _exit;
  }

  for (int pgIndex = 0; pgIndex < journalSize; ++pgIndex) {
    // read pgno & the page from journal
    SPgno pgno = 0;
    i64   nRead;

    nRead = tdbOsRead(jfd, &pgno, sizeof(pgno));
    if (nRead < 0) {
      goto _exit;
    }
    if (nRead < (i64)sizeof(pgno)) {
      // no complete record left
      break;
    }

    nRead = tdbOsRead(jfd, pageBuf, pPager->pageSize);
    if (nRead < 0) {
      goto _exit;
    }
    if (nRead < pPager->pageSize) {
      // truncated trailing record
      break;
    }

    // widen before multiplying so large files do not overflow the offset
    i64 offset = (i64)pPager->pageSize * (pgno - 1);
    if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) {
      ASSERT(0);
      goto _exit;
    }

    if (tdbOsWrite(pPager->fd, pageBuf, pPager->pageSize) < 0) {
      ASSERT(0);
      goto _exit;
    }
  }

  tdbOsFSync(pPager->fd);
  code = 0;

_exit:
  if (pageBuf != NULL) {
    tdbOsFree(pageBuf);
  }
  tdbOsClose(jfd);
  if (code == 0) {
    // only discard the journal once it has been fully applied
    tdbOsRemove(pPager->jFileName);
  }

  return code;
}