/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tdbInt.h"

#pragma pack(push, 1)
wafwerar's avatar
wafwerar 已提交
19
typedef struct {
H
Hongze Cheng 已提交
20 21 22 23 24
  u8    hdrString[16];
  u16   pageSize;
  SPgno freePage;
  u32   nFreePages;
  u8    reserved[102];
H
Hongze Cheng 已提交
25
} SFileHdr;
wafwerar's avatar
wafwerar 已提交
26
#pragma pack(pop)
H
Hongze Cheng 已提交
27

H
Hongze Cheng 已提交
28 29
TDB_STATIC_ASSERT(sizeof(SFileHdr) == 128, "Size of file header is not correct");

H
Hongze Cheng 已提交
30 31
#define TDB_PAGE_INITIALIZED(pPage) ((pPage)->pPager != NULL)

H
Hongze Cheng 已提交
32 33
static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *, int), void *arg,
                            u8 loadPage);
H
Hongze Cheng 已提交
34 35
static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage);
static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage);
// Open (creating it if necessary) the database file `fileName` and build a
// pager for it.
//
// The SPager object and both file-name strings live in ONE zero-filled
// allocation laid out as:
//   [ SPager | dbFileName '\0' | jFileName = "<fileName>-journal" '\0' ]
//
// pCache   - page cache used by the pager; also supplies the page size
// fileName - NUL-terminated path of the database file
// ppPager  - out: the new pager on success, NULL on failure
//
// Returns 0 on success and -1 on failure. On failure everything acquired so
// far (the allocation and, if already opened, the database file) is released.
int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager) {
  uint8_t *pPtr;
  SPager  *pPager;
  int      fsize;
  int      zsize;
  int      ret;

  *ppPager = NULL;

  fsize = strlen(fileName);
  zsize = sizeof(*pPager)  /* SPager */
          + fsize + 1      /* dbFileName */
          + fsize + 8 + 1; /* jFileName */
  pPtr = (uint8_t *)tdbOsCalloc(1, zsize);
  if (pPtr == NULL) {
    return -1;
  }

  pPager = (SPager *)pPtr;
  pPtr += sizeof(*pPager);
  // pPager->dbFileName
  pPager->dbFileName = (char *)pPtr;
  memcpy(pPager->dbFileName, fileName, fsize);
  pPager->dbFileName[fsize] = '\0';
  pPtr += fsize + 1;
  // pPager->jFileName: database file name with a "-journal" suffix (8 chars)
  pPager->jFileName = (char *)pPtr;
  memcpy(pPager->jFileName, fileName, fsize);
  memcpy(pPager->jFileName + fsize, "-journal", 8);
  pPager->jFileName[fsize + 8] = '\0';
  // pPager->pCache
  pPager->pCache = pCache;

  pPager->fd = tdbOsOpen(pPager->dbFileName, TDB_O_CREAT | TDB_O_RDWR, 0755);
  if (pPager->fd < 0) {
    goto _err_free;
  }

  ret = tdbGnrtFileID(pPager->dbFileName, pPager->fid, false);
  if (ret < 0) {
    goto _err_close;
  }

  // The journal descriptor is not set here; tdbPagerBegin opens the journal.
  pPager->pageSize = tdbPCacheGetPageSize(pCache);

  // pPager->dbOrigSize: file size as reported by tdbGetFileSize for this page
  // size. A failure here must not be ignored, otherwise both size fields
  // would silently keep their zero-initialized value.
  ret = tdbGetFileSize(pPager->fd, pPager->pageSize, &(pPager->dbOrigSize));
  if (ret < 0) {
    goto _err_close;
  }
  pPager->dbFileSize = pPager->dbOrigSize;

  *ppPager = pPager;
  return 0;

_err_close:
  tdbOsClose(pPager->fd);
_err_free:
  // pPager is the base address returned by tdbOsCalloc, so freeing it also
  // releases both file-name strings.
  // NOTE(review): assumes tdbOsFree is the counterpart of tdbOsCalloc -- confirm.
  tdbOsFree(pPager);
  return -1;
}

// Close a pager.
//
// Currently a stub: nothing is released and the call always returns 0.
int tdbPagerClose(SPager *pPager) {
  // TODO: implement the actual teardown
  (void)pPager;
  return 0;
}

