/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tdbInt.h"

H
refact  
Hongze Cheng 已提交
18
struct SPager {
H
Hongze Cheng 已提交
19 20
  char    *dbFileName;
  char    *jFileName;
H
Hongze Cheng 已提交
21
  int      pageSize;
H
more  
Hongze Cheng 已提交
22
  uint8_t  fid[TDB_FILE_ID_LEN];
H
Hongze Cheng 已提交
23 24
  tdb_fd_t fd;
  tdb_fd_t jfd;
H
more  
Hongze Cheng 已提交
25 26 27
  SPCache *pCache;
  SPgno    dbFileSize;
  SPgno    dbOrigSize;
H
Hongze Cheng 已提交
28
  SPage   *pDirty;
H
more  
Hongze Cheng 已提交
29
  u8       inTran;
H
more  
Hongze Cheng 已提交
30 31
};

H
Hongze Cheng 已提交
32 33 34 35 36 37
typedef struct __attribute__((__packed__)) {
  u8    hdrString[16];
  u16   pageSize;
  SPgno freePage;
  u32   nFreePages;
  u8    reserved[102];
H
Hongze Cheng 已提交
38 39
} SFileHdr;

H
Hongze Cheng 已提交
40 41
TDB_STATIC_ASSERT(sizeof(SFileHdr) == 128, "Size of file header is not correct");

H
Hongze Cheng 已提交
42 43
// A page counts as initialized once it has been bound to a pager
// (tdbPagerInitPage() sets pPage->pPager after the init callback succeeds).
#define TDB_PAGE_INITIALIZED(pPage) (NULL != (pPage)->pPager)

H
refact  
Hongze Cheng 已提交
44
static int tdbPagerReadPage(SPager *pPager, SPage *pPage);
H
Hongze Cheng 已提交
45
static int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno);
H
Hongze Cheng 已提交
46
static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *), void *arg);
H
Hongze Cheng 已提交
47 48
static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage);
static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage);
H
Hongze Cheng 已提交
49

H
refact  
Hongze Cheng 已提交
50
/*
 * Create a pager for the database file `fileName`.
 *
 * The SPager structure, the database file name and the journal file name
 * ("<fileName>-journal") are carved out of one zero-initialized allocation.
 * The database file is opened (and created if missing) and its file id is
 * generated; the journal is only opened later by tdbPagerBegin().
 *
 * pCache   - page cache the pager will fetch pages from; also supplies the
 *            page size.
 * fileName - NUL-terminated path of the database file.
 * ppPager  - receives the new pager on success, NULL on failure.
 *
 * Returns 0 on success, -1 on failure. On failure nothing is leaked: the
 * allocation is freed and the database file handle, if opened, is closed.
 */
int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager) {
  uint8_t *pPtr;
  SPager  *pPager;
  int      fsize;
  int      zsize;
  int      ret;

  *ppPager = NULL;

  fsize = strlen(fileName);
  zsize = sizeof(*pPager)  /* SPager */
          + fsize + 1      /* dbFileName */
          + fsize + 8 + 1; /* jFileName */
  pPtr = (uint8_t *)tdbOsCalloc(1, zsize);
  if (pPtr == NULL) {
    return -1;
  }

  pPager = (SPager *)pPtr;
  pPtr += sizeof(*pPager);
  // pPager->dbFileName
  pPager->dbFileName = (char *)pPtr;
  memcpy(pPager->dbFileName, fileName, fsize);
  pPager->dbFileName[fsize] = '\0';
  pPtr += fsize + 1;
  // pPager->jFileName
  pPager->jFileName = (char *)pPtr;
  memcpy(pPager->jFileName, fileName, fsize);
  memcpy(pPager->jFileName + fsize, "-journal", 8);
  pPager->jFileName[fsize + 8] = '\0';
  // pPager->pCache
  pPager->pCache = pCache;

  pPager->fd = tdbOsOpen(pPager->dbFileName, TDB_O_CREAT | TDB_O_RDWR, 0755);
  if (pPager->fd < 0) {
    // The pager and both names share one allocation, so one free suffices.
    tdbOsFree(pPager);
    return -1;
  }

  ret = tdbGnrtFileID(pPager->dbFileName, pPager->fid, false);
  if (ret < 0) {
    tdbOsClose(pPager->fd);
    tdbOsFree(pPager);
    return -1;
  }

  // pPager->jfd = -1;
  pPager->pageSize = tdbPCacheGetPageSize(pCache);

  *ppPager = pPager;
  return 0;
}

H
refact  
Hongze Cheng 已提交
100
/*
 * Close a pager.
 *
 * Not implemented yet: nothing is released and 0 is always returned.
 * TODO: close the file handles and free the allocation made by tdbPagerOpen().
 */
