tdbPCache.c 10.0 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
H
Hongze Cheng 已提交
15 16
#include "tdbInt.h"

H
Hongze Cheng 已提交
17 18
// #include <sys/types.h>
// #include <unistd.h>
H
Hongze Cheng 已提交
19

H
more  
Hongze Cheng 已提交
20
struct SPCache {
H
Hongze Cheng 已提交
21 22 23
  int         szPage;
  int         nPages;
  SPage     **aPage;
H
Hongze Cheng 已提交
24 25 26 27 28 29 30 31
  tdb_mutex_t mutex;
  int         nFree;
  SPage      *pFree;
  int         nPage;
  int         nHash;
  SPage     **pgHash;
  int         nRecyclable;
  SPage       lru;
H
Hongze Cheng 已提交
32 33
};

wafwerar's avatar
wafwerar 已提交
34 35 36
// Hash a page id: the sum (mod 2^32) of the first six 32-bit words of the
// file id plus the page number.
//
// The words are copied out with memcpy instead of reading the file id through
// a cast uint32_t pointer: the cast breaks the strict-aliasing rule and may be
// a misaligned access when fileid is a byte buffer.
// NOTE(review): like the original, this reads 24 bytes of fileid - confirm
// that TDB_FILE_ID_LEN is at least 24.
static inline uint32_t tdbPCachePageHash(const SPgid *pPgid) {
  uint32_t aWord[6];
  uint32_t hash = (uint32_t)(pPgid->pgno);

  memcpy(aWord, pPgid->fileid, sizeof(aWord));
  for (int i = 0; i < 6; i++) {
    hash += aWord[i];
  }

  return hash;
}
H
Hongze Cheng 已提交
38

H
refact  
Hongze Cheng 已提交
39
static int    tdbPCacheOpenImpl(SPCache *pCache);
H
Hongze Cheng 已提交
40
static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn);
H
Hongze Cheng 已提交
41 42 43 44
static void   tdbPCachePinPage(SPCache *pCache, SPage *pPage);
static void   tdbPCacheRemovePageFromHash(SPCache *pCache, SPage *pPage);
static void   tdbPCacheAddPageToHash(SPCache *pCache, SPage *pPage);
static void   tdbPCacheUnpinPage(SPCache *pCache, SPage *pPage);
H
Hongze Cheng 已提交
45 46 47 48 49 50
static int    tdbPCacheCloseImpl(SPCache *pCache);

// Initialize the cache mutex (second argument to tdbMutexInit is NULL).
static void tdbPCacheInitLock(SPCache *pCache) {
  tdbMutexInit(&pCache->mutex, NULL);
}
// Destroy the cache mutex.
static void tdbPCacheDestroyLock(SPCache *pCache) {
  tdbMutexDestroy(&pCache->mutex);
}
// Acquire the cache mutex.
static void tdbPCacheLock(SPCache *pCache) {
  tdbMutexLock(&pCache->mutex);
}
// Release the cache mutex.
static void tdbPCacheUnlock(SPCache *pCache) {
  tdbMutexUnlock(&pCache->mutex);
}
H
Hongze Cheng 已提交
51

H
Hongze Cheng 已提交
52
// Create a page cache holding `cacheSize` pre-allocated pages of `pageSize`
// bytes each.
//
// The cache header and its array of local page pointers are allocated as one
// zeroed chunk; aPage points at the memory right after the header.
//
// On success stores the new cache in *ppCache and returns 0. Returns -1 when
// the arguments are invalid, the allocation fails or tdbPCacheOpenImpl fails;
// *ppCache is left untouched in that case.
int tdbPCacheOpen(int pageSize, int cacheSize, SPCache **ppCache) {
  SPCache *pCache;

  // A negative count would wrap to a huge size in the allocation below.
  if (ppCache == NULL || cacheSize < 0) {
    return -1;
  }

  pCache = (SPCache *)tdbOsCalloc(1, sizeof(*pCache) + sizeof(SPage *) * (size_t)cacheSize);
  if (pCache == NULL) {
    return -1;
  }

  pCache->szPage = pageSize;
  pCache->nPages = cacheSize;
  pCache->aPage = (SPage **)&pCache[1];

  if (tdbPCacheOpenImpl(pCache) < 0) {
    tdbOsFree(pCache);
    return -1;
  }

  *ppCache = pCache;
  return 0;
}

