tdbBtree.c 53.3 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16 17
#include "tdbInt.h"

H
Hongze Cheng 已提交
18 19
/* Page-type bits kept in the flags byte of a btree page. */
#define TDB_BTREE_ROOT 0x01 /* page is the root of its tree */
#define TDB_BTREE_LEAF 0x02 /* page is a leaf */
#define TDB_BTREE_OVFL 0x04 /* page is an overflow page */
H
Hongze Cheng 已提交
21

H
Hongze Cheng 已提交
22
struct SBTree {
H
Hongze Cheng 已提交
23 24 25 26 27 28 29 30 31 32
  SPgno         root;
  int           keyLen;
  int           valLen;
  SPager       *pPager;
  tdb_cmpr_fn_t kcmpr;
  int           pageSize;
  int           maxLocal;
  int           minLocal;
  int           maxLeaf;
  int           minLeaf;
M
Minglei Jin 已提交
33 34
  SBtInfo       info;
  char         *tbname;
H
Hongze Cheng 已提交
35
  void         *pBuf;
H
Hongze Cheng 已提交
36 37
};

H
Hongze Cheng 已提交
38 39
/* Leading member shared by every on-page btree header. */
#define TDB_BTREE_PAGE_COMMON_HDR u8 flags;

/* The flags byte is the very first byte of the page data. */
#define TDB_BTREE_PAGE_GET_FLAGS(PAGE)        ((PAGE)->pData[0])
#define TDB_BTREE_PAGE_SET_FLAGS(PAGE, flags) ((PAGE)->pData[0] = (flags))

/* Non-zero when the corresponding type bit is set on PAGE. */
#define TDB_BTREE_PAGE_IS_ROOT(PAGE) ((PAGE)->pData[0] & TDB_BTREE_ROOT)
#define TDB_BTREE_PAGE_IS_LEAF(PAGE) ((PAGE)->pData[0] & TDB_BTREE_LEAF)
#define TDB_BTREE_PAGE_IS_OVFL(PAGE) ((PAGE)->pData[0] & TDB_BTREE_OVFL)

/* Accepted flag values: ROOT, LEAF, ROOT|LEAF, 0 (plain interior page) or OVFL. */
#define TDB_BTREE_ASSERT_FLAG(flags)                        \
  ASSERT(TDB_FLAG_IS(flags, TDB_BTREE_ROOT) ||              \
         TDB_FLAG_IS(flags, TDB_BTREE_LEAF) ||              \
         TDB_FLAG_IS(flags, TDB_BTREE_ROOT | TDB_BTREE_LEAF) || \
         TDB_FLAG_IS(flags, 0) ||                           \
         TDB_FLAG_IS(flags, TDB_BTREE_OVFL))
H
Hongze Cheng 已提交
49

H
Hongze Cheng 已提交
50
#pragma pack(push, 1)
wafwerar's avatar
wafwerar 已提交
51
typedef struct {
H
Hongze Cheng 已提交
52 53 54
  TDB_BTREE_PAGE_COMMON_HDR
} SLeafHdr;

wafwerar's avatar
wafwerar 已提交
55 56
typedef struct {
  TDB_BTREE_PAGE_COMMON_HDR
H
Hongze Cheng 已提交
57 58
  SPgno pgno;  // right-most child
} SIntHdr;
wafwerar's avatar
wafwerar 已提交
59
#pragma pack(pop)
H
more  
Hongze Cheng 已提交
60

H
Hongze Cheng 已提交
61
static int tdbDefaultKeyCmprFn(const void *pKey1, int keyLen1, const void *pKey2, int keyLen2);
H
Hongze Cheng 已提交
62
static int tdbBtreeOpenImpl(SBTree *pBt);
63
// static int tdbBtreeInitPage(SPage *pPage, void *arg, int init);
H
Hongze Cheng 已提交
64
static int tdbBtreeEncodeCell(SPage *pPage, const void *pKey, int kLen, const void *pVal, int vLen, SCell *pCell,
65 66
                              int *szCell, TXN *pTxn, SBTree *pBt);
static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pDecoder, TXN *pTxn, SBTree *pBt);
H
Hongze Cheng 已提交
67
static int tdbBtreeBalance(SBTC *pBtc);
68
static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell, int dropOfp, TXN *pTxn, SBTree *pBt);
H
Hongze Cheng 已提交
69
static int tdbBtcMoveDownward(SBTC *pBtc);
H
Hongze Cheng 已提交
70
static int tdbBtcMoveUpward(SBTC *pBtc);
H
Hongze Cheng 已提交
71

72 73
/*
 * Create a btree handle on top of `pPager`.
 *
 * keyLen / valLen : fixed lengths; a negative value selects TDB_VARIANT_LEN.
 *                   keyLen must not be 0.
 * pPager          : pager that owns the tree's pages.
 * tbname          : table name; the pointer itself is stored in the handle
 *                   (not copied) when a new, non-main table is created.
 * pgno            : root page of an existing tree, or 0 to create a new
 *                   root-leaf page.
 * kcmpr           : key comparator, NULL selects tdbDefaultKeyCmprFn.
 * ppBt            : receives the new handle; set to NULL on failure.
 *
 * When pgno == 0 a fresh root page is fetched and written inside a local
 * write transaction, and - unless tbname equals TDB_MAINDB_NAME - the tree's
 * SBtInfo is inserted into the environment's main DB under tbname.
 *
 * Returns 0 on success, -1 on failure. On failure the handle is freed, the
 * local transaction is closed and a fetched root page is released, so
 * nothing allocated here outlives the call.
 */
int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, char const *tbname, SPgno pgno, tdb_cmpr_fn_t kcmpr,
                 SBTree **ppBt) {
  SBTree *pBt;
  int     ret;

  ASSERT(keyLen != 0);

  *ppBt = NULL;

  pBt = (SBTree *)tdbOsCalloc(1, sizeof(*pBt));
  if (pBt == NULL) {
    return -1;
  }

  // pBt->keyLen
  pBt->keyLen = keyLen < 0 ? TDB_VARIANT_LEN : keyLen;
  // pBt->valLen
  pBt->valLen = valLen < 0 ? TDB_VARIANT_LEN : valLen;
  // pBt->pPager
  pBt->pPager = pPager;
  // pBt->kcmpr
  pBt->kcmpr = kcmpr ? kcmpr : tdbDefaultKeyCmprFn;
  // pBt->pageSize
  pBt->pageSize = pPager->pageSize;
  // pBt->maxLocal
  pBt->maxLocal = tdbPageCapacity(pBt->pageSize, sizeof(SIntHdr)) / 4;
  // pBt->minLocal: Should not be allowed smaller than 15, which is [nPayload][nKey][nData]
  pBt->minLocal = pBt->maxLocal / 2;
  // pBt->maxLeaf
  pBt->maxLeaf = tdbPageCapacity(pBt->pageSize, sizeof(SLeafHdr));
  // pBt->minLeaf
  pBt->minLeaf = pBt->minLocal;

  // if pgno == 0 fetch new btree root leaf page
  if (pgno == 0) {
    // fetch page & insert into main db
    SPage            *pPage = NULL;
    TXN               txn;
    SBtreeInitPageArg zArg;
    int               fetched = 0;  // set once pPage holds a page that must be released

    tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);

    pPager->inTran = 1;

    zArg.flags = TDB_BTREE_ROOT | TDB_BTREE_LEAF;  // root leaf node
    zArg.pBt = pBt;
    ret = tdbPagerFetchPage(pPager, &pgno, &pPage, tdbBtreeInitPage, &zArg, &txn);
    if (ret >= 0) {
      fetched = 1;
      ret = tdbPagerWrite(pPager, pPage);
    }

    // strcmp() != 0: this is not the main DB itself, so register the table in it
    if (ret >= 0 && strcmp(TDB_MAINDB_NAME, tbname)) {
      pBt->info.root = pgno;
      pBt->info.nLevel = 1;
      pBt->info.nData = 0;
      pBt->tbname = (char *)tbname;
      ret = tdbTbInsert(pPager->pEnv->pMainDb, tbname, strlen(tbname) + 1, &pBt->info, sizeof(pBt->info), &txn);
    }

    if (ret < 0) {
      // undo everything acquired above instead of leaking it
      if (fetched) {
        tdbPCacheRelease(pPager->pCache, pPage, &txn);
      }
      tdbTxnClose(&txn);
      tdbOsFree(pBt);
      return -1;
    }

    tdbPCacheRelease(pPager->pCache, pPage, &txn);
    tdbCommit(pPager->pEnv, &txn);
    tdbTxnClose(&txn);
  }

  ASSERT(pgno != 0);
  pBt->root = pgno;

  // TODO: pBt->root could be resolved through tdbBtreeOpenImpl(pBt); that path is currently disabled.

  *ppBt = pBt;
  return 0;
}

/*
 * Release a btree handle together with its scratch buffer.
 * A NULL handle is accepted and ignored. Always returns 0.
 */
int tdbBtreeClose(SBTree *pBt) {
  if (pBt == NULL) {
    return 0;
  }

  tdbFree(pBt->pBuf);
  tdbOsFree(pBt);
  return 0;
}

H
Hongze Cheng 已提交
167
/*
 * Insert a new key/value pair; duplicate keys are rejected.
 *
 * pBt        : tree to insert into.
 * pKey/kLen  : key bytes and length.
 * pVal/vLen  : value bytes and length.
 * pTxn       : transaction the cursor is opened with.
 *
 * Returns 0 on success, -1 when positioning fails, the key already exists,
 * or the underlying upsert fails. The cursor is closed on every path.
 */
int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn) {
  SBTC btc;
  int  ret;
  int  c;

  tdbBtcOpen(&btc, pBt, pTxn);

  // move to the position to insert
  ret = tdbBtcMoveTo(&btc, pKey, kLen, &c);
  if (ret < 0) {
    tdbBtcClose(&btc);
    ASSERT(0);
    return -1;
  }

  if (btc.idx == -1) {
    // cursor reported no position: insert at slot 0
    btc.idx = 0;
  } else {
    if (c > 0) {
      // key sorts after the cell under the cursor: insert behind it
      btc.idx++;
    } else if (c == 0) {
      // dup key not allowed; close the cursor before bailing out
      tdbBtcClose(&btc);
      ASSERT(0);
      return -1;
    }
  }

  ret = tdbBtcUpsert(&btc, pKey, kLen, pVal, vLen, 1);
  if (ret < 0) {
    ASSERT(0);
    tdbBtcClose(&btc);
    return -1;
  }

  tdbBtcClose(&btc);
  return 0;
}

/*
 * Remove the entry whose key equals pKey.
 *
 * Returns 0 when the entry was deleted; -1 when the cursor could not be
 * positioned, the key is not present, or the delete itself failed.
 */
int tdbBtreeDelete(SBTree *pBt, const void *pKey, int kLen, TXN *pTxn) {
  SBTC cursor;
  int  cmp;
  int  status = -1;

  tdbBtcOpen(&cursor, pBt, pTxn);

  // position the cursor on (or next to) the key
  if (tdbBtcMoveTo(&cursor, pKey, kLen, &cmp) < 0) {
    tdbBtcClose(&cursor);
    ASSERT(0);
    return -1;
  }

  // only an exact match on a valid slot may be deleted
  if (cursor.idx >= 0 && cmp == 0) {
    status = (tdbBtcDelete(&cursor) < 0) ? -1 : 0;
  }

  tdbBtcClose(&cursor);
  return status;
}

H
Hongze Cheng 已提交
240
/*
 * Insert the pair, or overwrite the value when the key already exists.
 *
 * The comparison result from tdbBtcMoveTo is handed to tdbBtcUpsert as its
 * last argument: 0 means an exact match was found, anything else means the
 * key is new. Returns 0 on success, -1 on failure.
 */