int tdbPagerClose(SPager *pPager) {
  // TODO
  return 0;
}

H
refact  
Hongze Cheng 已提交
105
/*
 * Look up the page number of a DB inside the pager.
 *
 * Currently a placeholder: *ppgno is always set to 0 and `toCreate` is
 * ignored.
 *
 * TODO: search the main DB to get the page number.
 * TODO: when nothing is found and `toCreate` is set, allocate a page, zero
 *       it and mark it dirty through tdbPagerWrite().
 *
 * Returns 0.
 */
int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate) {
  SPgno rootPgno = 0;

  *ppgno = rootPgno;
  return 0;
}

H
refact  
Hongze Cheng 已提交
133
/*
 * Mark `pPage` as about to be modified.
 *
 * A transaction is started first if none is open. A page that is already
 * dirty needs no further work. Otherwise the page is flagged dirty, pushed
 * onto the head of the pager's dirty list and, when its page number is not
 * beyond dbOrigSize, its current content is appended to the journal.
 *
 * Returns 0 on success, -1 if the transaction cannot be started or the
 * journal write fails (the page stays on the dirty list in the latter case).
 */
int tdbPagerWrite(SPager *pPager, SPage *pPage) {
  int rc;

  // Start a transaction on demand
  if (!pPager->inTran) {
    rc = tdbPagerBegin(pPager);
    if (rc < 0) return -1;
  }

  // Already tracked: nothing more to do
  if (pPage->isDirty) {
    return 0;
  }

  // Flag the page and push it onto the dirty list
  // TODO: sort the list according to the page number
  pPage->isDirty = 1;
  pPage->pDirtyNext = pPager->pDirty;
  pPager->pDirty = pPage;

  // Pages beyond the original size are not journaled
  if (TDB_PAGE_PGNO(pPage) > pPager->dbOrigSize) {
    return 0;
  }

  rc = tdbPagerWritePageToJournal(pPager, pPage);
  if (rc < 0) {
    ASSERT(0);
    return -1;
  }

  return 0;
}

H
refact  
Hongze Cheng 已提交
165 166
/*
 * Start a transaction: open (creating if necessary) the journal file and set
 * inTran. Calling it while a transaction is already open is a no-op.
 *
 * Returns 0 on success, -1 if the journal cannot be opened.
 */
int tdbPagerBegin(SPager *pPager) {
  if (!pPager->inTran) {
    // Open the journal
    pPager->jfd = tdbOsOpen(pPager->jFileName, TDB_O_CREAT | TDB_O_RDWR, 0755);
    if (pPager->jfd < 0) {
      return -1;
    }

    // TODO: write the size of the file

    pPager->inTran = 1;
  }

  return 0;
}

H
refact  
Hongze Cheng 已提交
183
/*
 * Commit the open transaction.
 *
 * Every page on the dirty list is written to the database file and unlinked
 * from the list, the database file is synced, and the journal is closed and
 * removed. Afterwards the pager is no longer in a transaction.
 *
 * Fixes over the previous version:
 *  - inTran is cleared, so the next tdbPagerWrite() reopens the journal
 *    instead of writing to the handle that was closed here;
 *  - isDirty is cleared on each flushed page, so later modifications of the
 *    same page are tracked again;
 *  - with no transaction open there is no journal to close, so the call
 *    returns immediately instead of closing an unopened handle.
 *
 * Returns 0 on success, -1 if a page cannot be written (the transaction is
 * left open in that case).
 */
int tdbPagerCommit(SPager *pPager) {
  SPage *pPage;
  int    ret;

  // Dirty pages only exist inside a transaction (tdbPagerWrite() starts one).
  if (!pPager->inTran) {
    return 0;
  }

  // Begin commit
  {
    // TODO: Sync the journal file (Here or when write ?)
  }

  for (;;) {
    pPage = pPager->pDirty;

    if (pPage == NULL) break;

    ret = tdbPagerWritePageToDB(pPager, pPage);
    if (ret < 0) {
      ASSERT(0);
      return -1;
    }

    pPager->pDirty = pPage->pDirtyNext;
    pPage->pDirtyNext = NULL;
    pPage->isDirty = 0;

    // TODO: release the page
  }

  tdbOsFSync(pPager->fd);

  tdbOsClose(pPager->jfd);
  tdbOsRemove(pPager->jFileName);
  // pPager->jfd = -1;

  pPager->inTran = 0;

  return 0;
}

H
refact  
Hongze Cheng 已提交
218
/*
 * Read the content of `pPage` from the database file into pPage->pData.
 *
 * Page numbers are 1-based: page N lives at byte offset (N - 1) * pageSize.
 * The page must belong to this pager's file (asserted through the file id).
 *
 * Returns 0 on success, -1 if the read reports an error.
 */
