tdbPage.c 19.3 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tdbInt.h"

H
Hongze Cheng 已提交
18 19 20
extern SPageMethods pageMethods;
extern SPageMethods pageLargeMethods;

H
Hongze Cheng 已提交
21 22 23 24 25 26 27 28 29 30 31 32 33
// Page accessors.
// Header fields are never touched directly: every read/write goes through the
// SPageMethods table attached to the page (pPage->pPageMethods), which is
// selected from the page size in tdbPageCreate().
// Every expansion is fully parenthesised so the macros can be used inside
// larger expressions without precedence surprises.
#define TDB_PAGE_HDR_SIZE(pPage)       ((pPage)->pPageMethods->szPageHdr)
#define TDB_PAGE_FREE_CELL_SIZE(pPage) ((pPage)->pPageMethods->szFreeCell)

// Getters
#define TDB_PAGE_NCELLS(pPage)              ((*(pPage)->pPageMethods->getCellNum)(pPage))
#define TDB_PAGE_CCELLS(pPage)              ((*(pPage)->pPageMethods->getCellBody)(pPage))
#define TDB_PAGE_FCELL(pPage)               ((*(pPage)->pPageMethods->getCellFree)(pPage))
#define TDB_PAGE_NFREE(pPage)               ((*(pPage)->pPageMethods->getFreeBytes)(pPage))
#define TDB_PAGE_CELL_OFFSET_AT(pPage, idx) ((*(pPage)->pPageMethods->getCellOffset)((pPage), (idx)))

// Setters
#define TDB_PAGE_NCELLS_SET(pPage, NCELLS) ((*(pPage)->pPageMethods->setCellNum)((pPage), (NCELLS)))
#define TDB_PAGE_CCELLS_SET(pPage, CCELLS) ((*(pPage)->pPageMethods->setCellBody)((pPage), (CCELLS)))
#define TDB_PAGE_FCELL_SET(pPage, FCELL)   ((*(pPage)->pPageMethods->setCellFree)((pPage), (FCELL)))
#define TDB_PAGE_NFREE_SET(pPage, NFREE)   ((*(pPage)->pPageMethods->setFreeBytes)((pPage), (NFREE)))
#define TDB_PAGE_CELL_OFFSET_AT_SET(pPage, idx, OFFSET) \
  ((*(pPage)->pPageMethods->setCellOffset)((pPage), (idx), (OFFSET)))

// Address of the idx-th on-page cell (offsets are relative to pData)
#define TDB_PAGE_CELL_AT(pPage, idx) ((pPage)->pData + TDB_PAGE_CELL_OFFSET_AT(pPage, idx))

// Bytes left for the cell index and cell bodies once the access-method header
// (szAmHdr), the page header and the page footer are taken out of the page.
#define TDB_PAGE_MAX_FREE_BLOCK(pPage, szAmHdr) \
  ((pPage)->pageSize - (szAmHdr) - TDB_PAGE_HDR_SIZE(pPage) - sizeof(SPageFtr))
H
Hongze Cheng 已提交
36

H
Hongze Cheng 已提交
37 38
// File-local helpers, defined further down in this file.
// Parameter names match the definitions (szCell is the cell size in bytes).
static int tdbPageAllocate(SPage *pPage, int szCell, SCell **ppCell);
static int tdbPageDefragment(SPage *pPage);
static int tdbPageFree(SPage *pPage, int idx, SCell *pCell, int szCell);
H
Hongze Cheng 已提交
40

H
Hongze Cheng 已提交
41 42 43 44 45
// Allocate and initialise an in-memory page.
//
// One buffer of pageSize + sizeof(SPage) bytes is requested from xMalloc and
// zero-filled. The page content occupies the first pageSize bytes; the SPage
// handle is placed directly behind it in the same allocation.
//
//   pageSize  page content size in bytes (asserted with TDB_IS_PGSIZE_VLD)
//   ppPage    out: the new page handle; NULL if the allocation fails
//   xMalloc   allocator, called as xMalloc(arg, nBytes)
//   arg       opaque value forwarded to xMalloc
//
// Returns 0 on success, -1 when xMalloc returns NULL.
int tdbPageCreate(int pageSize, SPage **ppPage, void *(*xMalloc)(void *, size_t), void *arg) {
  ASSERT(xMalloc);
  ASSERT(TDB_IS_PGSIZE_VLD(pageSize));

  *ppPage = NULL;

  int nBytes = pageSize + sizeof(SPage);
  u8 *pBuf = (u8 *)(xMalloc(arg, nBytes));
  if (pBuf == NULL) {
    return -1;
  }
  memset(pBuf, 0, nBytes);

  // The handle lives right after the page content.
  SPage *pNew = (SPage *)(pBuf + pageSize);

  TDB_INIT_PAGE_LOCK(pNew);
  pNew->pageSize = pageSize;
  pNew->pData = pBuf;
  // Pages below 64 KiB use the 16-bit method table, larger ones the 24-bit one.
  pNew->pPageMethods = (pageSize < 65536) ? &pageMethods : &pageLargeMethods;

  *ppPage = pNew;
  return 0;
}