int tdbBtreeUpsert(SBTree *pBt, const void *pKey, int nKey, const void *pData, int nData, TXN *pTxn) {
  SBTC cursor;
  int  cmp;

  tdbBtcOpen(&cursor, pBt, pTxn);

  // locate the slot for the key
  if (tdbBtcMoveTo(&cursor, pKey, nKey, &cmp) < 0) {
    ASSERT(0);
    tdbBtcClose(&cursor);
    return -1;
  }

  if (cursor.idx == -1) {
    // no position reported: the key goes to slot 0 as a fresh insert
    cursor.idx = 0;
    cmp = 1;
  } else if (cmp > 0) {
    // key sorts after the current cell: target the following slot
    cursor.idx += 1;
  }

  if (tdbBtcUpsert(&cursor, pKey, nKey, pData, nData, cmp) < 0) {
    ASSERT(0);
    tdbBtcClose(&cursor);
    return -1;
  }

  tdbBtcClose(&cursor);
  return 0;
}

H
Hongze Cheng 已提交
275
/*
 * Look up pKey and copy its value out through ppVal / vLen.
 * Thin wrapper over tdbBtreePGet that does not ask for the stored key back;
 * the return value is whatever tdbBtreePGet returns.
 */
int tdbBtreeGet(SBTree *pBt, const void *pKey, int kLen, void **ppVal, int *vLen) {
  void **ppNoKey = NULL;
  int   *pNoKeyLen = NULL;

  return tdbBtreePGet(pBt, pKey, kLen, ppNoKey, pNoKeyLen, ppVal, vLen);
}

/*
 * Look up pKey and copy the stored key and/or value out.
 *
 * pBt          : tree to search.
 * pKey/kLen    : key to look for.
 * ppKey/pkLen  : when ppKey is non-NULL, *ppKey is resized with tdbRealloc
 *                and receives a copy of the stored key; *pkLen its length.
 * ppVal/vLen   : when ppVal is non-NULL, *ppVal is resized with tdbRealloc
 *                and receives a copy of the stored value; *vLen its length.
 *
 * Returns 0 on success; -1 when positioning fails, the key is absent, the
 * cell cannot be decoded, or an output buffer cannot be resized. The cursor
 * is closed and decoder-owned buffers are freed on every path past decoding.
 */
int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen) {
  SBTC         btc;
  SCell       *pCell;
  int          cret;
  int          ret;
  int          rc = 0;
  void        *pTKey = NULL;
  void        *pTVal = NULL;
  SCellDecoder cd = {0};

  tdbBtcOpen(&btc, pBt, NULL);

  ret = tdbBtcMoveTo(&btc, pKey, kLen, &cret);
  if (ret < 0) {
    tdbBtcClose(&btc);
    ASSERT(0);
    // must not continue with a closed cursor when ASSERT is compiled out
    return -1;
  }

  // no valid slot, or the slot holds a different key: not found
  if (btc.idx < 0 || cret) {
    tdbBtcClose(&btc);
    return -1;
  }

  pCell = tdbPageGetCell(btc.pPage, btc.idx);
  ret = tdbBtreeDecodeCell(btc.pPage, pCell, &cd, btc.pTxn, pBt);
  if (ret < 0) {
    tdbBtcClose(&btc);
    return -1;
  }

  if (ppKey) {
    pTKey = tdbRealloc(*ppKey, cd.kLen);
    if (pTKey == NULL) {
      ASSERT(0);
      rc = -1;
      goto _cleanup;
    }
    *ppKey = pTKey;
    *pkLen = cd.kLen;
    memcpy(*ppKey, cd.pKey, cd.kLen);
  }

  if (ppVal) {
    pTVal = tdbRealloc(*ppVal, cd.vLen);
    if (pTVal == NULL) {
      ASSERT(0);
      rc = -1;
      goto _cleanup;
    }
    *ppVal = pTVal;
    *vLen = cd.vLen;
    memcpy(*ppVal, cd.pVal, cd.vLen);
  }

_cleanup:
  // the decoder may own heap copies of the key / value; drop them on all paths
  if (TDB_CELLDECODER_FREE_KEY(&cd)) {
    tdbFree(cd.pKey);
  }

  if (TDB_CELLDECODER_FREE_VAL(&cd)) {
    tdbFree(cd.pVal);
  }

  tdbBtcClose(&btc);

  return rc;
}

H
Hongze Cheng 已提交
341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358
/*
 * Default key ordering: byte-wise comparison of the common prefix; when the
 * prefixes are equal the shorter key sorts first.
 *
 * Returns the memcmp result when the prefixes differ, otherwise -1, 0 or 1
 * according to the key lengths. Both keys must be non-NULL and non-empty.
 */
static int tdbDefaultKeyCmprFn(const void *pKey1, int keyLen1, const void *pKey2, int keyLen2) {
  ASSERT(keyLen1 > 0 && keyLen2 > 0 && pKey1 != NULL && pKey2 != NULL);

  int nCommon = (keyLen2 < keyLen1) ? keyLen2 : keyLen1;
  int diff = memcmp(pKey1, pKey2, nCommon);
  if (diff != 0) {
    return diff;
  }

  // identical prefix: order by length
  return (keyLen1 > keyLen2) - (keyLen1 < keyLen2);
}

/*
 * Resolve pBt->root: use the page number reported by tdbPagerOpenDB when it
 * is non-zero, otherwise allocate a new page for the root.
 * Returns 0 on success, -1 when the page allocation fails.
 */
static int tdbBtreeOpenImpl(SBTree *pBt) {
  SPgno rootPgno;
  int   rc;

  // 1. TODO: Search the main DB to check if the DB exists
  rc = tdbPagerOpenDB(pBt->pPager, &rootPgno, true, pBt);
  ASSERT(rc == 0);

  if (rootPgno == 0) {
    // no existing root: try to create a new database
    rc = tdbPagerAllocPage(pBt->pPager, &rootPgno);
    if (rc < 0) {
      ASSERT(0);
      return -1;
    }

    ASSERT(rootPgno != 0);
  }

  pBt->root = rootPgno;
  return 0;
}

390
/*
 * Page-initialisation callback handed to the pager.
 *
 * pPage : page to set up.
 * arg   : SBtreeInitPageArg* carrying the owning tree and, for a fresh page,
 *         the flags to stamp on it.
 * init  : non-zero -> the page already has content; its flags are read from
 *                     the page and it is set up with tdbPageInit.
 *         zero     -> the page is new; it is cleared with tdbPageZero and the
 *                     header is written from arg->flags.
 *
 * Afterwards the per-page key/value lengths and local-payload limits are set
 * according to the page kind (leaf, overflow, or interior). Always returns 0.
 */
int tdbBtreeInitPage(SPage *pPage, void *arg, int init) {
  SBtreeInitPageArg *pArg = (SBtreeInitPageArg *)arg;
  SBTree            *pTree = pArg->pBt;
  u8                 pageFlags;
  u8                 isLeaf;

  if (!init) {
    // brand-new page: flags come from the caller
    pageFlags = pArg->flags;
    isLeaf = pageFlags & TDB_BTREE_LEAF;
    TDB_BTREE_ASSERT_FLAG(pageFlags);

    tdbPageZero(pPage, isLeaf ? sizeof(SLeafHdr) : sizeof(SIntHdr), tdbBtreeCellSize);

    if (isLeaf) {
      ((SLeafHdr *)(pPage->pData))->flags = pageFlags;
    } else {
      SIntHdr *pHdr = (SIntHdr *)(pPage->pData);
      pHdr->flags = pageFlags;
      pHdr->pgno = 0;  // no right-most child yet
    }
  } else {
    // existing page: flags are already stored on the page
    pageFlags = TDB_BTREE_PAGE_GET_FLAGS(pPage);
    isLeaf = TDB_BTREE_PAGE_IS_LEAF(pPage);
    TDB_BTREE_ASSERT_FLAG(pageFlags);

    tdbPageInit(pPage, isLeaf ? sizeof(SLeafHdr) : sizeof(SIntHdr), tdbBtreeCellSize);
  }

  // the key length is the same for every page kind
  pPage->kLen = pTree->keyLen;

  if (isLeaf) {
    pPage->vLen = pTree->valLen;
    pPage->maxLocal = pTree->maxLeaf;
    pPage->minLocal = pTree->minLeaf;
  } else if (TDB_BTREE_PAGE_IS_OVFL(pPage)) {
    pPage->vLen = pTree->valLen;
    pPage->maxLocal = tdbPageCapacity(pTree->pageSize, sizeof(SIntHdr));
    pPage->minLocal = pTree->minLocal;
  } else {
    // interior page: cell values are child page numbers
    pPage->vLen = sizeof(SPgno);
    pPage->maxLocal = pTree->maxLocal;
    pPage->minLocal = pTree->minLocal;
  }

  return 0;
}

H
Hongze Cheng 已提交
443
// TDB_BTREE_BALANCE =====================
H
Hongze Cheng 已提交
444
/*
 * Grow the tree by one level at the root.
 *
 * A new child page is fetched with the root's flags minus TDB_BTREE_ROOT,
 * the root's content is copied into it, and the root is re-initialised as an
 * empty interior root whose right-most child is the new page.
 *
 * pBt     : owning tree.
 * pRoot   : current (overfull) root page.
 * ppChild : receives the new child page on success only.
 * pTxn    : transaction used for the page fetch.
 *
 * Returns 0 on success, -1 on failure. *ppChild is left untouched on failure,
 * so a failed tdbPagerWrite must be reported as an error rather than as 0.
 */
static int tdbBtreeBalanceDeeper(SBTree *pBt, SPage *pRoot, SPage **ppChild, TXN *pTxn) {
  SPager           *pPager;
  SPage            *pChild;
  SPgno             pgnoChild;
  int               ret;
  u8                flags;
  SIntHdr          *pIntHdr;
  SBtreeInitPageArg zArg;
  u8                leaf;

  pPager = pRoot->pPager;
  flags = TDB_BTREE_PAGE_GET_FLAGS(pRoot);
  leaf = TDB_BTREE_PAGE_IS_LEAF(pRoot);

  // allocate a new child page
  pgnoChild = 0;
  zArg.flags = TDB_FLAG_REMOVE(flags, TDB_BTREE_ROOT);
  zArg.pBt = pBt;
  ret = tdbPagerFetchPage(pPager, &pgnoChild, &pChild, tdbBtreeInitPage, &zArg, pTxn);
  if (ret < 0) {
    return -1;
  }

  if (!leaf) {
    // the child inherits the root's right-most child pointer
    ((SIntHdr *)pChild->pData)->pgno = ((SIntHdr *)(pRoot->pData))->pgno;
  }

  ret = tdbPagerWrite(pPager, pChild);
  if (ret < 0) {
    // TODO: the fetched child page is not handed back to the pager here
    ASSERT(0);
    return -1;
  }

  // Copy the root page content to the child page
  tdbPageCopy(pRoot, pChild);

  // Reinitialize the root page
  zArg.flags = TDB_BTREE_ROOT;
  zArg.pBt = pBt;
  ret = tdbBtreeInitPage(pRoot, &zArg, 0);
  if (ret < 0) {
    return -1;
  }

  pIntHdr = (SIntHdr *)(pRoot->pData);
  pIntHdr->pgno = pgnoChild;

  *ppChild = pChild;
  return 0;
}

H
Hongze Cheng 已提交
496
/*
 * Rebalance the children of pParent around child slot `idx`.
 *
 * Steps, in order:
 *   1. pick up to three adjacent child pages (pOlds) and, for non-leaf
 *      children, pull the dividing cells down from the parent;
 *   2. compute how the cells are to be spread over the new pages (infoNews,
 *      at most five entries);
 *   3. obtain the destination pages (pNews), reusing the old pages first;
 *   4. copy the cells into the destination pages and re-insert divider
 *      cells / the right-most child pointer into the parent;
 *   5. if the parent is the root and ended up with no cells, fold the single
 *      remaining child back into it.
 *
 * Returns 0 on success, -1 when a page cannot be fetched or marked for write.
 * NOTE: error returns taken after step 1 has started leave the pages already
 * modified as they are; no rollback is attempted here.
 */
