/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tdbInt.h"

#pragma pack(push,1)
typedef struct {
H
Hongze Cheng 已提交
20 21 22 23 24
  u8    hdrString[16];
  u16   pageSize;
  SPgno freePage;
  u32   nFreePages;
  u8    reserved[102];
H
Hongze Cheng 已提交
25
} SFileHdr;
wafwerar's avatar
wafwerar 已提交
26
#pragma pack(pop)
H
Hongze Cheng 已提交
27

H
Hongze Cheng 已提交
28 29
TDB_STATIC_ASSERT(sizeof(SFileHdr) == 128, "Size of file header is not correct");

#define TDB_PAGE_INITIALIZED(pPage) ((pPage)->pPager != NULL)

H
Hongze Cheng 已提交
32
static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *), void *arg, u8 loadPage);
H
Hongze Cheng 已提交
33 34
static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage);
static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage);
/*
 * Open a pager on `fileName`, backed by the page cache `pCache`.
 *
 * The SPager object and both file-name strings (the DB file name, and the
 * journal name formed by appending "-journal" to it) are carved out of one
 * zero-initialized allocation.
 *
 * Returns 0 and stores the pager in *ppPager on success.  On any failure
 * returns -1, leaves *ppPager NULL, and releases whatever was acquired so
 * far (the open file and the allocation).
 */
int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager) {
  uint8_t *pPtr;
  SPager  *pPager;
  int      fsize;
  int      zsize;
  int      ret;

  *ppPager = NULL;

  fsize = strlen(fileName);
  zsize = sizeof(*pPager)  /* SPager */
          + fsize + 1      /* dbFileName */
          + fsize + 8 + 1; /* jFileName */
  pPtr = (uint8_t *)tdbOsCalloc(1, zsize);
  if (pPtr == NULL) {
    return -1;
  }

  pPager = (SPager *)pPtr;
  pPtr += sizeof(*pPager);
  // pPager->dbFileName
  pPager->dbFileName = (char *)pPtr;
  memcpy(pPager->dbFileName, fileName, fsize);
  pPager->dbFileName[fsize] = '\0';
  pPtr += fsize + 1;
  // pPager->jFileName
  pPager->jFileName = (char *)pPtr;
  memcpy(pPager->jFileName, fileName, fsize);
  memcpy(pPager->jFileName + fsize, "-journal", 8);
  pPager->jFileName[fsize + 8] = '\0';
  // pPager->pCache
  pPager->pCache = pCache;

  pPager->fd = tdbOsOpen(pPager->dbFileName, TDB_O_CREAT | TDB_O_RDWR, 0755);
  if (pPager->fd < 0) {
    // nothing but the allocation to give back yet
    tdbOsFree(pPager);
    return -1;
  }

  ret = tdbGnrtFileID(pPager->dbFileName, pPager->fid, false);
  if (ret < 0) {
    goto _err;
  }

  // pPager->jfd = -1;
  pPager->pageSize = tdbPCacheGetPageSize(pCache);
  // pPager->dbOrigSize
  ret = tdbGetFileSize(pPager->fd, pPager->pageSize, &(pPager->dbOrigSize));
  if (ret < 0) {
    goto _err;
  }

  // New page numbers are handed out as ++dbFileSize, so the counter has to
  // start from the number of pages already in the file; left at zero it
  // would re-issue page numbers that an existing file already uses.
  pPager->dbFileSize = pPager->dbOrigSize;

  *ppPager = pPager;
  return 0;

_err:
  tdbOsClose(pPager->fd);
  tdbOsFree(pPager);
  return -1;
}

/*
 * Close a pager.  This is still a placeholder: nothing owned by pPager is
 * released here and the call always returns 0.
 */
int tdbPagerClose(SPager *pPager) {
  (void)pPager;  // TODO: tear-down is not implemented yet
  return 0;
}

/*
 * Report a page number for the DB stored in this pager through *ppgno.
 *
 * *ppgno becomes 1 when pPager->dbOrigSize is greater than zero and 0
 * otherwise.  `toCreate` is accepted but not acted on yet.  Always returns 0.
 */
int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate) {
  // TODO: search the main DB to get the page number; when none is found and
  // toCreate is set, allocate a page, zero it and write it out.
  (void)toCreate;

  *ppgno = (pPager->dbOrigSize > 0) ? 1 : 0;
  return 0;
}