// Release a page created by tdbPageCreate().
//
// Every overflow cell still attached to the page is freed with tdbOsFree(),
// then the page buffer (pPage->pData) is handed to xFree(arg, ptr).
// tdbPageCreate() places the SPage handle inside that same buffer, so pPage
// must not be used once this function returns.
//
//   xFree  deallocator matching the xMalloc used at creation (must be non-NULL)
//   arg    opaque value forwarded to xFree
//
// Always returns 0.
int tdbPageDestroy(SPage *pPage, void (*xFree)(void *arg, void *ptr), void *arg) {
  ASSERT(xFree);

  int iOvfl = 0;
  while (iOvfl < pPage->nOverflow) {
    tdbOsFree(pPage->apOvfl[iOvfl]);
    iOvfl++;
  }

  xFree(arg, pPage->pData);

  return 0;
}

89
// Format a page as empty.
//
// Writes a fresh page header (0 cells, empty free list, cell content area
// starting at the footer, all usable bytes free) and points the in-memory
// cursors of pPage at the resulting layout.
//
//   szAmHdr    size of the access-method header that precedes the page header
//   xCellSize  callback stored on the page and used later to size a cell
void tdbPageZero(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *, int, TXN *, SBTree *pBt)) {
  u8 *pBase = pPage->pData;

  // The header pointer has to be in place first: the TDB_PAGE_*_SET accessors
  // write through pPage->pPageHdr.
  pPage->pPageHdr = pBase + szAmHdr;

  TDB_PAGE_NCELLS_SET(pPage, 0);
  TDB_PAGE_CCELLS_SET(pPage, pPage->pageSize - sizeof(SPageFtr));
  TDB_PAGE_FCELL_SET(pPage, 0);
  TDB_PAGE_NFREE_SET(pPage, TDB_PAGE_MAX_FREE_BLOCK(pPage, szAmHdr));

  // The cell index starts right after the page header and is empty, so the
  // free block begins at the same address.
  u8 *pIdx = pPage->pPageHdr + TDB_PAGE_HDR_SIZE(pPage);
  pPage->pCellIdx = pIdx;
  pPage->pFreeStart = pIdx;
  pPage->pFreeEnd = pBase + TDB_PAGE_CCELLS(pPage);
  pPage->pPageFtr = (SPageFtr *)(pBase + pPage->pageSize - sizeof(SPageFtr));

  pPage->xCellSize = xCellSize;
  pPage->nOverflow = 0;

  // With no cells stored, the free block must end exactly at the footer.
  ASSERT((u8 *)pPage->pPageFtr == pPage->pFreeEnd);
}

105
// Attach the in-memory cursors of pPage to a page whose content is already
// present in pPage->pData. Unlike tdbPageZero(), the page header is only read,
// never written.
//
//   szAmHdr    size of the access-method header that precedes the page header
//   xCellSize  callback stored on the page and used later to size a cell
void tdbPageInit(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *, int, TXN *, SBTree *pBt)) {
  u8 *pBase = pPage->pData;
  u8 *pHdr = pBase + szAmHdr;

  // Must be assigned before TDB_PAGE_NCELLS / TDB_PAGE_CCELLS are evaluated:
  // those accessors read through pPage->pPageHdr.
  pPage->pPageHdr = pHdr;

  u8 *pIdx = pHdr + TDB_PAGE_HDR_SIZE(pPage);
  pPage->pCellIdx = pIdx;
  // The free block starts after the last used slot of the cell index ...
  pPage->pFreeStart = pIdx + TDB_PAGE_OFFSET_SIZE(pPage) * TDB_PAGE_NCELLS(pPage);
  // ... and ends where the cell content area begins.
  pPage->pFreeEnd = pBase + TDB_PAGE_CCELLS(pPage);
  pPage->pPageFtr = (SPageFtr *)(pBase + pPage->pageSize - sizeof(SPageFtr));

  pPage->xCellSize = xCellSize;
  pPage->nOverflow = 0;

  // Sanity: the contiguous free block cannot exceed the recorded free bytes.
  ASSERT(pPage->pFreeEnd >= pPage->pFreeStart);
  ASSERT(pPage->pFreeEnd - pPage->pFreeStart <= TDB_PAGE_NFREE(pPage));
}