static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTxn) {
  int ret;

  int    nOlds;
  SPage *pOlds[3] = {0};
  SCell *pDivCell[3] = {0};
  int    szDivCell[3];
  int    sIdx;
  u8     childNotLeaf;
  SPgno  rPgno;

  {  // Find 3 child pages at most to do balance
    int    nCells = TDB_PAGE_TOTAL_CELLS(pParent);
    SCell *pCell;

    if (nCells <= 2) {
      sIdx = 0;
      nOlds = nCells + 1;
    } else {
      // has more than three child pages
      if (idx == 0) {
        sIdx = 0;
      } else if (idx == nCells) {
        sIdx = idx - 2;
      } else {
        sIdx = idx - 1;
      }
      nOlds = 3;
    }
    for (int i = 0; i < nOlds; i++) {
      ASSERT(sIdx + i <= nCells);

      SPgno pgno;
      if (sIdx + i == nCells) {
        // past the last cell: the child is the parent's right-most child
        ASSERT(!TDB_BTREE_PAGE_IS_LEAF(pParent));
        pgno = ((SIntHdr *)(pParent->pData))->pgno;
      } else {
        pCell = tdbPageGetCell(pParent, sIdx + i);
        pgno = *(SPgno *)pCell;
      }

      ret = tdbPagerFetchPage(pBt->pPager, &pgno, pOlds + i, tdbBtreeInitPage,
                              &((SBtreeInitPageArg){.pBt = pBt, .flags = 0}), pTxn);
      if (ret < 0) {
        ASSERT(0);
        return -1;
      }

      ret = tdbPagerWrite(pBt->pPager, pOlds[i]);
      if (ret < 0) {
        // TODO
        ASSERT(0);
        return -1;
      }
    }
    // copy the parent key out if child pages are not leaf page
    childNotLeaf = !TDB_BTREE_PAGE_IS_LEAF(pOlds[0]);
    if (childNotLeaf) {
      for (int i = 0; i < nOlds; i++) {
        if (sIdx + i < TDB_PAGE_TOTAL_CELLS(pParent)) {
          pCell = tdbPageGetCell(pParent, sIdx + i);
          szDivCell[i] = tdbBtreeCellSize(pParent, pCell, 0, NULL, NULL);
          pDivCell[i] = tdbOsMalloc(szDivCell[i]);
          memcpy(pDivCell[i], pCell, szDivCell[i]);
        }

        if (i < nOlds - 1) {
          // append the divider to the child, pointing at the child's old right-most page
          ((SPgno *)pDivCell[i])[0] = ((SIntHdr *)pOlds[i]->pData)->pgno;
          ((SIntHdr *)pOlds[i]->pData)->pgno = 0;
          tdbPageInsertCell(pOlds[i], TDB_PAGE_TOTAL_CELLS(pOlds[i]), pDivCell[i], szDivCell[i], 1);
        }
      }
      rPgno = ((SIntHdr *)pOlds[nOlds - 1]->pData)->pgno;
    }

    ret = tdbPagerWrite(pBt->pPager, pParent);
    if (ret < 0) {
      // TODO
      ASSERT(0);
      return -1;
    }

    // drop the cells on parent page
    for (int i = 0; i < nOlds; i++) {
      nCells = TDB_PAGE_TOTAL_CELLS(pParent);
      if (sIdx < nCells) {
        tdbPageDropCell(pParent, sIdx, pTxn, pBt);
      } else {
        ((SIntHdr *)pParent->pData)->pgno = 0;
      }
    }
  }

  int nNews = 0;
  struct {
    int cnt;    // number of cells assigned to this new page
    int size;   // bytes those cells take
    int iPage;  // old page holding the last assigned cell
    int oIdx;   // index of that cell within the old page
  } infoNews[5] = {0};

  {  // Get how many new pages are needed and the new distribution

    // first loop to find minimum number of pages needed
    for (int oPage = 0; oPage < nOlds; oPage++) {
      SPage *pPage = pOlds[oPage];
      SCell *pCell;
      int    cellBytes;
      int    oIdx;

      for (oIdx = 0; oIdx < TDB_PAGE_TOTAL_CELLS(pPage); oIdx++) {
        pCell = tdbPageGetCell(pPage, oIdx);
        cellBytes = TDB_BYTES_CELL_TAKEN(pPage, pCell);

        if (infoNews[nNews].size + cellBytes > TDB_PAGE_USABLE_SIZE(pPage)) {
          // page is full, use a new page
          nNews++;

          ASSERT(infoNews[nNews].size + cellBytes <= TDB_PAGE_USABLE_SIZE(pPage));

          if (childNotLeaf) {
            // for non-child page, this cell is used as the right-most child,
            // the divider cell to parent as well
            continue;
          }
        }
        infoNews[nNews].cnt++;
        infoNews[nNews].size += cellBytes;
        infoNews[nNews].iPage = oPage;
        infoNews[nNews].oIdx = oIdx;
      }
    }

    nNews++;

    // back loop to make the distribution even
    for (int iNew = nNews - 1; iNew > 0; iNew--) {
      SCell *pCell;
      int    szLCell, szRCell;

      // balance page (iNew) and (iNew-1)
      for (;;) {
        pCell = tdbPageGetCell(pOlds[infoNews[iNew - 1].iPage], infoNews[iNew - 1].oIdx);

        szLCell = tdbBtreeCellSize(pOlds[infoNews[iNew - 1].iPage], pCell, 0, NULL, NULL);
        if (!childNotLeaf) {
          szRCell = szLCell;
        } else {
          // find the cell that follows the left page's last cell
          int    iPage = infoNews[iNew - 1].iPage;
          int    oIdx = infoNews[iNew - 1].oIdx + 1;
          SPage *pPage;
          for (;;) {
            pPage = pOlds[iPage];
            if (oIdx < TDB_PAGE_TOTAL_CELLS(pPage)) {
              break;
            }

            iPage++;
            oIdx = 0;
          }

          pCell = tdbPageGetCell(pPage, oIdx);
          szRCell = tdbBtreeCellSize(pPage, pCell, 0, NULL, NULL);
        }

        ASSERT(infoNews[iNew - 1].cnt > 0);

        if (infoNews[iNew].size + szRCell >= infoNews[iNew - 1].size - szRCell) {
          break;
        }

        // Move a cell right forward
        infoNews[iNew - 1].cnt--;
        infoNews[iNew - 1].size -= szLCell;
        infoNews[iNew - 1].oIdx--;
        for (;;) {
          if (infoNews[iNew - 1].oIdx >= 0) {
            break;
          }

          infoNews[iNew - 1].iPage--;
          infoNews[iNew - 1].oIdx = TDB_PAGE_TOTAL_CELLS(pOlds[infoNews[iNew - 1].iPage]) - 1;
        }

        infoNews[iNew].cnt++;
        infoNews[iNew].size += szRCell;
      }
    }
  }

  SPage *pNews[5] = {0};
  {  // Allocate new pages, reuse the old page when possible

    SPgno             pgno;
    SBtreeInitPageArg iarg;
    u8                flags;

    flags = TDB_BTREE_PAGE_GET_FLAGS(pOlds[0]);

    for (int iNew = 0; iNew < nNews; iNew++) {
      if (iNew < nOlds) {
        pNews[iNew] = pOlds[iNew];
      } else {
        pgno = 0;
        iarg.pBt = pBt;
        iarg.flags = flags;
        ret = tdbPagerFetchPage(pBt->pPager, &pgno, pNews + iNew, tdbBtreeInitPage, &iarg, pTxn);
        if (ret < 0) {
          ASSERT(0);
          // pNews[iNew] is still NULL here; stop instead of passing it to tdbPagerWrite
          for (int i = 0; i < 3; i++) {
            if (pDivCell[i]) {
              tdbOsFree(pDivCell[i]);
            }
          }
          return -1;
        }

        ret = tdbPagerWrite(pBt->pPager, pNews[iNew]);
        if (ret < 0) {
          // TODO
          ASSERT(0);
          return -1;
        }
      }
    }

    // TODO: sort the page according to the page number
  }

  {  // Do the real cell distribution
    SPage            *pOldsCopy[3] = {0};
    SCell            *pCell;
    int               szCell;
    SBtreeInitPageArg iarg;
    int               iNew, nNewCells;
    SCellDecoder      cd;

    iarg.pBt = pBt;
    iarg.flags = TDB_BTREE_PAGE_GET_FLAGS(pOlds[0]);
    // work from private copies so the old pages can be reused as destinations
    for (int i = 0; i < nOlds; i++) {
      tdbPageCreate(pOlds[0]->pageSize, &pOldsCopy[i], tdbDefaultMalloc, NULL);
      tdbBtreeInitPage(pOldsCopy[i], &iarg, 0);
      tdbPageCopy(pOlds[i], pOldsCopy[i]);
    }
    iNew = 0;
    nNewCells = 0;
    tdbBtreeInitPage(pNews[iNew], &iarg, 0);

    for (int iOld = 0; iOld < nOlds; iOld++) {
      SPage *pPage;

      pPage = pOldsCopy[iOld];

      for (int oIdx = 0; oIdx < TDB_PAGE_TOTAL_CELLS(pPage); oIdx++) {
        pCell = tdbPageGetCell(pPage, oIdx);
        szCell = tdbBtreeCellSize(pPage, pCell, 0, NULL, NULL);

        ASSERT(nNewCells <= infoNews[iNew].cnt);
        ASSERT(iNew < nNews);

        if (nNewCells < infoNews[iNew].cnt) {
          tdbPageInsertCell(pNews[iNew], nNewCells, pCell, szCell, 0);
          nNewCells++;

          // insert parent page
          if (!childNotLeaf && nNewCells == infoNews[iNew].cnt) {
            SIntHdr *pIntHdr = (SIntHdr *)pParent->pData;

            if (iNew == nNews - 1 && pIntHdr->pgno == 0) {
              pIntHdr->pgno = TDB_PAGE_PGNO(pNews[iNew]);
            } else {
              tdbBtreeDecodeCell(pPage, pCell, &cd, pTxn, pBt);

              // TODO: pCell here may be inserted as an overflow cell, handle it
              SCell *pNewCell = tdbOsMalloc(cd.kLen + 9);
              int    szNewCell;
              SPgno  pgno;
              pgno = TDB_PAGE_PGNO(pNews[iNew]);
              tdbBtreeEncodeCell(pParent, cd.pKey, cd.kLen, (void *)&pgno, sizeof(SPgno), pNewCell, &szNewCell, pTxn,
                                 pBt);
              tdbPageInsertCell(pParent, sIdx++, pNewCell, szNewCell, 0);
              tdbOsFree(pNewCell);
            }

            // move to next new page
            iNew++;
            nNewCells = 0;
            if (iNew < nNews) {
              tdbBtreeInitPage(pNews[iNew], &iarg, 0);
            }
          }
        } else {
          ASSERT(childNotLeaf);
          ASSERT(iNew < nNews - 1);

          // set current new page right-most child
          ((SIntHdr *)pNews[iNew]->pData)->pgno = ((SPgno *)pCell)[0];

          // insert to parent as divider cell
          ASSERT(iNew < nNews - 1);
          ((SPgno *)pCell)[0] = TDB_PAGE_PGNO(pNews[iNew]);
          tdbPageInsertCell(pParent, sIdx++, pCell, szCell, 0);

          // move to next new page
          iNew++;
          nNewCells = 0;
          if (iNew < nNews) {
            tdbBtreeInitPage(pNews[iNew], &iarg, 0);
          }
        }
      }
    }

    if (childNotLeaf) {
      ASSERT(TDB_PAGE_TOTAL_CELLS(pNews[nNews - 1]) == infoNews[nNews - 1].cnt);
      ((SIntHdr *)(pNews[nNews - 1]->pData))->pgno = rPgno;

      SIntHdr *pIntHdr = (SIntHdr *)pParent->pData;
      if (pIntHdr->pgno == 0) {
        pIntHdr->pgno = TDB_PAGE_PGNO(pNews[nNews - 1]);
      } else {
        ((SPgno *)pDivCell[nOlds - 1])[0] = TDB_PAGE_PGNO(pNews[nNews - 1]);
        tdbPageInsertCell(pParent, sIdx, pDivCell[nOlds - 1], szDivCell[nOlds - 1], 0);
      }
    }

    for (int i = 0; i < nOlds; i++) {
      tdbPageDestroy(pOldsCopy[i], tdbDefaultFree, NULL);
    }
  }

  if (TDB_BTREE_PAGE_IS_ROOT(pParent) && TDB_PAGE_TOTAL_CELLS(pParent) == 0) {
    i8 flags = TDB_BTREE_ROOT | TDB_BTREE_PAGE_IS_LEAF(pNews[0]);
    // copy content to the parent page
    tdbBtreeInitPage(pParent, &(SBtreeInitPageArg){.flags = flags, .pBt = pBt}, 0);
    tdbPageCopy(pNews[0], pParent);
  }

  for (int i = 0; i < 3; i++) {
    if (pDivCell[i]) {
      tdbOsFree(pDivCell[i]);
    }
  }

  // TODO: here is not correct for drop case
  for (int i = 0; i < nNews; i++) {
    if (i < nOlds) {
      tdbPagerReturnPage(pBt->pPager, pOlds[i], pTxn);
    } else {
      tdbPagerReturnPage(pBt->pPager, pNews[i], pTxn);
    }
  }

  return 0;
}