// Report the page number at which a database starts in this pager.
//
// pPager   - pager to query
// ppgno    - out: 1 when the file already contained pages at open/commit
//            time (dbOrigSize > 0), otherwise 0
// toCreate - currently ignored (see TODO)
//
// Always returns 0.
int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate) {
  // TODO: search the main DB to get the real page number. When none is found
  // and toCreate is set, allocate a page (tdbPagerAllocPage), zero it, and
  // mark it dirty (tdbPagerWrite).
  *ppgno = (pPager->dbOrigSize > 0) ? 1 : 0;
  return 0;
}

// Mark a page as dirty within the current transaction.
//
// The caller must already be inside a transaction (asserted). A page that is
// already dirty is left untouched. Otherwise the page gets one extra
// reference, is flagged dirty, is linked into the pager's dirty list (kept in
// ascending page-number order), and -- if its page number lies within the
// original file size -- its current content is appended to the journal.
//
// Returns 0 on success, -1 if writing to the journal failed.
int tdbPagerWrite(SPager *pPager, SPage *pPage) {
  SPage **ppLink;

  ASSERT(pPager->inTran);

  if (pPage->isDirty) {
    return 0;
  }

  // Take an extra reference so the page is not released while it is dirty.
  TDB_REF_PAGE(pPage);
  pPage->isDirty = 1;

  // Walk to the insertion point of the sorted dirty list.
  // TODO: inserting n pages this way is O(n^2) overall.
  ppLink = &pPager->pDirty;
  while ((*ppLink) && TDB_PAGE_PGNO(*ppLink) < TDB_PAGE_PGNO(pPage)) {
    ppLink = &((*ppLink)->pDirtyNext);
  }
  ASSERT(*ppLink == NULL || TDB_PAGE_PGNO(*ppLink) > TDB_PAGE_PGNO(pPage));
  pPage->pDirtyNext = *ppLink;
  *ppLink = pPage;

  // Pages beyond the original file size are not journaled.
  if (TDB_PAGE_PGNO(pPage) > pPager->dbOrigSize) {
    return 0;
  }

  if (tdbPagerWritePageToJournal(pPager, pPage) < 0) {
    ASSERT(0);
    return -1;
  }

  return 0;
}

// Begin a transaction on the pager.
//
// Does nothing when a transaction is already in progress. Otherwise opens
// (creating if needed) the journal file and sets the in-transaction flag.
// pTxn is not used by this function.
//
// Returns 0 on success, -1 if the journal file could not be opened.
int tdbPagerBegin(SPager *pPager, TXN *pTxn) {
  if (!pPager->inTran) {
    pPager->jfd = tdbOsOpen(pPager->jFileName, TDB_O_CREAT | TDB_O_RDWR, 0755);
    if (pPager->jfd < 0) {
      return -1;
    }

    // TODO: write the size of the file

    pPager->inTran = 1;
  }

  return 0;
}

// Commit the current transaction.
//
// Steps, in order:
//   1. sync the journal file;
//   2. write every page on the dirty list to the database file;
//   3. sync the database file;
//   4. clear the dirty list, handing each page back to the page cache;
//   5. close and remove the journal, record the new file size as the
//      original size, and leave the transaction.
//
// pPager - pager whose transaction is committed
// pTxn   - transaction handle, passed through to tdbPCacheRelease
//
// Returns 0 on success, -1 on failure. On failure the journal file is left
// in place and the pager stays in the transaction; if the failure happens
// before step 4 the dirty list is also left untouched.
int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
  SPage *pPage;
  int    ret;

  // sync the journal file; a failure here must be reported, not returned as
  // success, because nothing has been written to the database file yet
  ret = tdbOsFSync(pPager->jfd);
  if (ret < 0) {
    ASSERT(0);
    return -1;
  }

  // loop to write the dirty pages to file
  for (pPage = pPager->pDirty; pPage; pPage = pPage->pDirtyNext) {
    // TODO: update the page footer
    ret = tdbPagerWritePageToDB(pPager, pPage);
    if (ret < 0) {
      ASSERT(0);
      return -1;
    }
  }

  // sync the db file BEFORE dropping the dirty list and the journal, so the
  // journal is never removed when the database sync reported an error
  ret = tdbOsFSync(pPager->fd);
  if (ret < 0) {
    return -1;
  }

  // release the pages
  while ((pPage = pPager->pDirty) != NULL) {
    pPager->pDirty = pPage->pDirtyNext;
    pPage->pDirtyNext = NULL;

    pPage->isDirty = 0;

    tdbPCacheRelease(pPager->pCache, pPage, pTxn);
  }

  // remove the journal file
  tdbOsClose(pPager->jfd);
  tdbOsRemove(pPager->jFileName);
  pPager->dbOrigSize = pPager->dbFileSize;
  pPager->inTran = 0;

  return 0;
}