H
Hongze Cheng 已提交
118
// Insert a copy of a cell at logical position idx.
//
// Logical positions count both on-page cells and overflow cells. The cell is
// stored on the page when the page has enough free bytes for the body plus one
// cell-index slot; otherwise (or when asOvfl is non-zero) a heap copy is kept
// in the page's overflow arrays (apOvfl/aiOvfl).
//
//   pPage   target page
//   idx     logical position of the new cell
//   pCell   source bytes; szCell bytes are copied, the caller keeps ownership
//   szCell  cell size in bytes
//   asOvfl  non-zero forces the cell to be stored as an overflow cell
//
// Returns 0 on success, -1 if the overflow copy cannot be allocated (the page
// is left unmodified in that case).
//
// NOTE(review): the capacity of apOvfl/aiOvfl is not visible here and is not
// checked; callers must not exceed it.
int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell, u8 asOvfl) {
  int    nFree;
  int    nCells;
  int    iOvfl;
  int    lidx;  // position inside the on-page cell index (overflow cells excluded)
  SCell *pNewCell;

  ASSERT(szCell <= TDB_PAGE_MAX_FREE_BLOCK(pPage, pPage->pPageHdr - pPage->pData));

  nFree = TDB_PAGE_NFREE(pPage);
  nCells = TDB_PAGE_NCELLS(pPage);

  // Count the overflow cells that sit before idx; they do not occupy a slot
  // in the on-page cell index.
  for (iOvfl = 0; iOvfl < pPage->nOverflow; ++iOvfl) {
    if (pPage->aiOvfl[iOvfl] >= idx) {
      break;
    }
  }

  lidx = idx - iOvfl;

  if (asOvfl || nFree < szCell + TDB_PAGE_OFFSET_SIZE(pPage)) {
    // Store the cell as an overflow cell.
    // Allocate before touching the overflow arrays so that an allocation
    // failure leaves the page state intact.
    pNewCell = (SCell *)tdbOsMalloc(szCell);
    if (pNewCell == NULL) {
      return -1;
    }
    memcpy(pNewCell, pCell, szCell);

    tdbDebug("tdbPage/new ovfl cell: %p", pNewCell);

    // TODO: make it extensible
    // Open a gap at iOvfl by shifting the later overflow entries up by one.
    for (int i = pPage->nOverflow; i > iOvfl; i--) {
      pPage->apOvfl[i] = pPage->apOvfl[i - 1];
      pPage->aiOvfl[i] = pPage->aiOvfl[i - 1];
    }

    // The heap copy is released by tdbPageDropCell() or tdbPageDestroy().
    pPage->apOvfl[iOvfl] = pNewCell;
    pPage->aiOvfl[iOvfl] = idx;
    pPage->nOverflow++;
    iOvfl++;
  } else {
    // The page has enough space to hold the cell locally.
    if (tdbPageAllocate(pPage, szCell, &pNewCell) < 0) {
      return -1;
    }

    memcpy(pNewCell, pCell, szCell);

    // Open a slot at lidx in the cell index. tdbPageAllocate() has already
    // advanced pFreeStart by one slot, hence the length is measured from dest.
    u8 *src = pPage->pCellIdx + TDB_PAGE_OFFSET_SIZE(pPage) * lidx;
    u8 *dest = src + TDB_PAGE_OFFSET_SIZE(pPage);
    memmove(dest, src, pPage->pFreeStart - dest);
    TDB_PAGE_CELL_OFFSET_AT_SET(pPage, lidx, pNewCell - pPage->pData);
    TDB_PAGE_NCELLS_SET(pPage, nCells + 1);

    ASSERT(pPage->pFreeStart == pPage->pCellIdx + TDB_PAGE_OFFSET_SIZE(pPage) * (nCells + 1));
  }

  // Every overflow cell located after the insertion point moves one position up.
  for (; iOvfl < pPage->nOverflow; iOvfl++) {
    pPage->aiOvfl[iOvfl]++;
  }

  return 0;
}

179 180
// Replace the cell at logical position idx: the old cell is dropped, then the
// new one is inserted at the same position (never forced to overflow).
//
// Returns the result of tdbPageInsertCell(); the result of the drop step is
// not examined.
// NOTE(review): pCell presumably must not point into the cell being dropped,
// since that storage is released before the copy is made - confirm with callers.
int tdbPageUpdateCell(SPage *pPage, int idx, SCell *pCell, int szCell, TXN *pTxn, SBTree *pBt) {
  (void)tdbPageDropCell(pPage, idx, pTxn, pBt);

  int ret = tdbPageInsertCell(pPage, idx, pCell, szCell, 0);
  return ret;
}