H
Hongze Cheng 已提交
846
// Rebalance the B-tree, starting at the page the cursor currently points to
// and working up the cursor's page stack.
//
// Each pass inspects pBtc->pPage. The loop ends when that page has no overflow
// cells and less than two thirds of its usable size is free, or when the root
// (stack depth 0) is reached without overflow cells. An overflowing root is
// handed to tdbBtreeBalanceDeeper(), after which the cursor is moved onto the
// page stored in pgStack[1]. Any other page is handled by
// tdbBtreeBalanceNonRoot() on its parent, after which the cursor returns the
// page to the pager and steps up to the parent.
//
// Returns 0 on success, -1 if a balance step fails.
static int tdbBtreeBalance(SBTC *pBtc) {
  for (;;) {
    SPage *pCur = pBtc->pPage;
    int    depth = pBtc->iPage;
    int    freeBytes = TDB_PAGE_FREE_SIZE(pCur);

    // neither overfull nor too empty: the balance work is finished
    if (pCur->nOverflow == 0 && freeBytes < TDB_PAGE_USABLE_SIZE(pCur) * 2 / 3) {
      break;
    }

    if (depth != 0) {
      // generalized balance step around this page's slot in its parent
      SPage *pParent = pBtc->pgStack[depth - 1];

      if (tdbBtreeBalanceNonRoot(pBtc->pBt, pParent, pBtc->idxStack[depth - 1], pBtc->pTxn) < 0) {
        return -1;
      }

      tdbPagerReturnPage(pBtc->pBt->pPager, pBtc->pPage, pBtc->pTxn);

      pBtc->iPage--;
      pBtc->pPage = pBtc->pgStack[pBtc->iPage];
      continue;
    }

    // root page: only balance when it is overfull, ignore the empty case
    if (pCur->nOverflow == 0) {
      break;
    }

    if (tdbBtreeBalanceDeeper(pBtc->pBt, pCur, &(pBtc->pgStack[1]), pBtc->pTxn) < 0) {
      return -1;
    }

    // continue balancing from the page placed in pgStack[1]
    pBtc->idx = 0;
    pBtc->idxStack[0] = 0;
    pBtc->pgStack[0] = pBtc->pPage;
    pBtc->iPage = 1;
    pBtc->pPage = pBtc->pgStack[1];
  }

  return 0;
}
H
Hongze Cheng 已提交
903
// TDB_BTREE_BALANCE
H
Hongze Cheng 已提交
904

905
// Fetch an overflow page and mark it dirty.
//
// *pPgno is reset to 0 before the pager is asked for the page, and the page is
// initialised with the TDB_BTREE_OVFL flag. On success *ppOfp holds the page
// and the result of tdbPagerWrite() is returned; on any failure -1 is
// returned.
static int tdbFetchOvflPage(SPgno *pPgno, SPage **ppOfp, TXN *pTxn, SBTree *pBt) {
  SBtreeInitPageArg initArg = {.pBt = pBt, .flags = TDB_FLAG_ADD(0, TDB_BTREE_OVFL)};
  int               rc;

  *pPgno = 0;

  rc = tdbPagerFetchPage(pBt->pPager, pPgno, ppOfp, tdbBtreeInitPage, &initArg, pTxn);
  if (rc < 0) {
    return -1;
  }

  // mark dirty
  rc = tdbPagerWrite(pBt->pPager, *ppOfp);
  if (rc < 0) {
    ASSERT(0);
    return -1;
  }

  return rc;
}

927
// Load the overflow page whose number is stored in *pPgno.
//
// The page is fetched through the pager with the TDB_BTREE_OVFL init flag and
// is not marked dirty. Returns the pager's result on success, -1 on failure.
static int tdbLoadOvflPage(SPgno *pPgno, SPage **ppOfp, TXN *pTxn, SBTree *pBt) {
  SBtreeInitPageArg initArg = {.pBt = pBt, .flags = TDB_FLAG_ADD(0, TDB_BTREE_OVFL)};

  int rc = tdbPagerFetchPage(pBt->pPager, pPgno, ppOfp, tdbBtreeInitPage, &initArg, pTxn);

  return (rc < 0) ? -1 : rc;
}

H
Hongze Cheng 已提交
941
// TDB_BTREE_CELL =====================
H
Hongze Cheng 已提交
942
// Write the key/value payload of a cell into pCell, after its nHeader-byte
// header, spilling to overflow pages when it does not fit locally.
//
//   pPage      page the cell belongs to (supplies maxLocal / minLocal)
//   pCell      destination buffer; the header occupies the first nHeader bytes
//   pKey/kLen  key bytes
//   pVal/vLen  value bytes (pVal may be NULL when there is no value)
//   szPayload  out: number of payload bytes stored locally in pCell
//
// When nHeader + kLen + vLen <= maxLocal everything is copied into pCell.
// Otherwise nLocal bytes stay local, the last sizeof(SPgno) of them holding
// the number of the first overflow page, and the rest is written as one cell
// per overflow page, each ending with the number of the next page (0 on the
// last one).
//
// Returns 0 on success, -1 on failure. The scratch buffer is released on every
// path; previously the error returns of the key-overflow branch leaked it.
//
// NOTE(review): overflow pages obtained here are never handed back with
// tdbPagerReturnPage() in this function -- confirm who releases them.
static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const void *pKey, int kLen, const void *pVal,
                                 int vLen, int *szPayload, TXN *pTxn, SBTree *pBt) {
  int ret = 0;
  int nPayload = kLen + vLen;
  int maxLocal = pPage->maxLocal;

  if (nPayload + nHeader <= maxLocal) {
    // no overflow page is needed
    memcpy(pCell + nHeader, pKey, kLen);
    if (pVal) {
      memcpy(pCell + nHeader + kLen, pVal, vLen);
    }

    *szPayload = nPayload;
    return 0;
  }

  // handle overflow case
  // calc local storage size
  int minLocal = pPage->minLocal;
  int surplus = minLocal + (nPayload + nHeader - minLocal) % (maxLocal - sizeof(SPgno));
  int nLocal = surplus <= maxLocal ? surplus : minLocal;

  // fetch a new ofp and make it dirty
  SPgno  pgno = 0;
  SPage *ofp = NULL;
  SPage *nextOfp = NULL;  // stays NULL when the first overflow page is also the last

  ret = tdbFetchOvflPage(&pgno, &ofp, pTxn, pBt);
  if (ret < 0) {
    return -1;
  }

  // local buffer for cell
  SCell *pBuf = tdbRealloc(NULL, pBt->pageSize);
  if (pBuf == NULL) {
    return -1;
  }

  int nLeft = nPayload;
  int bytes;
  int lastPage = 0;
  if (nLocal >= kLen + 4) {
    // pack key to local
    memcpy(pCell + nHeader, pKey, kLen);
    nLeft -= kLen;
    // pack partial val to local if any space left
    if (nLocal > kLen + 4) {
      memcpy(pCell + nHeader + kLen, pVal, nLocal - kLen - sizeof(SPgno));
      nLeft -= nLocal - kLen - sizeof(SPgno);
    }

    // pack nextPgno
    memcpy(pCell + nHeader + nPayload - nLeft, &pgno, sizeof(pgno));

    // pack left val data to ovpages
    do {
      lastPage = 0;
      if (nLeft <= ofp->maxLocal - sizeof(SPgno)) {
        bytes = nLeft;
        lastPage = 1;
      } else {
        bytes = ofp->maxLocal - sizeof(SPgno);
      }

      // fetch next ofp if not last page
      if (!lastPage) {
        // fetch a new ofp and make it dirty
        ret = tdbFetchOvflPage(&pgno, &nextOfp, pTxn, pBt);
        if (ret < 0) {
          goto _err;
        }
      } else {
        pgno = 0;
      }

      memcpy(pBuf, ((const SCell *)pVal) + vLen - nLeft, bytes);
      memcpy(pBuf + bytes, &pgno, sizeof(pgno));

      ret = tdbPageInsertCell(ofp, 0, pBuf, bytes + sizeof(pgno), 0);
      if (ret < 0) {
        goto _err;
      }

      ofp = nextOfp;
      nLeft -= bytes;
    } while (nLeft > 0);
  } else {
    int nLeftKey = kLen;
    // pack partial key and nextPgno
    memcpy(pCell + nHeader, pKey, nLocal - 4);
    nLeft -= nLocal - 4;
    nLeftKey -= nLocal - 4;

    memcpy(pCell + nHeader + nLocal - 4, &pgno, sizeof(pgno));

    int lastKeyPageSpace = 0;
    // pack left key & val to ovpages
    do {
      // cal key to cpy
      int lastKeyPage = 0;
      if (nLeftKey <= ofp->maxLocal - sizeof(SPgno)) {
        bytes = nLeftKey;
        lastKeyPage = 1;
        lastKeyPageSpace = ofp->maxLocal - sizeof(SPgno) - nLeftKey;
      } else {
        bytes = ofp->maxLocal - sizeof(SPgno);
      }

      // cpy key
      memcpy(pBuf, ((const SCell *)pKey) + kLen - nLeftKey, bytes);

      // NOTE(review): the value and the next pgno below are placed at offsets
      // derived from kLen / nLeftKey / nLeft rather than from `bytes`, while
      // the inserted cell is bytes + sizeof(pgno) long. Layout is left as-is
      // here because changing it must be done together with the decoder.
      if (lastKeyPage) {
        if (lastKeyPageSpace >= vLen) {
          memcpy(pBuf + kLen - nLeftKey, pVal, vLen);

          nLeft -= vLen;
          pgno = 0;
        } else {
          memcpy(pBuf + kLen - nLeftKey, pVal, lastKeyPageSpace);
          nLeft -= lastKeyPageSpace;

          // fetch next ofp, a new ofp and make it dirty
          ret = tdbFetchOvflPage(&pgno, &nextOfp, pTxn, pBt);
          if (ret < 0) {
            goto _err;
          }
        }
      } else {
        // fetch next ofp, a new ofp and make it dirty
        ret = tdbFetchOvflPage(&pgno, &nextOfp, pTxn, pBt);
        if (ret < 0) {
          goto _err;
        }
      }

      memcpy(pBuf + kLen - nLeft, &pgno, sizeof(pgno));

      ret = tdbPageInsertCell(ofp, 0, pBuf, bytes + sizeof(pgno), 0);
      if (ret < 0) {
        goto _err;
      }

      ofp = nextOfp;
      nLeftKey -= bytes;
      nLeft -= bytes;
    } while (nLeftKey > 0);

    while (nLeft > 0) {
      // pack left val data to ovpages
      // NOTE(review): the chunk size here comes from the cell page's maxLocal,
      // not ofp->maxLocal as in the loops above -- confirm they are equal.
      lastPage = 0;
      if (nLeft <= maxLocal - sizeof(SPgno)) {
        bytes = nLeft;
        lastPage = 1;
      } else {
        bytes = maxLocal - sizeof(SPgno);
      }

      // fetch next ofp if not last page
      if (!lastPage) {
        // fetch a new ofp and make it dirty
        ret = tdbFetchOvflPage(&pgno, &nextOfp, pTxn, pBt);
        if (ret < 0) {
          goto _err;
        }
      } else {
        pgno = 0;
      }

      memcpy(pBuf, ((const SCell *)pVal) + vLen - nLeft, bytes);
      memcpy(pBuf + bytes, &pgno, sizeof(pgno));

      ret = tdbPageInsertCell(ofp, 0, pBuf, bytes + sizeof(pgno), 0);
      if (ret < 0) {
        goto _err;
      }

      ofp = nextOfp;
      nLeft -= bytes;
    }
  }

  // free local buffer
  tdbFree(pBuf);

  *szPayload = nLocal;
  return 0;

_err:
  // single exit for failures so the scratch buffer is always released
  tdbFree(pBuf);
  return -1;
}