// Fetch a page from the pager, allocating a new page number when asked to.
//
// pPager   - pager to fetch from
// ppgno    - in: page number to fetch, or 0 to allocate a new page;
//            out (on success): the page number actually used
// ppPage   - out (on success): the fetched page
// initPage - callback used to initialize the page if it is not initialized yet
// arg      - opaque argument forwarded to initPage
// pTxn     - transaction handle forwarded to the page cache
//
// Returns 0 on success, -1 on failure. On failure *ppgno and *ppPage are not
// modified.
int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *, int), void *arg,
                      TXN *pTxn) {
  SPage *pPage;
  SPgid  pgid;
  int    ret;
  SPgno  pgno;
  u8     loadPage;

  pgno = *ppgno;
  loadPage = 1;

  // alloc new page; a freshly allocated page has no content to load
  if (pgno == 0) {
    loadPage = 0;
    ret = tdbPagerAllocPage(pPager, &pgno);
    if (ret < 0) {
      ASSERT(0);
      return -1;
    }
  }

  ASSERT(pgno > 0);

  // fetch a page container; the page id is the file id plus the page number
  memcpy(&pgid, pPager->fid, TDB_FILE_ID_LEN);
  pgid.pgno = pgno;
  pPage = tdbPCacheFetch(pPager->pCache, &pgid, pTxn);
  if (pPage == NULL) {
    return -1;
  }

  // init page if need
  if (!TDB_PAGE_INITIALIZED(pPage)) {
    ret = tdbPagerInitPage(pPager, pPage, initPage, arg, loadPage);
    if (ret < 0) {
      // The caller never receives this page, so it cannot return it later;
      // hand it back to the cache here (same call tdbPagerReturnPage makes).
      tdbPCacheRelease(pPager->pCache, pPage, pTxn);
      return -1;
    }
  }

  ASSERT(TDB_PAGE_INITIALIZED(pPage));
  ASSERT(pPage->pPager == pPager);

  *ppgno = pgno;
  *ppPage = pPage;
  return 0;
}

// Give a page obtained from this pager back to the pager's page cache.
void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn) {
  SPCache *pCache = pPager->pCache;

  tdbPCacheRelease(pCache, pPage, pTxn);
}
// Allocate a page from the pager's free list.
//
// Currently a stub: *ppgno is left untouched and 0 is returned, so callers
// that pre-set *ppgno to 0 see "no free page available".
static int tdbPagerAllocFreePage(SPager *pPager, SPgno *ppgno) {
  // TODO: Allocate a page from the free list
  (void)pPager;
  (void)ppgno;
  return 0;
}

// Allocate a page by growing the file: bump the pager's file size (counted in
// pages) by one and hand out the new size as the page number.
// Always returns 0.
static int tdbPagerAllocNewPage(SPager *pPager, SPgno *ppgno) {
  pPager->dbFileSize += 1;
  *ppgno = pPager->dbFileSize;
  return 0;
}

// Allocate a page number.
//
// The free list is tried first; if it yields nothing (page number still 0),
// the pager is extended by one page instead.
//
// ppgno - out: the allocated page number (reset to 0 on entry)
//
// Returns 0 on success, -1 if either allocation step reports an error.
int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno) {
  *ppgno = 0;

  // Try to allocate from the free list of the pager
  if (tdbPagerAllocFreePage(pPager, ppgno) < 0) {
    return -1;
  }

  if (*ppgno == 0) {
    // Allocate the page by extending the pager
    if (tdbPagerAllocNewPage(pPager, ppgno) < 0) {
      return -1;
    }

    ASSERT(*ppgno != 0);
  }

  return 0;
}