184
// Remove the cell at logical position idx.
//
// Logical positions count both on-page cells and overflow cells. If idx names
// an overflow cell, its heap copy is freed and the overflow arrays are closed
// up. Otherwise the on-page cell is sized with pPage->xCellSize and released
// with tdbPageFree(). In both cases the positions recorded for later overflow
// cells are decremented by one.
//
//   pTxn, pBt  forwarded unchanged to pPage->xCellSize
//
// Always returns 0.
int tdbPageDropCell(SPage *pPage, int idx, TXN *pTxn, SBTree *pBt) {
  int    lidx;
  SCell *pCell;
  int    szCell;
  int    nCells;
  int    iOvfl;

  nCells = TDB_PAGE_NCELLS(pPage);

  ASSERT(idx >= 0 && idx < nCells + pPage->nOverflow);

  for (iOvfl = 0; iOvfl < pPage->nOverflow; iOvfl++) {
    if (pPage->aiOvfl[iOvfl] == idx) {
      // Remove the overflow cell. Log first: the pointer value must not be
      // read once the memory has been released.
      tdbDebug("tdbPage/free ovfl cell: %p", pPage->apOvfl[iOvfl]);
      tdbOsFree(pPage->apOvfl[iOvfl]);

      // Close the gap; the entries that follow also move one position down.
      for (int j = iOvfl + 1; j < pPage->nOverflow; j++) {
        pPage->aiOvfl[j - 1] = pPage->aiOvfl[j] - 1;
        pPage->apOvfl[j - 1] = pPage->apOvfl[j];
      }

      pPage->nOverflow--;
      return 0;
    } else if (pPage->aiOvfl[iOvfl] > idx) {
      break;
    }
  }

  // iOvfl now equals the number of overflow cells located before idx.
  lidx = idx - iOvfl;
  pCell = TDB_PAGE_CELL_AT(pPage, lidx);
  // NOTE(review): the meaning of the third argument (1 here, 0 in
  // tdbPageDefragment) is defined by the xCellSize callback - confirm there.
  szCell = (*pPage->xCellSize)(pPage, pCell, 1, pTxn, pBt);
  tdbPageFree(pPage, lidx, pCell, szCell);
  TDB_PAGE_NCELLS_SET(pPage, nCells - 1);

  // Overflow cells behind the dropped cell move one position down.
  for (; iOvfl < pPage->nOverflow; iOvfl++) {
    pPage->aiOvfl[iOvfl]--;
    ASSERT(pPage->aiOvfl[iOvfl] > 0);
  }

  return 0;
}

H
Hongze Cheng 已提交
227
// Copy the content of pFromPage into pToPage.
//
// Two regions are copied: page header + cell index (pPageHdr .. pFreeStart)
// and the cell content area (pFreeEnd .. pPageFtr). The access-method header
// in front of pPageHdr is not copied. If the two pages place their page header
// at different offsets, the free-byte count of the destination is adjusted by
// that difference.
//
// Overflow entries are copied by pointer: after the call both pages reference
// the same heap cells.
// NOTE(review): tdbPageDestroy()/tdbPageDropCell() free those cells, so the
// caller has to make sure only one of the two pages ends up releasing them.
void tdbPageCopy(SPage *pFromPage, SPage *pToPage) {
  // Sizes of the two populated regions of the source page.
  size_t szHead = pFromPage->pFreeStart - pFromPage->pPageHdr;
  size_t szTail = (u8 *)pFromPage->pPageFtr - pFromPage->pFreeEnd;

  pToPage->pFreeStart = pToPage->pPageHdr + szHead;
  pToPage->pFreeEnd = (u8 *)(pToPage->pPageFtr) - szTail;

  ASSERT(pToPage->pFreeEnd >= pToPage->pFreeStart);

  memcpy(pToPage->pPageHdr, pFromPage->pPageHdr, szHead);
  memcpy(pToPage->pFreeEnd, pFromPage->pFreeEnd, szTail);

  // The copied header must describe the destination's cell content area.
  ASSERT(TDB_PAGE_CCELLS(pToPage) == pToPage->pFreeEnd - pToPage->pData);

  // A larger header offset in the destination leaves fewer free bytes.
  int hdrShift = (pToPage->pPageHdr - pToPage->pData) - (pFromPage->pPageHdr - pFromPage->pData);
  if (hdrShift != 0) {
    int nFreeFrom = TDB_PAGE_NFREE(pFromPage);
    TDB_PAGE_NFREE_SET(pToPage, nFreeFrom - hdrShift);
  }

  // Copy the overflow cells (shallow: pointers only)
  int iOvfl = 0;
  while (iOvfl < pFromPage->nOverflow) {
    pToPage->aiOvfl[iOvfl] = pFromPage->aiOvfl[iOvfl];
    pToPage->apOvfl[iOvfl] = pFromPage->apOvfl[iOvfl];
    iOvfl++;
  }
  pToPage->nOverflow = pFromPage->nOverflow;
}

H
Hongze Cheng 已提交
254 255
int tdbPageCapacity(int pageSize, int amHdrSize) {
  int szPageHdr;
256
  int minCellIndexSize;  // at least one cell in cell index
H
Hongze Cheng 已提交
257 258 259

  if (pageSize < 65536) {
    szPageHdr = pageMethods.szPageHdr;
M
Minglei Jin 已提交
260
    minCellIndexSize = pageMethods.szOffset;
H
Hongze Cheng 已提交
261 262
  } else {
    szPageHdr = pageLargeMethods.szPageHdr;
M
Minglei Jin 已提交
263
    minCellIndexSize = pageLargeMethods.szOffset;
H
Hongze Cheng 已提交
264 265
  }

M
Minglei Jin 已提交
266
  return pageSize - szPageHdr - amHdrSize - sizeof(SPageFtr) - minCellIndexSize;
H
Hongze Cheng 已提交
267 268
}