H
Hongze Cheng 已提交
1139
// Encode a key/value pair into pCell in the on-page cell format of pPage.
//
// Header, in order: the child page number taken from pVal (interior pages
// only), kLen as a varint (only when the page's key length is
// TDB_VARIANT_LEN), vLen as a varint (only when the page's value length is
// TDB_VARIANT_LEN). The payload follows and is written by
// tdbBtreeEncodePayload(); interior pages and pages with vLen == 0 store no
// value in the payload.
//
// On success *szCell receives header size + locally stored payload size and 0
// is returned. On payload-encoding failure -1 is returned and *szCell is left
// untouched (this path used to return 0).
static int tdbBtreeEncodeCell(SPage *pPage, const void *pKey, int kLen, const void *pVal, int vLen, SCell *pCell,
                              int *szCell, TXN *pTxn, SBTree *pBt) {
  u8  leaf;
  int nHeader;
  int nPayload;
  int ret;

  ASSERT(pPage->kLen == TDB_VARIANT_LEN || pPage->kLen == kLen);
  ASSERT(pPage->vLen == TDB_VARIANT_LEN || pPage->vLen == vLen);
  ASSERT(pKey != NULL && kLen > 0);

  nPayload = 0;
  nHeader = 0;
  leaf = TDB_BTREE_PAGE_IS_LEAF(pPage);

  // 1. Encode Header part
  /* Encode SPgno if interior page */
  if (!leaf) {
    ASSERT(pPage->vLen == sizeof(SPgno));

    ((SPgno *)(pCell + nHeader))[0] = ((SPgno *)pVal)[0];
    nHeader = nHeader + sizeof(SPgno);
  }

  /* Encode kLen if need */
  if (pPage->kLen == TDB_VARIANT_LEN) {
    nHeader += tdbPutVarInt(pCell + nHeader, kLen);
  }

  /* Encode vLen if need */
  if (pPage->vLen == TDB_VARIANT_LEN) {
    nHeader += tdbPutVarInt(pCell + nHeader, vLen);
  }

  // 2. Encode payload part
  if ((!leaf) || pPage->vLen == 0) {
    // the value of an interior cell already lives in the header
    pVal = NULL;
    vLen = 0;
  }

  ret = tdbBtreeEncodePayload(pPage, pCell, nHeader, pKey, kLen, pVal, vLen, &nPayload, pTxn, pBt);
  if (ret < 0) {
    // propagate the failure instead of reporting success with *szCell unset
    ASSERT(0);
    return -1;
  }

  *szCell = nHeader + nPayload;
  return 0;
}

1190 1191
// Decode the payload part of a cell, following overflow pages when needed.
//
//   pPage     page the cell lives on (supplies maxLocal / minLocal)
//   pCell     start of the cell; the payload begins nHeader bytes in
//   pDecoder  in: kLen, vLen, and pVal already set for interior pages;
//             out: pKey / pVal
//
// When the whole payload is local, pKey / pVal point straight into pCell.
// Otherwise the overflow chain is walked: a key that fits locally is still
// referenced in place, and anything that has to be reassembled is copied into
// buffers obtained from tdbRealloc() and flagged with
// TDB_CELLDECODER_SET_FREE_KEY / _VAL.
//
// Fixes relative to the previous revision:
//   * when exactly the key and the overflow pgno are local
//     (nLocal == kLen + 4) the value buffer was never allocated before the
//     overflow loop copied into it;
//   * the trailing value loop of the key-overflow branch copied from the wrong
//     offset into the start of the buffer and decremented nLeft twice; it now
//     mirrors the value loop of the first branch;
//   * the last-key-page test used the cell page's maxLocal instead of
//     ofp->maxLocal.
//
// Returns 0 on success, -1 on failure.
//
// NOTE(review): overflow pages loaded here are not handed back to the pager;
// in the key-overflow branch pVal may point into such a page, so releasing
// them needs a decision about the lifetime of that pointer.
static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader, SCellDecoder *pDecoder, TXN *pTxn,
                                 SBTree *pBt) {
  int ret = 0;
  int nPayload;
  int maxLocal = pPage->maxLocal;

  int kLen = pDecoder->kLen;
  int vLen = pDecoder->vLen;

  if (pDecoder->pVal) {
    // value was already taken from the header, only the key is in the payload
    ASSERT(!TDB_BTREE_PAGE_IS_LEAF(pPage));
    nPayload = pDecoder->kLen;
  } else {
    nPayload = pDecoder->kLen + pDecoder->vLen;
  }

  if (nHeader + nPayload <= maxLocal) {
    // no over flow case
    pDecoder->pKey = (SCell *)pCell + nHeader;
    if (pDecoder->pVal == NULL && pDecoder->vLen > 0) {
      pDecoder->pVal = (SCell *)pCell + nHeader + pDecoder->kLen;
    }
    return 0;
  }

  // handle overflow case
  // calc local storage size
  int minLocal = pPage->minLocal;
  int surplus = minLocal + (nPayload + nHeader - minLocal) % (maxLocal - sizeof(SPgno));
  int nLocal = surplus <= maxLocal ? surplus : minLocal;

  int    nLeft = nPayload;
  SPgno  pgno = 0;
  SPage *ofp;
  SCell *ofpCell;
  int    bytes;

  if (nLocal >= pDecoder->kLen + 4) {
    // the whole key is local
    pDecoder->pKey = (SCell *)pCell + nHeader;
    nLeft -= kLen;
    if (nLocal > kLen + 4) {
      // read partial val to local
      pDecoder->pVal = tdbRealloc(pDecoder->pVal, vLen);
      if (pDecoder->pVal == NULL) {
        return -1;
      }
      TDB_CELLDECODER_SET_FREE_VAL(pDecoder);

      memcpy(pDecoder->pVal, pCell + nHeader + kLen, nLocal - kLen - sizeof(SPgno));

      nLeft -= nLocal - kLen - sizeof(SPgno);
    } else if (pDecoder->pVal == NULL && vLen > 0) {
      // no value byte is local, but the overflow loop below still needs a
      // destination buffer
      pDecoder->pVal = tdbRealloc(NULL, vLen);
      if (pDecoder->pVal == NULL) {
        return -1;
      }
      TDB_CELLDECODER_SET_FREE_VAL(pDecoder);
    }

    memcpy(&pgno, pCell + nHeader + nPayload - nLeft, sizeof(pgno));

    // unpack left val data from ovpages
    while (pgno != 0) {
      ret = tdbLoadOvflPage(&pgno, &ofp, pTxn, pBt);
      if (ret < 0) {
        return -1;
      }

      ofpCell = tdbPageGetCell(ofp, 0);

      if (nLeft <= ofp->maxLocal - sizeof(SPgno)) {
        bytes = nLeft;
      } else {
        bytes = ofp->maxLocal - sizeof(SPgno);
      }

      memcpy(pDecoder->pVal + vLen - nLeft, ofpCell, bytes);
      nLeft -= bytes;

      memcpy(&pgno, ofpCell + bytes, sizeof(pgno));
    }
  } else {
    int nLeftKey = kLen;
    // load partial key and nextPgno
    pDecoder->pKey = tdbRealloc(pDecoder->pKey, kLen);
    if (pDecoder->pKey == NULL) {
      return -1;
    }
    TDB_CELLDECODER_SET_FREE_KEY(pDecoder);

    memcpy(pDecoder->pKey, pCell + nHeader, nLocal - 4);
    nLeft -= nLocal - 4;
    nLeftKey -= nLocal - 4;

    memcpy(&pgno, pCell + nHeader + nLocal - 4, sizeof(pgno));

    int lastKeyPageSpace = 0;
    // load left key & val to ovpages
    while (pgno != 0) {
      ret = tdbLoadOvflPage(&pgno, &ofp, pTxn, pBt);
      if (ret < 0) {
        return -1;
      }

      ofpCell = tdbPageGetCell(ofp, 0);

      int lastKeyPage = 0;
      if (nLeftKey <= ofp->maxLocal - sizeof(SPgno)) {
        bytes = nLeftKey;
        lastKeyPage = 1;
        lastKeyPageSpace = ofp->maxLocal - sizeof(SPgno) - nLeftKey;
      } else {
        bytes = ofp->maxLocal - sizeof(SPgno);
      }

      // cpy key
      memcpy(pDecoder->pKey + kLen - nLeftKey, ofpCell, bytes);

      // NOTE(review): the value offset below (kLen - nLeftKey) follows what
      // the encoder writes, not `bytes`; kept unchanged so both sides agree.
      if (lastKeyPage) {
        if (lastKeyPageSpace >= vLen) {
          pDecoder->pVal = ofpCell + kLen - nLeftKey;

          nLeft -= vLen;
          pgno = 0;
        } else {
          // read partial val to local
          pDecoder->pVal = tdbRealloc(pDecoder->pVal, vLen);
          if (pDecoder->pVal == NULL) {
            return -1;
          }
          TDB_CELLDECODER_SET_FREE_VAL(pDecoder);

          memcpy(pDecoder->pVal, ofpCell + kLen - nLeftKey, lastKeyPageSpace);
          nLeft -= lastKeyPageSpace;
        }
      }

      memcpy(&pgno, ofpCell + bytes, sizeof(pgno));

      nLeftKey -= bytes;
      nLeft -= bytes;
    }

    while (nLeft > 0) {
      ret = tdbLoadOvflPage(&pgno, &ofp, pTxn, pBt);
      if (ret < 0) {
        return -1;
      }

      ofpCell = tdbPageGetCell(ofp, 0);

      // load left val data from ovpages: each cell is <chunk><next pgno>
      if (nLeft <= ofp->maxLocal - sizeof(SPgno)) {
        bytes = nLeft;
      } else {
        bytes = ofp->maxLocal - sizeof(SPgno);
      }

      if (!pDecoder->pVal) {
        pDecoder->pVal = tdbRealloc(pDecoder->pVal, vLen);
        if (pDecoder->pVal == NULL) {
          return -1;
        }
        TDB_CELLDECODER_SET_FREE_VAL(pDecoder);
      }

      // append the chunk after the value bytes collected so far
      memcpy(pDecoder->pVal + vLen - nLeft, ofpCell, bytes);
      memcpy(&pgno, ofpCell + bytes, sizeof(pgno));

      nLeft -= bytes;
    }
  }

  return 0;
}