// Initialize a page and bind it to the pager.
//
// The page lock is tried once:
//   - P_LOCK_SUCC: this caller does the initialization. When loadPage is set
//     and the page number lies within the original file size, the page
//     content is read from the database file first; then initPage is invoked
//     (its last argument is 1 if content was loaded, else 0) and the page is
//     bound to the pager.
//   - P_LOCK_BUSY: some other holder has the lock; spin until the page
//     becomes initialized, yielding the CPU every 1000 iterations.
//     NOTE(review): if the lock holder fails to initialize the page, this
//     loop has no exit -- confirm whether that can happen in practice.
//   - anything else: error.
//
// pPager   - pager the page belongs to
// pPage    - page to initialize
// initPage - initialization callback
// arg      - opaque argument forwarded to initPage
// loadPage - non-zero to load existing content from the file when available
//
// Returns 0 on success, -1 on failure. The page lock is released on every
// path that acquired it.
static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *, int), void *arg,
                            u8 loadPage) {
  int   ret;
  int   lcode;
  int   nLoops;
  i64   nRead;
  SPgno pgno;
  int   init;

  lcode = TDB_TRY_LOCK_PAGE(pPage);
  if (lcode == P_LOCK_SUCC) {
    // Somebody else may have finished the initialization before we got the lock.
    if (TDB_PAGE_INITIALIZED(pPage)) {
      TDB_UNLOCK_PAGE(pPage);
      return 0;
    }

    pgno = TDB_PAGE_PGNO(pPage);

    if (loadPage && pgno <= pPager->dbOrigSize) {
      init = 1;

      // Page numbers start at 1, so page N lives at byte offset (N - 1) * pageSize.
      nRead = tdbOsPRead(pPager->fd, pPage->pData, pPage->pageSize, ((i64)pPage->pageSize) * (pgno - 1));
      if (nRead < pPage->pageSize) {
        // Do not leave the page locked on a failed/short read.
        TDB_UNLOCK_PAGE(pPage);
        ASSERT(0);
        return -1;
      }
    } else {
      init = 0;
    }

    ret = (*initPage)(pPage, arg, init);
    if (ret < 0) {
      TDB_UNLOCK_PAGE(pPage);
      return -1;
    }

    // Setting pPager is what marks the page as initialized.
    pPage->pPager = pPager;

    TDB_UNLOCK_PAGE(pPage);
  } else if (lcode == P_LOCK_BUSY) {
    nLoops = 0;
    for (;;) {
      if (TDB_PAGE_INITIALIZED(pPage)) break;
      nLoops++;
      if (nLoops > 1000) {
        sched_yield();
        nLoops = 0;
      }
    }
  } else {
    return -1;
  }

  return 0;
}

// ---------------------------- Journal manipulation
// Append one journal record for a page: the page number followed by the
// page's current data (pageSize bytes).
//
// Returns 0 on success, -1 if either write reports an error.
static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage) {
  SPgno pgno = TDB_PAGE_PGNO(pPage);
  int   rc;

  // record header: the page number
  rc = tdbOsWrite(pPager->jfd, &pgno, sizeof(pgno));
  if (rc < 0) {
    return -1;
  }

  // record payload: the page content
  rc = tdbOsWrite(pPager->jfd, pPage->pData, pPage->pageSize);
  if (rc < 0) {
    return -1;
  }

  return 0;
}

// Write a page's data to its slot in the database file.
//
// Page numbers start at 1, so page N is written at byte offset
// (N - 1) * pageSize.
//
// Returns 0 on success, -1 if seeking or writing fails.
static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage) {
  i64 offset;
  int ret;

  // Widen to i64 BEFORE multiplying so the offset cannot overflow the
  // narrower operand types on large files (same form as the read path in
  // tdbPagerInitPage).
  offset = ((i64)pPage->pageSize) * (TDB_PAGE_PGNO(pPage) - 1);
  if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) {
    ASSERT(0);
    return -1;
  }

  ret = tdbOsWrite(pPager->fd, pPage->pData, pPage->pageSize);
  if (ret < 0) {
    ASSERT(0);
    return -1;
  }

  return 0;
}