/*
 * Register pPage as modified in the current transaction.
 *
 * A page that is already flagged dirty is left untouched.  Otherwise the
 * page is referenced once more, flagged dirty, linked into the pager's
 * dirty list (kept in ascending page-number order), and -- when its page
 * number does not exceed dbOrigSize -- written to the journal.
 *
 * The pager must already be in a transaction (asserted).  Returns 0 on
 * success and -1 when the journal write fails.
 */
int tdbPagerWrite(SPager *pPager, SPage *pPage) {
  SPage **ppSlot;

  ASSERT(pPager->inTran);

  if (pPage->isDirty) {
    return 0;
  }

  // Take one more reference so the page is not released while it is dirty
  TDB_REF_PAGE(pPage);
  pPage->isDirty = 1;

  // Find the insertion point in the sorted dirty list
  // (TODO: NOT use O(n^2) algorithm)
  ppSlot = &pPager->pDirty;
  while ((*ppSlot) && TDB_PAGE_PGNO(*ppSlot) < TDB_PAGE_PGNO(pPage)) {
    ppSlot = &((*ppSlot)->pDirtyNext);
  }
  ASSERT(*ppSlot == NULL || TDB_PAGE_PGNO(*ppSlot) > TDB_PAGE_PGNO(pPage));
  pPage->pDirtyNext = *ppSlot;
  *ppSlot = pPage;

  // Only pages whose number is within dbOrigSize go to the journal
  if (TDB_PAGE_PGNO(pPage) > pPager->dbOrigSize) {
    return 0;
  }

  if (tdbPagerWritePageToJournal(pPager, pPage) < 0) {
    ASSERT(0);
    return -1;
  }

  return 0;
}

/*
 * Start a transaction on the pager.
 *
 * When the pager is already in a transaction this does nothing.  Otherwise
 * the journal file is opened (created if missing) and the pager is flagged
 * as being in a transaction.  `pTxn` is not used here.
 *
 * Returns 0 on success, -1 when the journal file cannot be opened.
 */
int tdbPagerBegin(SPager *pPager, TXN *pTxn) {
  if (!pPager->inTran) {
    // Open the journal
    pPager->jfd = tdbOsOpen(pPager->jFileName, TDB_O_CREAT | TDB_O_RDWR, 0755);
    if (pPager->jfd < 0) {
      return -1;
    }

    // TODO: write the size of the file

    pPager->inTran = 1;
  }

  return 0;
}

/*
 * Commit the current transaction.
 *
 * Steps, in order:
 *   1. sync the journal file;
 *   2. write every page on the dirty list to the DB file;
 *   3. sync the DB file;
 *   4. clear the dirty list, releasing each page back to the page cache;
 *   5. close and remove the journal, record dbFileSize as the new
 *      dbOrigSize, and leave the transaction.
 *
 * Returns 0 on success.  Returns -1 when the journal sync, a page write or
 * the DB sync fails; in that case the journal is kept, the dirty list is
 * untouched and the pager stays in the transaction.
 */
int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
  SPage *pPage;
  int    ret;

  // sync the journal file
  ret = tdbOsFSync(pPager->jfd);
  if (ret < 0) {
    // TODO
    ASSERT(0);
    // A failed journal sync must not be reported as a successful commit.
    return -1;
  }

  // loop to write the dirty pages to file
  for (pPage = pPager->pDirty; pPage; pPage = pPage->pDirtyNext) {
    // TODO: update the page footer
    ret = tdbPagerWritePageToDB(pPager, pPage);
    if (ret < 0) {
      ASSERT(0);
      return -1;
    }
  }

  // sync the db file; this is checked before anything is torn down so the
  // journal is never removed while the DB file may not be durable
  ret = tdbOsFSync(pPager->fd);
  if (ret < 0) {
    return -1;
  }

  // release the page
  for (pPage = pPager->pDirty; pPage; pPage = pPager->pDirty) {
    pPager->pDirty = pPage->pDirtyNext;
    pPage->pDirtyNext = NULL;

    pPage->isDirty = 0;

    tdbPCacheRelease(pPager->pCache, pPage, pTxn);
  }

  // remove the journal file
  tdbOsClose(pPager->jfd);
  tdbOsRemove(pPager->jFileName);
  pPager->dbOrigSize = pPager->dbFileSize;
  pPager->inTran = 0;

  return 0;
}