H
Hongze Cheng 已提交
269 270 271 272 273 274
// Reserve szCell bytes of cell storage plus one cell-index slot on the page.
//
// The caller must have verified that the page has at least
// szCell + TDB_PAGE_OFFSET_SIZE free bytes (asserted). Storage is taken, in
// order of preference, from:
//   1. the contiguous free block between the cell index and the cell content,
//   2. the first sufficiently large cell on the page's free list,
//   3. the contiguous free block after defragmenting the page.
//
// On return *ppCell points at the reserved bytes, pFreeStart has been advanced
// by one index slot (the slot itself is filled in by the caller) and the
// page's free-byte count has been reduced accordingly.
//
// Always returns 0.
static int tdbPageAllocate(SPage *pPage, int szCell, SCell **ppCell) {
  int    nFree;
  int    cellFree;
  SCell *pCell = NULL;

  *ppCell = NULL;
  nFree = TDB_PAGE_NFREE(pPage);

  ASSERT(nFree >= szCell + TDB_PAGE_OFFSET_SIZE(pPage));
  ASSERT(TDB_PAGE_CCELLS(pPage) == pPage->pFreeEnd - pPage->pData);

  // 1. Try to allocate from the free space block area
  if (pPage->pFreeEnd - pPage->pFreeStart >= szCell + TDB_PAGE_OFFSET_SIZE(pPage)) {
    pPage->pFreeEnd -= szCell;
    pCell = pPage->pFreeEnd;
    TDB_PAGE_CCELLS_SET(pPage, pPage->pFreeEnd - pPage->pData);
    goto _alloc_finish;
  }

  // 2. Try to allocate from the page free list. The free block must still
  //    have room for the new index slot.
  cellFree = TDB_PAGE_FCELL(pPage);
  ASSERT(cellFree == 0 || cellFree >= pPage->pFreeEnd - pPage->pData);
  if (cellFree && pPage->pFreeEnd - pPage->pFreeStart >= TDB_PAGE_OFFSET_SIZE(pPage)) {
    SCell *pPrevFreeCell = NULL;
    int    szPrevFreeCell = 0;
    int    szFreeCell;
    int    nxFreeCell;

    // Offset 0 terminates the list.
    while (cellFree != 0) {
      SCell *pFreeCell = pPage->pData + cellFree;
      pPage->pPageMethods->getFreeCellInfo(pFreeCell, &szFreeCell, &nxFreeCell);

      if (szFreeCell < szCell) {
        // Too small, remember it as predecessor and move on.
        pPrevFreeCell = pFreeCell;
        szPrevFreeCell = szFreeCell;
        cellFree = nxFreeCell;
        continue;
      }

      // First fit: the new cell takes the front of this free cell.
      pCell = pFreeCell;

      int newSize = szFreeCell - szCell;
      int nxLink;  // offset the predecessor (or the list head) must point to
      if (newSize >= TDB_PAGE_FREE_CELL_SIZE(pPage)) {
        // The remainder is big enough to carry a free-cell header: keep it
        // on the list in place of the cell that was just consumed.
        SCell *pRemain = pFreeCell + szCell;
        pPage->pPageMethods->setFreeCellInfo(pRemain, newSize, nxFreeCell);
        nxLink = pRemain - pPage->pData;
      } else {
        // The remainder cannot be linked; unlink the whole free cell. The
        // leftover bytes stay counted as free and come back on defragment.
        nxLink = nxFreeCell;
      }

      if (pPrevFreeCell) {
        pPage->pPageMethods->setFreeCellInfo(pPrevFreeCell, szPrevFreeCell, nxLink);
      } else {
        TDB_PAGE_FCELL_SET(pPage, nxLink);
      }

      goto _alloc_finish;
    }
  }

  // 3. Try to defragment and allocate again
  tdbPageDefragment(pPage);
  ASSERT(pPage->pFreeEnd - pPage->pFreeStart == nFree);
  ASSERT(nFree == TDB_PAGE_NFREE(pPage));
  ASSERT(pPage->pFreeEnd - pPage->pData == TDB_PAGE_CCELLS(pPage));

  pPage->pFreeEnd -= szCell;
  pCell = pPage->pFreeEnd;
  TDB_PAGE_CCELLS_SET(pPage, pPage->pFreeEnd - pPage->pData);

_alloc_finish:
  ASSERT(pCell);
  // Reserve the index slot and account for body + slot.
  pPage->pFreeStart += TDB_PAGE_OFFSET_SIZE(pPage);
  TDB_PAGE_NFREE_SET(pPage, nFree - szCell - TDB_PAGE_OFFSET_SIZE(pPage));
  *ppCell = pCell;
  return 0;
}