1370
// Decode a cell of pPage into pDecoder.
//
// The decoder is reset first. The header is then read: on interior pages the
// leading SPgno is stored in pDecoder->pgno and pVal is pointed at it; kLen
// and vLen come from varints in the cell when the page declares
// TDB_VARIANT_LEN, otherwise from the page's fixed lengths. The payload is
// decoded by tdbBtreeDecodePayload().
//
// Returns 0 on success, -1 if payload decoding fails.
static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pDecoder, TXN *pTxn, SBTree *pBt) {
  const u8 isLeaf = TDB_BTREE_PAGE_IS_LEAF(pPage);
  int      hdrLen = 0;

  // Clear the state of decoder
  pDecoder->kLen = -1;
  pDecoder->pKey = NULL;
  pDecoder->vLen = -1;
  pDecoder->pVal = NULL;
  pDecoder->pgno = 0;
  TDB_CELLDECODER_SET_FREE_NIL(pDecoder);

  // 1. Decode header part
  if (!isLeaf) {
    ASSERT(pPage->vLen == sizeof(SPgno));

    // interior cell: the value is the child page number stored up front
    pDecoder->pgno = ((SPgno *)(pCell + hdrLen))[0];
    pDecoder->pVal = (u8 *)(&(pDecoder->pgno));
    hdrLen += sizeof(SPgno);
  }

  if (pPage->kLen != TDB_VARIANT_LEN) {
    pDecoder->kLen = pPage->kLen;
  } else {
    hdrLen += tdbGetVarInt(pCell + hdrLen, &(pDecoder->kLen));
  }

  if (pPage->vLen != TDB_VARIANT_LEN) {
    pDecoder->vLen = pPage->vLen;
  } else {
    ASSERT(isLeaf);
    hdrLen += tdbGetVarInt(pCell + hdrLen, &(pDecoder->vLen));
  }

  // 2. Decode payload part
  if (tdbBtreeDecodePayload(pPage, pCell, hdrLen, pDecoder, pTxn, pBt) < 0) {
    return -1;
  }

  return 0;
}

1417
// Compute the number of bytes a cell occupies on pPage.
//
// The header is the child SPgno on interior pages plus the kLen / vLen varints
// when the page uses TDB_VARIANT_LEN. If header + key + value fit within
// pPage->maxLocal the size is their sum; otherwise it is header + nLocal,
// where nLocal is the locally stored part of an overflowing payload.
//
// When dropOfp is non-zero and the payload overflows, the overflow chain is
// walked: each page is loaded, its next-page number is read, and the page is
// handed back with tdbPagerReturnPage().
//
// Returns the cell size, or -1 if an overflow page cannot be loaded.
static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell, int dropOfp, TXN *pTxn, SBTree *pBt) {
  u8  leaf;
  int kLen = 0, vLen = 0, nHeader = 0;

  leaf = TDB_BTREE_PAGE_IS_LEAF(pPage);

  if (!leaf) {
    nHeader += sizeof(SPgno);
  }

  if (pPage->kLen == TDB_VARIANT_LEN) {
    nHeader += tdbGetVarInt(pCell + nHeader, &kLen);
  } else {
    kLen = pPage->kLen;
  }

  if (pPage->vLen == TDB_VARIANT_LEN) {
    ASSERT(leaf);
    nHeader += tdbGetVarInt(pCell + nHeader, &vLen);
  } else if (leaf) {
    vLen = pPage->vLen;
  }

  int nPayload = kLen + vLen;
  if (nHeader + nPayload <= pPage->maxLocal) {
    return nHeader + kLen + vLen;
  } else {
    int maxLocal = pPage->maxLocal;

    // calc local storage size
    int minLocal = pPage->minLocal;
    int surplus = minLocal + (nPayload + nHeader - minLocal) % (maxLocal - sizeof(SPgno));
    int nLocal = surplus <= maxLocal ? surplus : minLocal;

    // walk the overflow chain
    if (dropOfp) {
      int    ret = 0;
      SPgno  pgno = 0;
      int    nLeft = nPayload - nLocal + sizeof(SPgno);
      SPage *ofp;
      int    bytes;

      // the first overflow pgno sits in the last sizeof(SPgno) local bytes;
      // copy it out bytewise because that offset need not be SPgno-aligned
      memcpy(&pgno, pCell + nHeader + nLocal - sizeof(SPgno), sizeof(pgno));

      while (pgno != 0) {
        ret = tdbLoadOvflPage(&pgno, &ofp, pTxn, pBt);
        if (ret < 0) {
          return -1;
        }

        SCell *ofpCell = tdbPageGetCell(ofp, 0);

        if (nLeft <= ofp->maxLocal - sizeof(SPgno)) {
          bytes = nLeft;
        } else {
          bytes = ofp->maxLocal - sizeof(SPgno);
        }

        // the next page number follows the data bytes of the overflow cell
        memcpy(&pgno, ofpCell + bytes, sizeof(pgno));

        tdbPagerReturnPage(pPage->pPager, ofp, pTxn);

        nLeft -= bytes;
      }
    }

    return nHeader + nLocal;
  }
}
H
Hongze Cheng 已提交
1484
// TDB_BTREE_CELL
H
Hongze Cheng 已提交
1485

H
refact  
Hongze Cheng 已提交
1486
// TDB_BTREE_CURSOR =====================
H
Hongze Cheng 已提交
1487
// Initialise a cursor on pBt.
//
// The cursor starts unpositioned (iPage == -1, idx == -1, no page) with a
// zeroed cell decoder. If pTxn is NULL the cursor opens and uses its own
// embedded transaction; otherwise it uses the caller's. Always returns 0.
int tdbBtcOpen(SBTC *pBtc, SBTree *pBt, TXN *pTxn) {
  pBtc->pBt = pBt;
  pBtc->pPage = NULL;
  pBtc->iPage = -1;
  pBtc->idx = -1;
  memset(&pBtc->coder, 0, sizeof(SCellDecoder));

  if (pTxn != NULL) {
    pBtc->pTxn = pTxn;
    return 0;
  }

  // no transaction supplied: fall back to the cursor's embedded one
  pBtc->pTxn = &pBtc->txn;
  tdbTxnOpen(pBtc->pTxn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0);

  return 0;
}

H
Hongze Cheng 已提交
1504
// Position the cursor on the first cell of the tree.
//
// Only a clean cursor (iPage < 0) is supported; repositioning a placed cursor
// trips ASSERT(0). The root page is fetched, then the cursor descends through
// child index 0 until a leaf is reached. If the root holds no cells the cursor
// is left at the invalid position idx == -1.
//
// Returns 0 on success, -1 if a page cannot be fetched.
int tdbBtcMoveToFirst(SBTC *pBtc) {
  SBTree *pTree = pBtc->pBt;
  SPager *pPgr = pTree->pPager;
  int     rc;

  if (pBtc->iPage >= 0) {
    ASSERT(0);
#if 0
    // move from a position
    int iPage = 0;

    for (; iPage < pBtc->iPage; iPage++) {
      ASSERT(pBtc->idxStack[iPage] >= 0);
      if (pBtc->idxStack[iPage]) break;
    }

    // move upward
    for (;;) {
      if (pBtc->iPage == iPage) {
        pBtc->idx = 0;
        break;
      }

      tdbBtcMoveUpward(pBtc);
    }
#endif
  } else {
    // move a clean cursor
    SBtreeInitPageArg rootArg = {.pBt = pTree, .flags = TDB_BTREE_ROOT | TDB_BTREE_LEAF};

    rc = tdbPagerFetchPage(pPgr, &pTree->root, &(pBtc->pPage), tdbBtreeInitPage, &rootArg, pBtc->pTxn);
    if (rc < 0) {
      ASSERT(0);
      return -1;
    }

    ASSERT(TDB_BTREE_PAGE_IS_ROOT(pBtc->pPage));

    pBtc->iPage = 0;
    if (TDB_PAGE_TOTAL_CELLS(pBtc->pPage) > 0) {
      pBtc->idx = 0;
    } else {
      // no data at all, point to an invalid position
      ASSERT(TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage));
      pBtc->idx = -1;
      return 0;
    }
  }

  // move downward, always taking the first child
  while (!TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage)) {
    rc = tdbBtcMoveDownward(pBtc);
    if (rc < 0) {
      ASSERT(0);
      return -1;
    }

    pBtc->idx = 0;
  }

  return 0;
}

// Position the cursor on the last cell of the tree.
//
// Only a clean cursor (iPage < 0) is supported; repositioning a placed cursor
// trips ASSERT(0). The root page is fetched, then the cursor descends: on an
// interior page the index is set to the cell count, on a leaf to the cell
// count minus one. If the root holds no cells the cursor is left at the
// invalid position idx == -1.
//
// Returns 0 on success, -1 if a page cannot be fetched.
int tdbBtcMoveToLast(SBTC *pBtc) {
  SBTree *pTree = pBtc->pBt;
  SPager *pPgr = pTree->pPager;
  int     cellCnt;
  int     rc;

  if (pBtc->iPage >= 0) {
    ASSERT(0);
#if 0
    int iPage = 0;

    // downward search
    for (; iPage < pBtc->iPage; iPage++) {
      ASSERT(!TDB_BTREE_PAGE_IS_LEAF(pBtc->pgStack[iPage]));
      nCells = TDB_PAGE_TOTAL_CELLS(pBtc->pgStack[iPage]);
      if (pBtc->idxStack[iPage] != nCells) break;
    }

    // move upward
    for (;;) {
      if (pBtc->iPage == iPage) {
        if (TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage)) {
          pBtc->idx = TDB_PAGE_TOTAL_CELLS(pBtc->pPage) - 1;
        } else {
          pBtc->idx = TDB_PAGE_TOTAL_CELLS(pBtc->pPage);
        }
        break;
      }

      tdbBtcMoveUpward(pBtc);
    }
#endif
  } else {
    // move a clean cursor
    SBtreeInitPageArg rootArg = {.pBt = pTree, .flags = TDB_BTREE_ROOT | TDB_BTREE_LEAF};

    rc = tdbPagerFetchPage(pPgr, &pTree->root, &(pBtc->pPage), tdbBtreeInitPage, &rootArg, pBtc->pTxn);
    if (rc < 0) {
      ASSERT(0);
      return -1;
    }

    cellCnt = TDB_PAGE_TOTAL_CELLS(pBtc->pPage);
    pBtc->iPage = 0;
    if (cellCnt > 0) {
      pBtc->idx = TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage) ? cellCnt - 1 : cellCnt;
    } else {
      // no data at all, point to an invalid position
      ASSERT(TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage));
      pBtc->idx = -1;
      return 0;
    }
  }

  // move downward, always taking the right-most position
  while (!TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage)) {
    rc = tdbBtcMoveDownward(pBtc);
    if (rc < 0) {
      ASSERT(0);
      return -1;
    }

    cellCnt = TDB_PAGE_TOTAL_CELLS(pBtc->pPage);
    pBtc->idx = TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage) ? cellCnt - 1 : cellCnt;
  }

  return 0;
}

// Copy out the key (and optionally the value) at the cursor, then advance the
// cursor to the next cell.
//
//   ppKey / kLen  in/out: *ppKey is resized with tdbRealloc() and receives a
//                 copy of the key; *kLen receives its length
//   ppVal / vLen  same for the value; pass ppVal == NULL to skip the value
//
// Returns 0 on success. Returns -1 when the cursor is at an invalid position,
// the cell cannot be decoded, an allocation fails, or the cursor cannot be
// advanced. If the value allocation fails the key buffer is freed and *ppKey
// is set to NULL so the caller is not left with a dangling pointer.
//
// NOTE(review): buffers the decoder may have allocated for overflowing cells
// are not released here -- confirm where that is expected to happen.
int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
  SCell       *pCell;
  SCellDecoder cd;
  void        *pKey, *pVal;
  int          ret;

  // current cursor points to an invalid position
  if (pBtc->idx < 0) {
    return -1;
  }

  pCell = tdbPageGetCell(pBtc->pPage, pBtc->idx);

  // a failed decode leaves cd unusable, so stop before copying from it
  ret = tdbBtreeDecodeCell(pBtc->pPage, pCell, &cd, pBtc->pTxn, pBtc->pBt);
  if (ret < 0) {
    return -1;
  }

  pKey = tdbRealloc(*ppKey, cd.kLen);
  if (pKey == NULL) {
    return -1;
  }

  *ppKey = pKey;
  *kLen = cd.kLen;
  memcpy(pKey, cd.pKey, cd.kLen);

  if (ppVal) {
    // TODO: vLen may be zero
    pVal = tdbRealloc(*ppVal, cd.vLen);
    if (pVal == NULL) {
      tdbFree(pKey);
      *ppKey = NULL;  // the buffer the caller sees has just been freed
      return -1;
    }

    *ppVal = pVal;
    *vLen = cd.vLen;
    memcpy(pVal, cd.pVal, cd.vLen);
  }

  ret = tdbBtcMoveToNext(pBtc);
  if (ret < 0) {
    ASSERT(0);
    return -1;
  }

  return 0;
}