H
Hongze Cheng 已提交
75
// Tear down a cache created by tdbPCacheOpen and free it.
// A NULL cache is accepted and ignored. Always returns 0.
int tdbPCacheClose(SPCache *pCache) {
  if (pCache == NULL) {
    return 0;
  }

  tdbPCacheCloseImpl(pCache);
  tdbOsFree(pCache);
  return 0;
}

H
Hongze Cheng 已提交
83
// Fetch the page identified by pPgid on behalf of transaction pTxn and take a
// reference on it.
//
// The lookup/allocation (tdbPCacheFetchImpl) and the reference increment are
// done under the cache lock. Returns the page, or NULL when
// tdbPCacheFetchImpl could not provide one.
SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) {
  SPage *pPage;
  i32    nRef = 0;  // stays 0 when no page was obtained

  tdbPCacheLock(pCache);

  pPage = tdbPCacheFetchImpl(pCache, pPgid, pTxn);
  if (pPage) {
    nRef = tdbRefPage(pPage);
  }

  ASSERT(pPage);

  tdbPCacheUnlock(pCache);

  // tdbPCacheFetchImpl can return NULL; when ASSERT does not abort, the log
  // line must not dereference the missing page.
  if (pPage) {
    tdbDebug("pcache/fetch page %p/%d/%d/%d", pPage, TDB_PAGE_PGNO(pPage), pPage->id, nRef);
  }
  return pPage;
}

H
Hongze Cheng 已提交
105
// Drop one reference on pPage on behalf of transaction pTxn.
//
// Everything happens under the cache lock. When the reference count reaches
// zero:
//   - a local page is put back on the LRU list;
//   - a non-local page is removed from the hash table (write transactions
//     only) and destroyed with the transaction's xFree/xArg.
void tdbPCacheRelease(SPCache *pCache, SPage *pPage, TXN *pTxn) {
  ASSERT(pTxn);

  tdbPCacheLock(pCache);

  i32 refsLeft = tdbUnrefPage(pPage);
  tdbDebug("pcache/release page %p/%d/%d/%d", pPage, TDB_PAGE_PGNO(pPage), pPage->id, refsLeft);

  if (refsLeft == 0) {
    if (!pPage->isLocal) {
      if (TDB_TXN_IS_WRITE(pTxn)) {
        tdbPCacheRemovePageFromHash(pCache, pPage);
      }
      tdbPageDestroy(pPage, pTxn->xFree, pTxn->xArg);
    } else {
      tdbPCacheUnpinPage(pCache, pPage);
    }
  }

  tdbPCacheUnlock(pCache);
}

H
Hongze Cheng 已提交
138
// Return the page size this cache was opened with.
int tdbPCacheGetPageSize(SPCache *pCache) {
  return pCache->szPage;
}
H
more  
Hongze Cheng 已提交
139