static int tdbPagerReadPage(SPager *pPager, SPage *pPage) {
  i64 fileOffset;
  int nRead;

  ASSERT(memcmp(pPager->fid, pPage->pgid.fileid, TDB_FILE_ID_LEN) == 0);

  fileOffset = (pPage->pgid.pgno - 1) * (i64)(pPager->pageSize);
  nRead = tdbOsPRead(pPager->fd, pPage->pData, pPager->pageSize, fileOffset);

  // TODO: handle error
  return (nRead < 0) ? -1 : 0;
}

H
refact  
Hongze Cheng 已提交
233 234
// Return the page size, in bytes, this pager was opened with.
int tdbPagerGetPageSize(SPager *pPager) {
  return pPager->pageSize;
}

H
Hongze Cheng 已提交
235
/*
 * Fetch page `pgno` of this pager's file from the page cache, initializing it
 * with `initPage(pPage, arg)` if no one has done so yet.
 *
 * pPager   - pager owning the file.
 * pgno     - page number to fetch.
 * ppPage   - receives the page on success; left untouched on failure.
 * initPage - callback used to initialize a page that is not yet bound to a
 *            pager; a negative return value means failure.
 * arg      - opaque argument forwarded to `initPage`.
 *
 * Returns 0 on success, -1 if the cache cannot supply a page or the
 * initialization fails. On initialization failure the page is handed back to
 * the cache (as tdbPagerReturnPage() would) instead of being left fetched.
 */
int tdbPagerFetchPage(SPager *pPager, SPgno pgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg) {
  SPage *pPage;
  SPgid  pgid;
  int    ret;

  // Fetch a page container from the page cache
  memcpy(&pgid, pPager->fid, TDB_FILE_ID_LEN);
  pgid.pgno = pgno;
  pPage = tdbPCacheFetch(pPager->pCache, &pgid, 1);
  if (pPage == NULL) {
    return -1;
  }

  // Initialize the page if need
  if (!TDB_PAGE_INITIALIZED(pPage)) {
    ret = tdbPagerInitPage(pPager, pPage, initPage, arg);
    if (ret < 0) {
      // Do not keep the page fetched when the caller never receives it.
      tdbPCacheRelease(pPager->pCache, pPage);
      return -1;
    }
  }

  ASSERT(TDB_PAGE_INITIALIZED(pPage));
  ASSERT(pPage->pPager == pPager);

  *ppPage = pPage;
  return 0;
}

H
Hongze Cheng 已提交
263
/*
 * Allocate a new page number, fetch the matching page from the page cache and
 * initialize it with `initPage(pPage, arg)`.
 *
 * pPager   - pager owning the file.
 * ppgno    - receives the newly allocated (non-zero) page number.
 * ppPage   - receives the page on success; left untouched on failure.
 * initPage - callback used to initialize the page; a negative return value
 *            means failure.
 * arg      - opaque argument forwarded to `initPage`.
 *
 * Returns 0 on success, -1 on failure. On initialization failure the page is
 * handed back to the cache (as tdbPagerReturnPage() would).
 * NOTE(review): the page number allocated in *ppgno is not rolled back when a
 * later step fails.
 */
int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg) {
  int    ret;
  SPage *pPage;
  SPgid  pgid;

  // Allocate a page number
  ret = tdbPagerAllocPage(pPager, ppgno);
  if (ret < 0) {
    return -1;
  }

  ASSERT(*ppgno != 0);

  // Fetch a page container from the page cache
  memcpy(&pgid, pPager->fid, TDB_FILE_ID_LEN);
  pgid.pgno = *ppgno;
  pPage = tdbPCacheFetch(pPager->pCache, &pgid, 1);
  if (pPage == NULL) {
    return -1;
  }

  // A freshly allocated page number must not map to an initialized page
  ASSERT(!TDB_PAGE_INITIALIZED(pPage));

  // Initialize the page
  ret = tdbPagerInitPage(pPager, pPage, initPage, arg);
  if (ret < 0) {
    // Do not keep the page fetched when the caller never receives it.
    tdbPCacheRelease(pPager->pCache, pPage);
    return -1;
  }

  ASSERT(TDB_PAGE_INITIALIZED(pPage));
  ASSERT(pPage->pPager == pPager);

  *ppPage = pPage;
  return 0;
}

H
Hongze Cheng 已提交
299
// Hand a page obtained from this pager back to its page cache.
void tdbPagerReturnPage(SPager *pPager, SPage *pPage) {
  tdbPCacheRelease(pPager->pCache, pPage);
}
H
Hongze Cheng 已提交
300