/*
 * Fetch page `pgno` of this pager's file through the page cache.
 *
 * The cache is asked for the page identified by (file id, pgno).  If the
 * page has not been initialized yet it is loaded from the DB file and
 * passed to `initPage(pPage, arg)`.
 *
 * Returns 0 and stores the page in *ppPage on success.  Returns -1 when the
 * cache cannot supply a page or initialization fails; on an initialization
 * failure the page is handed back to the cache before returning, and
 * *ppPage is not written.
 */
int tdbPagerFetchPage(SPager *pPager, SPgno pgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg,
                      TXN *pTxn) {
  SPage *pPage;
  SPgid  pgid;
  int    ret;

  // Fetch a page container from the page cache
  memcpy(&pgid, pPager->fid, TDB_FILE_ID_LEN);
  pgid.pgno = pgno;
  pPage = tdbPCacheFetch(pPager->pCache, &pgid, pTxn);
  if (pPage == NULL) {
    return -1;
  }

  // Initialize the page if need
  if (!TDB_PAGE_INITIALIZED(pPage)) {
    ret = tdbPagerInitPage(pPager, pPage, initPage, arg, 1);
    if (ret < 0) {
      // The caller never sees this page, so give it back to the cache here
      // instead of leaving it checked out.
      tdbPCacheRelease(pPager->pCache, pPage, pTxn);
      return -1;
    }
  }

  ASSERT(TDB_PAGE_INITIALIZED(pPage));
  ASSERT(pPage->pPager == pPager);

  *ppPage = pPage;
  return 0;
}

/*
 * Allocate a new page number and return an initialized page for it.
 *
 * A page number is obtained from tdbPagerAllocPage() and stored in *ppgno,
 * the matching page is fetched from the page cache, and the page is passed
 * to `initPage(pPage, arg)` without being read from the DB file.
 *
 * Returns 0 and stores the page in *ppPage on success, -1 on failure.  When
 * initialization fails the page is handed back to the cache before
 * returning.  *ppgno may already hold the allocated number on failure.
 */
int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg,
                    TXN *pTxn) {
  int    ret;
  SPage *pPage;
  SPgid  pgid;

  // Allocate a page number
  ret = tdbPagerAllocPage(pPager, ppgno);
  if (ret < 0) {
    ASSERT(0);
    return -1;
  }

  ASSERT(*ppgno != 0);

  // Fetch a page container from the page cache
  memcpy(&pgid, pPager->fid, TDB_FILE_ID_LEN);
  pgid.pgno = *ppgno;
  pPage = tdbPCacheFetch(pPager->pCache, &pgid, pTxn);
  if (pPage == NULL) {
    ASSERT(0);
    return -1;
  }

  ASSERT(!TDB_PAGE_INITIALIZED(pPage));

  // Initialize the page if need
  ret = tdbPagerInitPage(pPager, pPage, initPage, arg, 0);
  if (ret < 0) {
    ASSERT(0);
    // The caller never sees this page, so give it back to the cache here
    // instead of leaving it checked out.
    tdbPCacheRelease(pPager->pCache, pPage, pTxn);
    return -1;
  }

  ASSERT(TDB_PAGE_INITIALIZED(pPage));
  ASSERT(pPage->pPager == pPager);

  *ppPage = pPage;
  return 0;
}

/* Hand a page back to the pager's page cache. */
void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn) {
  tdbPCacheRelease(pPager->pCache, pPage, pTxn);
}
/*
 * Placeholder for allocating a page from the free list.  It currently
 * leaves *ppgno untouched and returns 0.
 */
static int tdbPagerAllocFreePage(SPager *pPager, SPgno *ppgno) {
  (void)pPager;
  (void)ppgno;
  // TODO: Allocate a page from the free list
  return 0;
}

/*
 * Allocate a page number by growing the pager: dbFileSize is incremented
 * and the new value is stored in *ppgno.  Always returns 0.
 */
static int tdbPagerAllocNewPage(SPager *pPager, SPgno *ppgno) {
  pPager->dbFileSize += 1;
  *ppgno = pPager->dbFileSize;
  return 0;
}

/*
 * Allocate a page number and store it in *ppgno.
 *
 * The free-list allocator is tried first; if it leaves *ppgno at 0 the
 * number comes from extending the pager instead.  Returns 0 on success and
 * -1 when either allocator reports an error.
 */