1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709
// Copy out the key (and optionally the value) at the cursor, then move the
// cursor to the previous cell.
//
//   ppKey / kLen  in/out: *ppKey is resized with tdbRealloc() and receives a
//                 copy of the key; *kLen receives its length
//   ppVal / vLen  same for the value; pass ppVal == NULL to skip the value
//
// Returns 0 on success. Returns -1 when the cursor is at an invalid position,
// the cell cannot be decoded, an allocation fails, or the cursor cannot be
// moved. If the value allocation fails the key buffer is freed and *ppKey is
// set to NULL so the caller is not left with a dangling pointer.
//
// NOTE(review): buffers the decoder may have allocated for overflowing cells
// are not released here -- confirm where that is expected to happen.
int tdbBtreePrev(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
  SCell       *pCell;
  SCellDecoder cd;
  void        *pKey, *pVal;
  int          ret;

  // current cursor points to an invalid position
  if (pBtc->idx < 0) {
    return -1;
  }

  pCell = tdbPageGetCell(pBtc->pPage, pBtc->idx);

  // a failed decode leaves cd unusable, so stop before copying from it
  ret = tdbBtreeDecodeCell(pBtc->pPage, pCell, &cd, pBtc->pTxn, pBtc->pBt);
  if (ret < 0) {
    return -1;
  }

  pKey = tdbRealloc(*ppKey, cd.kLen);
  if (pKey == NULL) {
    return -1;
  }

  *ppKey = pKey;
  *kLen = cd.kLen;
  memcpy(pKey, cd.pKey, cd.kLen);

  if (ppVal) {
    // TODO: vLen may be zero
    pVal = tdbRealloc(*ppVal, cd.vLen);
    if (pVal == NULL) {
      tdbFree(pKey);
      *ppKey = NULL;  // the buffer the caller sees has just been freed
      return -1;
    }

    *ppVal = pVal;
    *vLen = cd.vLen;
    memcpy(pVal, cd.pVal, cd.vLen);
  }

  ret = tdbBtcMoveToPrev(pBtc);
  if (ret < 0) {
    ASSERT(0);
    return -1;
  }

  return 0;
}

H
Hongze Cheng 已提交
1743
// Advance the cursor to the next leaf cell.
//
// The cursor must sit on a leaf. If the current leaf has another cell the
// index is simply bumped. Otherwise the cursor climbs until it finds an
// ancestor with a further child to visit, then descends through child index 0
// to a leaf. Reaching the root with nothing left sets idx to -1.
//
// Returns 0 on success (including running off the end), -1 if the cursor was
// already invalid or a page fetch on the way down fails.
int tdbBtcMoveToNext(SBTC *pBtc) {
  ASSERT(TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage));

  if (pBtc->idx < 0) {
    return -1;
  }

  // fast path: next cell is on the same leaf
  if (++pBtc->idx < TDB_PAGE_TOTAL_CELLS(pBtc->pPage)) {
    return 0;
  }

  // move upward until an ancestor still has a child to the right
  do {
    if (pBtc->iPage == 0) {
      // nothing after the last cell of the tree
      pBtc->idx = -1;
      return 0;
    }

    tdbBtcMoveUpward(pBtc);
    pBtc->idx++;

    ASSERT(!TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage));
  } while (pBtc->idx > TDB_PAGE_TOTAL_CELLS(pBtc->pPage));

  // move downward along the left-most path
  while (!TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage)) {
    if (tdbBtcMoveDownward(pBtc) < 0) {
      ASSERT(0);
      return -1;
    }

    pBtc->idx = 0;
  }

  return 0;
}

H
Hongze Cheng 已提交
1789 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799 1800 1801 1802 1803 1804 1805 1806 1807 1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820 1821 1822 1823 1824 1825
/*
 * Step the cursor back to the previous cell in key order.
 *
 * Returns 0 on success. Running off the front of the tree also returns 0; it
 * is reported by leaving pBtc->idx at -1. Returns -1 if the cursor is already
 * invalid (idx < 0) on entry or if a child page cannot be loaded.
 */
int tdbBtcMoveToPrev(SBTC *pBtc) {
  int ret;

  if (pBtc->idx < 0) return -1;

  // fast path: the previous cell is on the current page
  pBtc->idx--;
  if (pBtc->idx >= 0) {
    return 0;
  }

  // move upward until an ancestor still has a child to the left
  for (;;) {
    if (pBtc->iPage == 0) {
      // the root is exhausted: leave the cursor invalid
      pBtc->idx = -1;
      return 0;
    }

    tdbBtcMoveUpward(pBtc);
    pBtc->idx--;
    if (pBtc->idx >= 0) {
      break;
    }
  }

  // move downward, always taking the last child, until a leaf is reached
  for (;;) {
    if (TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage)) break;

    // A failed fetch must stop the walk here; otherwise the leaf test on the
    // next line would run against a page that was never loaded.
    ret = tdbBtcMoveDownward(pBtc);
    if (ret < 0) {
      ASSERT(0);
      return -1;
    }

    if (TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage)) {
      // last cell of the leaf
      pBtc->idx = TDB_PAGE_TOTAL_CELLS(pBtc->pPage) - 1;
    } else {
      // idx == total cells addresses the child stored in the page header
      pBtc->idx = TDB_PAGE_TOTAL_CELLS(pBtc->pPage);
    }
  }

  return 0;
}

H
Hongze Cheng 已提交
1826 1827 1828 1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839
/*
 * Descend from the current interior page into the child selected by pBtc->idx.
 *
 * idx < total cells selects the child whose page number starts cell idx;
 * otherwise the page number stored in the interior page header is used.
 * On success the parent page/index are pushed on the cursor stack, pPage is
 * the child and idx is reset to -1 (the caller picks the position).
 *
 * Returns 0 on success, -1 if the child page cannot be fetched. On failure
 * the cursor is left untouched, still positioned on the parent.
 */
static int tdbBtcMoveDownward(SBTC *pBtc) {
  int    ret;
  SPgno  pgno;
  SCell *pCell;
  SPage *pChild = NULL;

  ASSERT(pBtc->idx >= 0);
  ASSERT(!TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage));

  if (pBtc->idx < TDB_PAGE_TOTAL_CELLS(pBtc->pPage)) {
    pCell = tdbPageGetCell(pBtc->pPage, pBtc->idx);
    // copy the leading page number byte-wise: a cell may start at any offset
    // inside the page, so it must not be read through an SPgno pointer
    memcpy(&pgno, pCell, sizeof(pgno));
  } else {
    pgno = ((SIntHdr *)pBtc->pPage->pData)->pgno;
  }

  ASSERT(pgno);

  // Fetch into a local first and only touch the cursor once the child is in
  // hand, so a failed fetch cannot leave the cursor with a NULL page and a
  // bumped stack depth.
  ret = tdbPagerFetchPage(pBtc->pBt->pPager, &pgno, &pChild, tdbBtreeInitPage,
                          &((SBtreeInitPageArg){.pBt = pBtc->pBt, .flags = 0}), pBtc->pTxn);
  if (ret < 0) {
    ASSERT(0);
    return -1;
  }

  // push the parent frame and switch to the child
  pBtc->pgStack[pBtc->iPage] = pBtc->pPage;
  pBtc->idxStack[pBtc->iPage] = pBtc->idx;
  pBtc->iPage++;
  pBtc->pPage = pChild;
  pBtc->idx = -1;

  return 0;
}

/*
 * Pop one level off the cursor stack: hand the current page back to the pager
 * and restore the page and index that were saved for its parent.
 *
 * Returns -1 when the cursor is already at stack depth 0, 0 otherwise.
 */
static int tdbBtcMoveUpward(SBTC *pBtc) {
  if (pBtc->iPage == 0) {
    return -1;
  }

  // the page being left is no longer referenced by this cursor
  tdbPagerReturnPage(pBtc->pBt->pPager, pBtc->pPage, pBtc->pTxn);

  // restore the parent frame
  --pBtc->iPage;
  pBtc->idx = pBtc->idxStack[pBtc->iPage];
  pBtc->pPage = pBtc->pgStack[pBtc->iPage];

  return 0;
}
H
refact  
Hongze Cheng 已提交
1870

H
Hongze Cheng 已提交
1871 1872 1873 1874 1875 1876 1877 1878
/*
 * Decode the cell under the cursor and expose its key and/or value.
 *
 * ppKey/kLen: when ppKey is non-NULL, receive the decoded key pointer and its
 *             length (kLen is written unconditionally in that case).
 * ppVal/vLen: same, for the value.
 *
 * The returned pointers are the ones held by the cursor's decoder
 * (pBtc->coder); nothing is copied here.
 *
 * Returns 0 on success, -1 if the cursor index is outside the current page.
 */
int tdbBtcGet(SBTC *pBtc, const void **ppKey, int *kLen, const void **ppVal, int *vLen) {
  SCell *pCell;

  // the index must address an existing cell on the current page
  if (pBtc->idx < 0) return -1;
  if (pBtc->idx >= TDB_PAGE_TOTAL_CELLS(pBtc->pPage)) return -1;

  pCell = tdbPageGetCell(pBtc->pPage, pBtc->idx);
  tdbBtreeDecodeCell(pBtc->pPage, pCell, &pBtc->coder, pBtc->pTxn, pBtc->pBt);

  if (ppKey != NULL) {
    *ppKey = (const void *)pBtc->coder.pKey;
    *kLen = pBtc->coder.kLen;
  }

  if (ppVal != NULL) {
    *ppVal = (const void *)pBtc->coder.pVal;
    *vLen = pBtc->coder.vLen;
  }

  return 0;
}

H
Hongze Cheng 已提交
1894
/*
 * Delete the cell under the cursor from its leaf page.
 *
 * After the cell is dropped:
 *  - if it was not the last cell of the leaf, nothing else changes;
 *  - if it was the last cell and the leaf still holds cells, the nearest
 *    ancestor that reaches this subtree through a regular cell (not through
 *    its header child) gets that cell rewritten with the leaf's new last key;
 *  - if the leaf became empty, the tree is rebalanced.
 *
 * Returns 0 on success, -1 on any pager, allocation, encode or update failure.
 */