H
Hongze Cheng 已提交
140
// Find or allocate the page for pPgid. tdbPCacheFetch calls this with the
// cache lock held.
//
// A page found in the hash table is returned directly (after being taken off
// the LRU list) when it is local or the transaction is a write transaction.
// Otherwise a page is taken from the free list, recycled from the tail of the
// LRU list, or created with the transaction's allocator, in that order. If a
// non-local page was found in the hash for a non-write transaction, the new
// page is filled with a copy of it and is not added to the hash.
//
// Returns the page, or NULL when a new page could not be created.
static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) {
  int    ret = 0;
  SPage *pPage = NULL;
  SPage *pPageH = NULL;

  ASSERT(pTxn);

  // 1. Search the hash table
  pPage = pCache->pgHash[tdbPCachePageHash(pPgid) % pCache->nHash];
  while (pPage) {
    if (pPage->pgid.pgno == pPgid->pgno && memcmp(pPage->pgid.fileid, pPgid->fileid, TDB_FILE_ID_LEN) == 0) break;
    pPage = pPage->pHashNext;
  }

  if (pPage) {
    if (pPage->isLocal || TDB_TXN_IS_WRITE(pTxn)) {
      tdbPCachePinPage(pCache, pPage);
      return pPage;
    }
  }

  // Here either:
  // 1. pPage == NULL, or
  // 2. pPage && pPage->isLocal == 0 && !TDB_TXN_IS_WRITE(pTxn)
  // Keep the hashed page (if any) as the source to copy from.
  pPageH = pPage;
  pPage = NULL;

  // 2. Try to allocate a new page from the free list
  if (pCache->pFree) {
    pPage = pCache->pFree;
    pCache->pFree = pPage->pFreeNext;
    pCache->nFree--;
    pPage->pLruNext = NULL;
  }

  // 3. Try to recycle the page at the tail of the LRU list
  if (!pPage && !pCache->lru.pLruPrev->isAnchor) {
    pPage = pCache->lru.pLruPrev;
    tdbPCacheRemovePageFromHash(pCache, pPage);
    tdbPCachePinPage(pCache, pPage);
  }

  // 4. Try to create a new page
  if (!pPage) {
    ret = tdbPageCreate(pCache->szPage, &pPage, pTxn->xMalloc, pTxn->xArg);
    if (ret < 0) {
      // TODO
      ASSERT(0);
      return NULL;
    }

    // init the page fields
    pPage->isAnchor = 0;
    pPage->isLocal = 0;
    pPage->nRef = 0;
    pPage->id = -1;
  }

  // 5. The page here was just taken from the free list, recycled, or freshly
  // allocated, so it needs to be initialized
  if (pPage) {
    if (pPageH) {
      // copy the page content
      memcpy(&(pPage->pgid), pPgid, sizeof(*pPgid));

      // Wait until the hashed page has a pager attached.
      // NOTE(review): this spins while the caller holds the cache mutex and
      // reads pPager without synchronization - confirm the writer does not
      // need this lock to make progress.
      for (int nLoops = 0;;) {
        if (pPageH->pPager) break;
        if (++nLoops > 1000) {
          sched_yield();
          nLoops = 0;
        }
      }

      pPage->pLruNext = NULL;
      pPage->pPager = pPageH->pPager;

      memcpy(pPage->pData, pPageH->pData, pPage->pageSize);
      // The header offset is a pointer difference (ptrdiff_t); cast it so it
      // matches the %d conversion.
      tdbDebug("pcache/pPageH: %p %d %p %p %d", pPageH, (int)(pPageH->pPageHdr - pPageH->pData), pPageH->xCellSize,
               pPage, TDB_PAGE_PGNO(pPageH));
      tdbPageInit(pPage, pPageH->pPageHdr - pPageH->pData, pPageH->xCellSize);
      pPage->kLen = pPageH->kLen;
      pPage->vLen = pPageH->vLen;
      pPage->maxLocal = pPageH->maxLocal;
      pPage->minLocal = pPageH->minLocal;
    } else {
      memcpy(&(pPage->pgid), pPgid, sizeof(*pPgid));
      pPage->pLruNext = NULL;
      pPage->pPager = NULL;

      if (pPage->isLocal || TDB_TXN_IS_WRITE(pTxn)) {
        tdbPCacheAddPageToHash(pCache, pPage);
      }
    }
  }

  return pPage;
}

H
Hongze Cheng 已提交
238
// Take pPage off the LRU list so it cannot be recycled.
// A page whose pLruNext is NULL is not on the list and is left alone.
static void tdbPCachePinPage(SPCache *pCache, SPage *pPage) {
  if (pPage->pLruNext == NULL) {
    return;
  }

  ASSERT(tdbGetPageRef(pPage) == 0);

  // Unlink the page from between its two neighbours.
  SPage *pBefore = pPage->pLruPrev;
  SPage *pAfter = pPage->pLruNext;
  pBefore->pLruNext = pAfter;
  pAfter->pLruPrev = pBefore;
  pPage->pLruNext = NULL;

  pCache->nRecyclable--;

  tdbDebug("pcache/pin page %p/%d/%d", pPage, TDB_PAGE_PGNO(pPage), pPage->id);
}

H
Hongze Cheng 已提交
253 254
// Put pPage at the head of the LRU list, making it available for recycling.
// The page must be local, clean, unreferenced and not already on the list.
static void tdbPCacheUnpinPage(SPCache *pCache, SPage *pPage) {
  ASSERT(pPage->isLocal);
  ASSERT(!pPage->isDirty);
  ASSERT(tdbGetPageRef(pPage) == 0);

  ASSERT(pPage->pLruNext == NULL);

  // Insert between the anchor and the current first element.
  SPage *pAnchor = &(pCache->lru);
  SPage *pOldHead = pAnchor->pLruNext;

  pPage->pLruPrev = pAnchor;
  pPage->pLruNext = pOldHead;
  pOldHead->pLruPrev = pPage;
  pAnchor->pLruNext = pPage;

  pCache->nRecyclable++;

  tdbDebug("pcache/unpin page %p/%d/%d", pPage, TDB_PAGE_PGNO(pPage), pPage->id);
}