H
Hongze Cheng 已提交
354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385
// Give the storage of the on-page cell at index idx back to the page.
//
// If the cell is the lowest one in the cell content area, the contiguous free
// block simply grows over it. Otherwise the cell is pushed onto the front of
// the page's free list; a cell smaller than a free-cell header cannot be
// linked and trips an assertion. The cell's index slot is removed and the
// free-byte count grows by the cell size plus one slot.
//
//   idx     index of the cell in the on-page cell index
//   pCell   address of the cell (must equal TDB_PAGE_CELL_AT(pPage, idx))
//   szCell  size of the cell in bytes
//
// The cell count in the page header is not changed here. Always returns 0.
static int tdbPageFree(SPage *pPage, int idx, SCell *pCell, int szCell) {
  ASSERT(pCell >= pPage->pFreeEnd);
  ASSERT(pCell + szCell <= (u8 *)(pPage->pPageFtr));
  ASSERT(pCell == TDB_PAGE_CELL_AT(pPage, idx));

  int szSlot = TDB_PAGE_OFFSET_SIZE(pPage);
  int nFreeBefore = TDB_PAGE_NFREE(pPage);

  if (pCell == pPage->pFreeEnd) {
    // Adjacent to the free block: extend the block.
    pPage->pFreeEnd += szCell;
    TDB_PAGE_CCELLS_SET(pPage, pPage->pFreeEnd - pPage->pData);
  } else if (szCell >= TDB_PAGE_FREE_CELL_SIZE(pPage)) {
    // Push onto the head of the free list.
    int oldHead = TDB_PAGE_FCELL(pPage);
    pPage->pPageMethods->setFreeCellInfo(pCell, szCell, oldHead);
    TDB_PAGE_FCELL_SET(pPage, pCell - pPage->pData);
  } else {
    ASSERT(0);
  }

  // Close the gap left by slot idx in the cell index.
  u8 *pSlot = pPage->pCellIdx + szSlot * idx;
  u8 *pNextSlot = pSlot + szSlot;
  memmove(pSlot, pNextSlot, pPage->pFreeStart - pNextSlot);

  pPage->pFreeStart -= szSlot;
  TDB_PAGE_NFREE_SET(pPage, nFreeBefore + szCell + szSlot);
  return 0;
}

// Compact the cell content area so that all free bytes form one contiguous
// block between the cell index and the first cell.
//
// Cells are packed against the page footer from the highest address
// downwards; each moved cell gets its index slot rewritten. Afterwards the
// free list is empty and pFreeEnd / the header's cell-body offset point at the
// lowest cell. The free-byte count is unchanged.
//
// Must only be called when the page is actually fragmented (asserted).
// Always returns 0.
static int tdbPageDefragment(SPage *pPage) {
  int nFree = TDB_PAGE_NFREE(pPage);
  int nCells = TDB_PAGE_NCELLS(pPage);

  ASSERT(pPage->pFreeEnd - pPage->pFreeStart < nFree);

  // Loop to compact the page content
  // Here we use an O(n^2) algorithm to do the job since
  // this is a low frequency job.
  //
  // pNextCell is the lower bound of the already-compacted region; it starts
  // at the footer and moves down by one cell per pass.
  SCell *pNextCell = (u8 *)pPage->pPageFtr;

  for (int iCell = 0; iCell < nCells; iCell++) {
    SCell *pCell = NULL;
    int    idx = -1;

    // Pick the not-yet-compacted cell with the highest address. Cells that
    // were already moved have offsets at or above pNextCell and are skipped.
    for (int i = 0; i < nCells; i++) {
      if (TDB_PAGE_CELL_OFFSET_AT(pPage, i) < pNextCell - pPage->pData) {
        SCell *pTCell = TDB_PAGE_CELL_AT(pPage, i);
        if (pCell == NULL || pCell < pTCell) {
          pCell = pTCell;
          idx = i;
        }
      }
    }

    ASSERT(pCell != NULL);

    int szCell = (*pPage->xCellSize)(pPage, pCell, 0, NULL, NULL);

    ASSERT(pCell + szCell <= pNextCell);
    if (pCell + szCell < pNextCell) {
      // Source and destination may overlap, hence memmove.
      memmove(pNextCell - szCell, pCell, szCell);
    }

    pNextCell = pNextCell - szCell;
    TDB_PAGE_CELL_OFFSET_AT_SET(pPage, idx, pNextCell - pPage->pData);
  }

  // compact over
  pPage->pFreeEnd = pNextCell;

  ASSERT(pPage->pFreeEnd - pPage->pFreeStart == nFree);
  TDB_PAGE_CCELLS_SET(pPage, pPage->pFreeEnd - pPage->pData);
  TDB_PAGE_FCELL_SET(pPage, 0);

  return 0;
}

/* ---------------------------------------------------------------------------------------------------------- */
wafwerar's avatar
wafwerar 已提交
450

H
Hongze Cheng 已提交
451
#pragma pack(push, 1)
wafwerar's avatar
wafwerar 已提交
452
typedef struct {
H
Hongze Cheng 已提交
453 454 455 456 457 458
  u16 cellNum;
  u16 cellBody;
  u16 cellFree;
  u16 nFree;
} SPageHdr;