int tdbBtcDelete(SBTC *pBtc) {
  int         idx = pBtc->idx;
  int         nCells = TDB_PAGE_TOTAL_CELLS(pBtc->pPage);
  SPager     *pPager = pBtc->pBt->pPager;
  const void *pKey;
  i8          iPage;
  SPage      *pPage;
  SPgno       pgno;
  SCell      *pCell;
  int         szCell;
  int         nKey;
  int         ret;

  ASSERT(idx >= 0 && idx < nCells);

  // drop the cell on the leaf
  ret = tdbPagerWrite(pPager, pBtc->pPage);
  if (ret < 0) {
    ASSERT(0);
    return -1;
  }

  tdbPageDropCell(pBtc->pPage, idx, pBtc->pTxn, pBtc->pBt);

  // only removing the last cell of the leaf affects the ancestors
  if (idx != nCells - 1) {
    return 0;
  }

  if (idx == 0) {
    // the only cell was removed: delete the leaf page and do balance
    ASSERT(TDB_PAGE_TOTAL_CELLS(pBtc->pPage) == 0);

    ret = tdbBtreeBalance(pBtc);
    if (ret < 0) {
      ASSERT(0);
      return -1;
    }

    return 0;
  }

  // the leaf has a new last cell: fetch its key
  pBtc->idx--;
  ret = tdbBtcGet(pBtc, &pKey, &nKey, NULL, NULL);
  if (ret < 0) {
    // pKey/nKey were not written; do not go on with garbage
    ASSERT(0);
    return -1;
  }

  // walk up the saved stack to the interior cell that has to carry the new key
  pgno = TDB_PAGE_PGNO(pBtc->pPage);
  for (iPage = pBtc->iPage - 1; iPage >= 0; iPage--) {
    pPage = pBtc->pgStack[iPage];
    idx = pBtc->idxStack[iPage];
    nCells = TDB_PAGE_TOTAL_CELLS(pPage);

    if (idx >= nCells) {
      // this level was entered through the header child, which has no cell to
      // rewrite; keep climbing, now on behalf of this page
      pgno = TDB_PAGE_PGNO(pPage);
      continue;
    }

    ret = tdbPagerWrite(pPager, pPage);
    if (ret < 0) {
      ASSERT(0);
      return -1;
    }

    // update the cell with new key
    // NOTE(review): the +9 presumably covers the encoded key-length prefix plus
    // the child page number -- confirm against tdbBtreeEncodeCell
    pCell = tdbOsMalloc(nKey + 9);
    if (pCell == NULL) {
      ASSERT(0);
      return -1;
    }

    ret = tdbBtreeEncodeCell(pPage, pKey, nKey, &pgno, sizeof(pgno), pCell, &szCell, pBtc->pTxn, pBtc->pBt);
    if (ret < 0) {
      tdbOsFree(pCell);
      ASSERT(0);
      return -1;
    }

    ret = tdbPageUpdateCell(pPage, idx, pCell, szCell, pBtc->pTxn, pBtc->pBt);
    tdbOsFree(pCell);
    if (ret < 0) {
      ASSERT(0);
      return -1;
    }

    break;
  }

  return 0;
}

H
Hongze Cheng 已提交
1969 1970 1971 1972 1973 1974 1975 1976 1977 1978 1979 1980
/*
 * Insert a new cell at the cursor position, or overwrite the cell that is
 * already there.
 *
 * pKey/kLen:   key bytes and length.
 * pData/nData: value bytes and length.
 * insert:      non-zero inserts at pBtc->idx, zero updates the cell at pBtc->idx.
 *
 * The cell is encoded into the tree-wide scratch buffer (pBt->pBuf), which is
 * resized on every call. The page is rebalanced when the operation leaves
 * overflow cells on it.
 *
 * Returns 0 on success, -1 on failure.
 */
int tdbBtcUpsert(SBTC *pBtc, const void *pKey, int kLen, const void *pData, int nData, int insert) {
  int     nCells = TDB_PAGE_TOTAL_CELLS(pBtc->pPage);
  SBTree *pBt;
  SCell  *pCell;
  void   *pNewBuf;
  int     szCell;
  int     szBuf;
  int     ret;

  ASSERT(pBtc->idx >= 0);

  pBt = pBtc->pBt;

  // size the scratch buffer: key + value + 14 extra bytes, capped at one page
  szBuf = kLen + nData + 14;
  if (szBuf > pBt->pageSize) {
    szBuf = pBt->pageSize;
  }

  pNewBuf = tdbRealloc(pBt->pBuf, szBuf);
  if (pNewBuf == NULL) {
    ASSERT(0);
    return -1;
  }
  pBt->pBuf = pNewBuf;
  pCell = (SCell *)pNewBuf;

  // encode cell
  ret = tdbBtreeEncodeCell(pBtc->pPage, pKey, kLen, pData, nData, pCell, &szCell, pBtc->pTxn, pBt);
  if (ret < 0) {
    ASSERT(0);
    return -1;
  }

  // mark dirty
  ret = tdbPagerWrite(pBt->pPager, pBtc->pPage);
  if (ret < 0) {
    ASSERT(0);
    return -1;
  }

  // insert or update
  if (!insert) {
    ASSERT(pBtc->idx < nCells);

    ret = tdbPageUpdateCell(pBtc->pPage, pBtc->idx, pCell, szCell, pBtc->pTxn, pBt);
  } else {
    // idx == nCells appends after the last cell
    ASSERT(pBtc->idx <= nCells);

    ret = tdbPageInsertCell(pBtc->pPage, pBtc->idx, pCell, szCell, 0);
  }
  if (ret < 0) {
    ASSERT(0);
    return -1;
  }

  // check balance
  if (pBtc->pPage->nOverflow > 0) {
    ret = tdbBtreeBalance(pBtc);
    if (ret < 0) {
      ASSERT(0);
      return -1;
    }
  }

  return 0;
}

H
Hongze Cheng 已提交
2030
/*
 * Position a clear cursor (iPage < 0) at, or next to, the given key.
 *
 * Starting from the root, each page is searched (first cell, last cell, then
 * binary search) and the walk descends until a leaf is reached.
 *
 * pCRst: receives the result of the last comparison made on the leaf, i.e.
 *        kcmpr(pKey, key of the cell at pBtc->idx): < 0 if pKey is smaller,
 *        0 if equal, > 0 if larger. It is NOT written when the tree is empty;
 *        in that case the cursor is left with idx == -1.
 *
 * Returns 0 on success, -1 if the root or a child page cannot be fetched.
 * Re-positioning a cursor that is already positioned (iPage >= 0) is not
 * implemented and trips an assertion.
 */
int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
  int         ret;
  int         nCells;
  int         c;
  SBTree     *pBt = pBtc->pBt;
  SPager     *pPager = pBt->pPager;
  const void *pTKey;
  int         tkLen;

  tdbTrace("ttl moveto, pager:%p, ipage:%d", pPager, pBtc->iPage);
  if (pBtc->iPage < 0) {
    // move from a clear cursor
    ret = tdbPagerFetchPage(pPager, &pBt->root, &(pBtc->pPage), tdbBtreeInitPage,
                            &((SBtreeInitPageArg){.pBt = pBt, .flags = TDB_BTREE_ROOT | TDB_BTREE_LEAF}), pBtc->pTxn);
    if (ret < 0) {
      // report the failure instead of claiming success with no page loaded
      ASSERT(0);
      return -1;
    }

    pBtc->iPage = 0;
    pBtc->idx = -1;
    // for empty tree, just return with an invalid position
    if (TDB_PAGE_TOTAL_CELLS(pBtc->pPage) == 0) return 0;
  } else {
    // moving an already positioned cursor is not supported
    ASSERT(0);
  }

  // search downward to the leaf
  for (;;) {
    int    lidx, ridx;
    SPage *pPage;

    pPage = pBtc->pPage;
    nCells = TDB_PAGE_TOTAL_CELLS(pPage);
    lidx = 0;
    ridx = nCells - 1;

    ASSERT(nCells > 0);

    // compare first cell
    pBtc->idx = lidx;
    tdbBtcGet(pBtc, &pTKey, &tkLen, NULL, NULL);
    c = pBt->kcmpr(pKey, kLen, pTKey, tkLen);
    if (c <= 0) {
      // key is at or before the first cell: empty the range so the searches below are skipped
      ridx = lidx - 1;
    } else {
      lidx = lidx + 1;
    }

    // compare last cell
    if (lidx <= ridx) {
      pBtc->idx = ridx;
      tdbBtcGet(pBtc, &pTKey, &tkLen, NULL, NULL);
      c = pBt->kcmpr(pKey, kLen, pTKey, tkLen);
      if (c >= 0) {
        // key is at or after the last cell: empty the range
        lidx = ridx + 1;
      } else {
        ridx = ridx - 1;
      }
    }

    // binary search
    for (;;) {
      if (lidx > ridx) break;

      pBtc->idx = (lidx + ridx) >> 1;
      tdbBtcGet(pBtc, &pTKey, &tkLen, NULL, NULL);
      c = pBt->kcmpr(pKey, kLen, pTKey, tkLen);
      if (c < 0) {
        // pKey < cell key
        ridx = pBtc->idx - 1;
      } else if (c > 0) {
        // pKey > cell key
        lidx = pBtc->idx + 1;
      } else {
        // pKey == cell key
        break;
      }
    }

    // keep search downward or break
    if (TDB_BTREE_PAGE_IS_LEAF(pPage)) {
      *pCRst = c;
      break;
    } else {
      // a key larger than the probed cell belongs to the next child
      if (c > 0) {
        pBtc->idx += 1;
      }

      // stop on a failed fetch: the next iteration would otherwise read a
      // page that was never loaded
      ret = tdbBtcMoveDownward(pBtc);
      if (ret < 0) {
        ASSERT(0);
        return -1;
      }
    }
  }

  return 0;
}

H
refact  
Hongze Cheng 已提交
2163 2164 2165 2166 2167 2168
/*
 * Release every page the cursor still holds: the current page first, then each
 * page saved on the cursor stack, from the deepest level up to the root.
 * A cursor that was never positioned (iPage < 0) holds nothing.
 *
 * Always returns 0. On return pBtc->iPage is negative.
 */
int tdbBtcClose(SBTC *pBtc) {
  while (pBtc->iPage >= 0) {
    ASSERT(pBtc->pPage);

    tdbPagerReturnPage(pBtc->pBt->pPager, pBtc->pPage, pBtc->pTxn);

    // drop to the parent level; stop once the root has been returned
    if (--pBtc->iPage < 0) {
      break;
    }

    pBtc->idx = pBtc->idxStack[pBtc->iPage];
    pBtc->pPage = pBtc->pgStack[pBtc->iPage];
  }

  return 0;
}
H
Hongze Cheng 已提交
2180 2181 2182 2183 2184 2185 2186 2187

/*
 * Tell whether the cursor currently has a non-negative index.
 * Returns 1 when pBtc->idx >= 0, 0 otherwise.
 */
int tdbBtcIsValid(SBTC *pBtc) {
  return (pBtc->idx >= 0) ? 1 : 0;
}
H
refact  
Hongze Cheng 已提交
2188
// TDB_BTREE_CURSOR
H
Hongze Cheng 已提交
2189

H
refact  
Hongze Cheng 已提交
2190
// TDB_BTREE_DEBUG =====================
H
Hongze Cheng 已提交
2191 2192 2193 2194 2195 2196 2197 2198 2199 2200 2201 2202 2203 2204 2205 2206 2207 2208 2209
#ifndef NODEBUG
// Snapshot of one B-tree page, kept for inspection while debugging.
typedef struct {
  SPgno pgno;    // page number
  u8    root;    // non-zero when the page carries TDB_BTREE_ROOT
  u8    leaf;    // non-zero when the page carries TDB_BTREE_LEAF
  SPgno rChild;  // child page number read from the page header (interior pages only, else 0)
  int   nCells;  // total cells minus overflow cells
  int   nOvfl;   // overflow cells
} SBtPageInfo;

// Fixed-size scratch table filled by tdbBtPageInfo().
SBtPageInfo btPageInfos[20];

/*
 * Record a summary of pPage in slot idx of btPageInfos.
 * An idx outside the table is ignored rather than written out of bounds.
 */
void tdbBtPageInfo(SPage *pPage, int idx) {
  SBtPageInfo *pBtPageInfo;

  if (idx < 0 || idx >= (int)(sizeof(btPageInfos) / sizeof(btPageInfos[0]))) {
    return;
  }

  pBtPageInfo = btPageInfos + idx;

  pBtPageInfo->pgno = TDB_PAGE_PGNO(pPage);

  pBtPageInfo->root = TDB_BTREE_PAGE_IS_ROOT(pPage);
  pBtPageInfo->leaf = TDB_BTREE_PAGE_IS_LEAF(pPage);

  pBtPageInfo->rChild = 0;
  if (!pBtPageInfo->leaf) {
    // the page number sits right after the one-byte flags field, i.e. at an
    // odd offset, so copy it byte-wise instead of reading through an SPgno pointer
    memcpy(&pBtPageInfo->rChild, pPage->pData + 1, sizeof(pBtPageInfo->rChild));
  }

  pBtPageInfo->nCells = TDB_PAGE_TOTAL_CELLS(pPage) - pPage->nOverflow;
  pBtPageInfo->nOvfl = pPage->nOverflow;
}
#endif
2222
// TDB_BTREE_DEBUG