H
Hongze Cheng 已提交
273
// Unlink pPage from its hash bucket. If the page is not in the bucket the
// table is left unchanged; the debug line is logged either way.
static void tdbPCacheRemovePageFromHash(SPCache *pCache, SPage *pPage) {
  uint32_t bucket = tdbPCachePageHash(&(pPage->pgid)) % pCache->nHash;
  SPage  **ppLink = &(pCache->pgHash[bucket]);

  // Walk the chain until the link pointing at pPage (or the end) is found.
  while (*ppLink != NULL && *ppLink != pPage) {
    ppLink = &((*ppLink)->pHashNext);
  }

  if (*ppLink != NULL) {
    *ppLink = pPage->pHashNext;
    pCache->nPage--;
  }

  tdbDebug("pcache/remove page %p/%d/%d from hash %" PRIu32, pPage, TDB_PAGE_PGNO(pPage), pPage->id, bucket);
}

H
Hongze Cheng 已提交
289
// Link pPage at the head of the hash bucket selected by its page id.
static void tdbPCacheAddPageToHash(SPCache *pCache, SPage *pPage) {
  uint32_t bucket = tdbPCachePageHash(&(pPage->pgid)) % pCache->nHash;
  SPage  **ppHead = &(pCache->pgHash[bucket]);

  pPage->pHashNext = *ppHead;
  *ppHead = pPage;

  pCache->nPage++;

  tdbDebug("pcache/add page %p/%d/%d to hash %" PRIu32, pPage, TDB_PAGE_PGNO(pPage), pPage->id, bucket);
}

// Initialize a freshly allocated cache: the lock, nPages local pages placed
// on the free list and recorded in aPage, an empty hash table and an empty
// LRU list.
//
// Returns 0 on success. On failure returns -1 after destroying every page
// created so far and the lock, so the caller only has to free the cache
// structure itself.
static int tdbPCacheOpenImpl(SPCache *pCache) {
  SPage *pPage;
  int    ret;
  int    nCreated = 0;  // pages recorded in aPage so far, for cleanup on error

  tdbPCacheInitLock(pCache);

  // Open the free list
  pCache->nFree = 0;
  pCache->pFree = NULL;
  for (int i = 0; i < pCache->nPages; i++) {
    ret = tdbPageCreate(pCache->szPage, &pPage, tdbDefaultMalloc, NULL);
    if (ret < 0) {
      goto _err;
    }

    // pPage->pgid = 0;
    pPage->isAnchor = 0;
    pPage->isLocal = 1;
    pPage->nRef = 0;
    pPage->pHashNext = NULL;
    pPage->pLruNext = NULL;
    pPage->pLruPrev = NULL;
    pPage->pDirtyNext = NULL;

    // add page to free list
    pPage->pFreeNext = pCache->pFree;
    pCache->pFree = pPage;
    pCache->nFree++;

    // add to local list
    pPage->id = i;
    pCache->aPage[i] = pPage;
    nCreated++;
  }

  // Open the hash table; never use fewer than 8 buckets
  pCache->nPage = 0;
  pCache->nHash = pCache->nPages < 8 ? 8 : pCache->nPages;
  pCache->pgHash = (SPage **)tdbOsCalloc(pCache->nHash, sizeof(SPage *));
  if (pCache->pgHash == NULL) {
    goto _err;
  }

  // Open LRU list: an empty circular list has the anchor pointing at itself
  pCache->nRecyclable = 0;
  pCache->lru.isAnchor = 1;
  pCache->lru.pLruNext = &(pCache->lru);
  pCache->lru.pLruPrev = &(pCache->lru);

  return 0;

_err:
  // Release what was set up before the failure, the same way
  // tdbPCacheCloseImpl releases the local pages and the lock.
  for (int i = 0; i < nCreated; i++) {
    tdbPageDestroy(pCache->aPage[i], tdbDefaultFree, NULL);
    pCache->aPage[i] = NULL;
  }
  pCache->pFree = NULL;
  pCache->nFree = 0;
  tdbPCacheDestroyLock(pCache);
  return -1;
}

H
Hongze Cheng 已提交
356
// Release everything tdbPCacheOpenImpl set up: destroy each local page still
// recorded in aPage, free the hash table and destroy the lock.
// Always returns 0.
static int tdbPCacheCloseImpl(SPCache *pCache) {
  i32 idx = 0;

  while (idx < pCache->nPages) {
    if (pCache->aPage[idx] != NULL) {
      tdbPageDestroy(pCache->aPage[idx], tdbDefaultFree, NULL);
      pCache->aPage[idx] = NULL;
    }
    idx++;
  }

  tdbOsFree(pCache->pgHash);
  tdbPCacheDestroyLock(pCache);
  return 0;
}