H
more  
Hongze Cheng 已提交
301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331
/*
 * Allocate a page number from the free list.
 *
 * Not implemented yet: *ppgno is left untouched and 0 is returned, so the
 * caller falls back to extending the file.
 */
static int tdbPagerAllocFreePage(SPager *pPager, SPgno *ppgno) {
  // TODO: Allocate a page from the free list
  return 0;
}

/*
 * Allocate a page number by growing the file by one page: dbFileSize is
 * incremented and its new value is stored in *ppgno. Always returns 0.
 */
static int tdbPagerAllocNewPage(SPager *pPager, SPgno *ppgno) {
  pPager->dbFileSize += 1;
  *ppgno = pPager->dbFileSize;
  return 0;
}

/*
 * Allocate a page number: the free list is tried first, and the file is
 * extended only when the free list yields nothing (*ppgno still 0).
 *
 * Returns 0 with a non-zero *ppgno on success, -1 if either allocator fails.
 */
static int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno) {
  *ppgno = 0;

  // Try to allocate from the free list of the pager
  if (tdbPagerAllocFreePage(pPager, ppgno) < 0) {
    return -1;
  }

  if (*ppgno == 0) {
    // Allocate the page by extending the pager
    if (tdbPagerAllocNewPage(pPager, ppgno) < 0) {
      return -1;
    }

    ASSERT(*ppgno != 0);
  }

  return 0;
}

/*
 * Make sure `pPage` is initialized and bound to `pPager`.
 *
 * The page lock is tried once:
 *  - P_LOCK_SUCC: if the page is still uninitialized, `initPage(pPage, arg)`
 *    is run and, on success, pPage->pPager is set (which is what
 *    TDB_PAGE_INITIALIZED tests). The lock is released on every path.
 *  - P_LOCK_BUSY: someone else holds the lock; spin until the page becomes
 *    initialized, yielding the CPU after every 1000 polls.
 *  - anything else: failure.
 *
 * Returns 0 once the page is initialized, -1 if the callback fails or the
 * lock attempt reports an unexpected code.
 *
 * NOTE(review): a waiter in the busy branch only leaves the loop when
 * pPage->pPager becomes non-NULL; if the lock holder's callback fails, that
 * never happens. The poll is also a plain (non-atomic) read -- confirm this
 * is intended.
 */
static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *), void *arg) {
  int lockRc;
  int rc;
  int nPolls;

  lockRc = TDB_TRY_LOCK_PAGE(pPage);

  if (lockRc == P_LOCK_SUCC) {
    rc = 0;

    if (!TDB_PAGE_INITIALIZED(pPage)) {
      if ((*initPage)(pPage, arg) < 0) {
        rc = -1;
      } else {
        pPage->pPager = pPager;
      }
    }

    TDB_UNLOCK_PAGE(pPage);
    return rc;
  }

  if (lockRc != P_LOCK_BUSY) {
    return -1;
  }

  // Another caller is initializing the page: wait for it to finish
  nPolls = 0;
  while (!TDB_PAGE_INITIALIZED(pPage)) {
    nPolls++;
    if (nPolls > 1000) {
      sched_yield();
      nPolls = 0;
    }
  }

  return 0;
}

// ---------------------------- Journal manipulation
/*
 * Append one journal record for `pPage` at the journal's current position:
 * the page number followed by pPage->pageSize bytes of page data.
 *
 * Returns 0 on success, -1 if either write reports an error.
 */
static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage) {
  SPgno pgno = TDB_PAGE_PGNO(pPage);
  int   rc;

  // Record header: the page number
  rc = tdbOsWrite(pPager->jfd, &pgno, sizeof(pgno));
  if (rc < 0) return -1;

  // Record body: the page content
  rc = tdbOsWrite(pPager->jfd, pPage->pData, pPage->pageSize);
  if (rc < 0) return -1;

  return 0;
}

/*
 * Write the content of `pPage` to its slot in the database file.
 *
 * Page numbers are 1-based, so page N lives at byte offset
 * (N - 1) * pageSize -- the same mapping tdbPagerReadPage() uses. The
 * previous version used N * pageSize, which stored every page one slot too
 * far, and multiplied in a narrower type than i64.
 *
 * Returns 0 on success, -1 if the seek or the write reports an error.
 */
static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage) {
  i64 offset;
  int ret;

  // Widen before multiplying so large files do not overflow the product.
  offset = (i64)(pPage->pageSize) * ((i64)TDB_PAGE_PGNO(pPage) - 1);
  if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) {
    ASSERT(0);
    return -1;
  }

  ret = tdbOsWrite(pPager->fd, pPage->pData, pPage->pageSize);
  if (ret < 0) {
    ASSERT(0);
    return -1;
  }

  return 0;
}