wafwerar's avatar
wafwerar 已提交
459
typedef struct {
H
Hongze Cheng 已提交
460 461 462
  u16 szCell;
  u16 nxOffset;
} SFreeCell;
wafwerar's avatar
wafwerar 已提交
463
#pragma pack(pop)
H
Hongze Cheng 已提交
464

H
Hongze Cheng 已提交
465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488
// Header field accessors for pages using the 16-bit SPageHdr layout.
// Each setter asserts that the value fits into 16 bits before storing it.

// cellNum: number of cells on the page
static inline int getPageCellNum(SPage *pPage) {
  SPageHdr *pHdr = (SPageHdr *)(pPage->pPageHdr);
  return pHdr->cellNum;
}
static inline void setPageCellNum(SPage *pPage, int cellNum) {
  SPageHdr *pHdr = (SPageHdr *)(pPage->pPageHdr);
  ASSERT(cellNum < 65536);
  pHdr->cellNum = (u16)cellNum;
}

// cellBody: offset where the cell content area starts
static inline int getPageCellBody(SPage *pPage) {
  SPageHdr *pHdr = (SPageHdr *)(pPage->pPageHdr);
  return pHdr->cellBody;
}
static inline void setPageCellBody(SPage *pPage, int cellBody) {
  SPageHdr *pHdr = (SPageHdr *)(pPage->pPageHdr);
  ASSERT(cellBody < 65536);
  pHdr->cellBody = (u16)cellBody;
}

// cellFree: offset of the first cell on the free list
static inline int getPageCellFree(SPage *pPage) {
  SPageHdr *pHdr = (SPageHdr *)(pPage->pPageHdr);
  return pHdr->cellFree;
}
static inline void setPageCellFree(SPage *pPage, int cellFree) {
  SPageHdr *pHdr = (SPageHdr *)(pPage->pPageHdr);
  ASSERT(cellFree < 65536);
  pHdr->cellFree = (u16)cellFree;
}

// nFree: total free bytes on the page
static inline int getPageNFree(SPage *pPage) {
  SPageHdr *pHdr = (SPageHdr *)(pPage->pPageHdr);
  return pHdr->nFree;
}
static inline void setPageNFree(SPage *pPage, int nFree) {
  SPageHdr *pHdr = (SPageHdr *)(pPage->pPageHdr);
  ASSERT(nFree < 65536);
  pHdr->nFree = (u16)nFree;
}

// cell offset: the cell index is an array of 16-bit offsets at pPage->pCellIdx

// Read slot idx; idx must name an existing cell.
static inline int getPageCellOffset(SPage *pPage, int idx) {
  ASSERT(idx >= 0 && idx < getPageCellNum(pPage));
  u16 *aOffset = (u16 *)pPage->pCellIdx;
  return aOffset[idx];
}

// Write slot idx. idx is not range-checked here.
static inline void setPageCellOffset(SPage *pPage, int idx, int offset) {
  u16 *aOffset = (u16 *)pPage->pCellIdx;
  ASSERT(offset < 65536);
  aOffset[idx] = (u16)offset;
}

H
Hongze Cheng 已提交
504 505 506 507 508 509 510 511 512 513 514 515 516
// free cell info: read/write the SFreeCell header stored at the start of a
// free-list cell (its size and the offset of the next free cell)
static inline void getPageFreeCellInfo(SCell *pCell, int *szCell, int *nxOffset) {
  const SFreeCell *pHead = (SFreeCell *)pCell;

  *szCell = pHead->szCell;
  *nxOffset = pHead->nxOffset;
}

static inline void setPageFreeCellInfo(SCell *pCell, int szCell, int nxOffset) {
  SFreeCell *pHead = (SFreeCell *)pCell;

  // Values are narrowed to the 16-bit on-page fields.
  pHead->szCell = (u16)szCell;
  pHead->nxOffset = (u16)nxOffset;
}

H
Hongze Cheng 已提交
517
SPageMethods pageMethods = {
H
Hongze Cheng 已提交
518 519 520
    2,                    // szOffset
    sizeof(SPageHdr),     // szPageHdr
    sizeof(SFreeCell),    // szFreeCell
H
Hongze Cheng 已提交
521 522 523 524 525 526 527 528 529 530 531 532
    getPageCellNum,       // getCellNum
    setPageCellNum,       // setCellNum
    getPageCellBody,      // getCellBody
    setPageCellBody,      // setCellBody
    getPageCellFree,      // getCellFree
    setPageCellFree,      // setCellFree
    getPageNFree,         // getFreeBytes
    setPageNFree,         // setFreeBytes
    getPageCellOffset,    // getCellOffset
    setPageCellOffset,    // setCellOffset
    getPageFreeCellInfo,  // getFreeCellInfo
    setPageFreeCellInfo   // setFreeCellInfo
H
Hongze Cheng 已提交
533 534
};

H
Hongze Cheng 已提交
535
#pragma pack(push, 1)
wafwerar's avatar
wafwerar 已提交
536
typedef struct {
H
Hongze Cheng 已提交
537 538 539 540 541 542
  u8 cellNum[3];
  u8 cellBody[3];
  u8 cellFree[3];
  u8 nFree[3];
} SPageHdrL;