int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno) {
  *ppgno = 0;

  // Try to allocate from the free list of the pager
  if (tdbPagerAllocFreePage(pPager, ppgno) < 0) {
    return -1;
  }

  if (*ppgno == 0) {
    // Nothing came from the free list: extend the pager
    if (tdbPagerAllocNewPage(pPager, ppgno) < 0) {
      return -1;
    }

    ASSERT(*ppgno != 0);
  }

  return 0;
}

/*
 * Initialize a page obtained from the page cache.
 *
 * The page lock is tried once:
 *   - P_LOCK_SUCC: unless the page turns out to be initialized already, its
 *     content is read from the DB file when `loadPage` is non-zero, then
 *     `initPage(pPage, arg)` is called and the page is bound to pPager.
 *     The lock is released on every exit from this branch.
 *   - P_LOCK_BUSY: another party holds the lock; spin (yielding the CPU
 *     every 1000 iterations) until the page shows up as initialized.
 *   - anything else: fail.
 *
 * Returns 0 on success, -1 on a read error, a short read, an `initPage`
 * failure or an unexpected lock code.
 */
static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *), void *arg, u8 loadPage) {
  int ret;
  int lcode;
  int nLoops;
  i64 nRead;

  lcode = TDB_TRY_LOCK_PAGE(pPage);
  if (lcode == P_LOCK_SUCC) {
    if (TDB_PAGE_INITIALIZED(pPage)) {
      TDB_UNLOCK_PAGE(pPage);
      return 0;
    }

    if (loadPage) {
      nRead = tdbOsPRead(pPager->fd, pPage->pData, pPage->pageSize, ((i64)pPage->pageSize) * TDB_PAGE_PGNO(pPage));
      if (nRead < 0) {
        // TODO
        ASSERT(0);
        // do not leave the page locked on the way out
        TDB_UNLOCK_PAGE(pPage);
        return -1;
      } else if (nRead < pPage->pageSize) {
        // TODO
        ASSERT(0);
        // do not leave the page locked on the way out
        TDB_UNLOCK_PAGE(pPage);
        return -1;
      }
    }

    ret = (*initPage)(pPage, arg);
    if (ret < 0) {
      TDB_UNLOCK_PAGE(pPage);
      return -1;
    }

    pPage->pPager = pPager;

    TDB_UNLOCK_PAGE(pPage);
  } else if (lcode == P_LOCK_BUSY) {
    // NOTE(review): this waits for the lock holder to set pPage->pPager; if
    // that holder fails before doing so the loop has no other exit --
    // confirm that this cannot happen in practice.
    nLoops = 0;
    for (;;) {
      if (TDB_PAGE_INITIALIZED(pPage)) break;
      nLoops++;
      if (nLoops > 1000) {
        sched_yield();
        nLoops = 0;
      }
    }
  } else {
    return -1;
  }

  return 0;
}

// ---------------------------- Journal manipulation
/*
 * Append one record to the journal file: the page number of pPage followed
 * by pageSize bytes of its data.  Returns 0 on success, -1 if either write
 * reports an error.
 */
static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage) {
  SPgno pgno = TDB_PAGE_PGNO(pPage);
  int   rc;

  // record header: the page number
  rc = tdbOsWrite(pPager->jfd, &pgno, sizeof(pgno));
  if (rc >= 0) {
    // record payload: the page content
    rc = tdbOsWrite(pPager->jfd, pPage->pData, pPage->pageSize);
  }

  return (rc < 0) ? -1 : 0;
}

/*
 * Write the content of pPage to its slot in the DB file.
 *
 * The slot starts at byte offset pageSize * page number, the same offset
 * tdbPagerInitPage() reads from.  Returns 0 on success, -1 when the seek or
 * the write fails.
 */
static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage) {
  i64 offset;
  int ret;

  // Widen before multiplying so the product is computed in 64 bits; done
  // in the operands' own narrower types it can wrap for large files.
  offset = ((i64)pPage->pageSize) * TDB_PAGE_PGNO(pPage);
  if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) {
    ASSERT(0);
    return -1;
  }

  ret = tdbOsWrite(pPager->fd, pPage->pData, pPage->pageSize);
  if (ret < 0) {
    ASSERT(0);
    return -1;
  }

  return 0;
}