wafwerar's avatar
wafwerar 已提交
543
typedef struct {
H
Hongze Cheng 已提交
544 545 546
  u8 szCell[3];
  u8 nxOffset[3];
} SFreeCellL;
wafwerar's avatar
wafwerar 已提交
547
#pragma pack(pop)
H
Hongze Cheng 已提交
548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611

// Header field accessors for pages using the 24-bit SPageHdrL layout.
// Values are encoded/decoded with TDB_PUT_U24 / TDB_GET_U24.

// cellNum: number of cells on the page
static inline int getLPageCellNum(SPage *pPage) {
  SPageHdrL *pHdr = (SPageHdrL *)(pPage->pPageHdr);
  return TDB_GET_U24(pHdr->cellNum);
}
static inline void setLPageCellNum(SPage *pPage, int cellNum) {
  SPageHdrL *pHdr = (SPageHdrL *)(pPage->pPageHdr);
  TDB_PUT_U24(pHdr->cellNum, cellNum);
}

// cellBody: offset where the cell content area starts
static inline int getLPageCellBody(SPage *pPage) {
  SPageHdrL *pHdr = (SPageHdrL *)(pPage->pPageHdr);
  return TDB_GET_U24(pHdr->cellBody);
}
static inline void setLPageCellBody(SPage *pPage, int cellBody) {
  SPageHdrL *pHdr = (SPageHdrL *)(pPage->pPageHdr);
  TDB_PUT_U24(pHdr->cellBody, cellBody);
}

// cellFree: offset of the first cell on the free list
static inline int getLPageCellFree(SPage *pPage) {
  SPageHdrL *pHdr = (SPageHdrL *)(pPage->pPageHdr);
  return TDB_GET_U24(pHdr->cellFree);
}
static inline void setLPageCellFree(SPage *pPage, int cellFree) {
  SPageHdrL *pHdr = (SPageHdrL *)(pPage->pPageHdr);
  TDB_PUT_U24(pHdr->cellFree, cellFree);
}

// nFree: total free bytes on the page
static inline int getLPageNFree(SPage *pPage) {
  SPageHdrL *pHdr = (SPageHdrL *)(pPage->pPageHdr);
  return TDB_GET_U24(pHdr->nFree);
}
static inline void setLPageNFree(SPage *pPage, int nFree) {
  SPageHdrL *pHdr = (SPageHdrL *)(pPage->pPageHdr);
  TDB_PUT_U24(pHdr->nFree, nFree);
}

// cell offset: on large pages the cell index is an array of 3-byte offsets
// starting at pPage->pCellIdx

// Read slot idx; idx must name an existing cell.
// The bound is taken from the 24-bit header reader: the 16-bit reader would
// misinterpret the 3-byte cellNum field of SPageHdrL.
static inline int getLPageCellOffset(SPage *pPage, int idx) {
  ASSERT(idx >= 0 && idx < getLPageCellNum(pPage));
  return TDB_GET_U24(pPage->pCellIdx + 3 * idx);
}

// Write slot idx. idx is not range-checked here.
static inline void setLPageCellOffset(SPage *pPage, int idx, int offset) {
  TDB_PUT_U24(pPage->pCellIdx + 3 * idx, offset);
}

// free cell info: read/write the SFreeCellL header stored at the start of a
// free-list cell (its size and the offset of the next free cell)
static inline void getLPageFreeCellInfo(SCell *pCell, int *szCell, int *nxOffset) {
  SFreeCellL *pHead = (SFreeCellL *)pCell;

  int szFree = TDB_GET_U24(pHead->szCell);
  int nxFree = TDB_GET_U24(pHead->nxOffset);

  *szCell = szFree;
  *nxOffset = nxFree;
}

static inline void setLPageFreeCellInfo(SCell *pCell, int szCell, int nxOffset) {
  SFreeCellL *pHead = (SFreeCellL *)pCell;

  TDB_PUT_U24(pHead->szCell, szCell);
  TDB_PUT_U24(pHead->nxOffset, nxOffset);
}

SPageMethods pageLargeMethods = {
    3,                     // szOffset
    sizeof(SPageHdrL),     // szPageHdr
    sizeof(SFreeCellL),    // szFreeCell
    getLPageCellNum,       // getCellNum
    setLPageCellNum,       // setCellNum
    getLPageCellBody,      // getCellBody
    setLPageCellBody,      // setCellBody
    getLPageCellFree,      // getCellFree
    setLPageCellFree,      // setCellFree
    getLPageNFree,         // getFreeBytes
    setLPageNFree,         // setFreeBytes
    getLPageCellOffset,    // getCellOffset
    setLPageCellOffset,    // setCellOffset
    getLPageFreeCellInfo,  // getFreeCellInfo
    setLPageFreeCellInfo   // setFreeCellInfo
M
Minglei Jin 已提交
612
};