tsdbMemTable.c 17.8 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
H
Hongze Cheng 已提交
14 15
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
#define MEM_MIN_HASH 1024
H
Hongze Cheng 已提交
19
#define SL_MAX_LEVEL 5
H
Hongze Cheng 已提交
20

H
Hongze Cheng 已提交
21 22 23 24
#define SL_NODE_SIZE(l)        (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2)
#define SL_NODE_FORWARD(n, l)  ((n)->forwards[l])
#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
#define SL_NODE_DATA(n)        (&SL_NODE_BACKWARD(n, (n)->level))
H
Hongze Cheng 已提交
25

H
Hongze Cheng 已提交
26 27 28 29 30 31 32 33 34 35 36
#define SL_MOVE_BACKWARD 0x1
#define SL_MOVE_FROM_POS 0x2

static void    tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags);
static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData);
static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, int64_t version,
                                       SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, SSubmitBlkRsp *pRsp);

/*
 * Allocate an empty memtable that draws its row memory from the vnode's in-use buffer pool.
 *
 * On success *ppMemTable receives a memtable holding one reference (nRef = 1), the pool gains a
 * reference, and 0 is returned. On allocation failure *ppMemTable is set to NULL and
 * TSDB_CODE_OUT_OF_MEMORY is returned.
 */
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) {
  SMemTable *pMem = (SMemTable *)taosMemoryCalloc(1, sizeof(*pMem));
  if (pMem == NULL) {
    *ppMemTable = NULL;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  taosInitRWLatch(&pMem->latch);
  pMem->pTsdb = pTsdb;
  pMem->pPool = pTsdb->pVnode->inUse;
  pMem->nRef = 1;

  // empty key range: min starts at the largest key, max at the smallest
  pMem->minKey = TSKEY_MAX;
  pMem->maxKey = TSKEY_MIN;
  pMem->nRow = 0;
  pMem->nDel = 0;

  // table index: fixed-size bucket array of singly linked STbData chains
  pMem->nTbData = 0;
  pMem->nBucket = MEM_MIN_HASH;
  pMem->aBucket = (STbData **)taosMemoryCalloc(pMem->nBucket, sizeof(STbData *));
  if (pMem->aBucket == NULL) {
    taosMemoryFree(pMem);
    *ppMemTable = NULL;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // pool reference is dropped again in tsdbMemTableDestroy()
  vnodeBufPoolRef(pMem->pPool);

  *ppMemTable = pMem;
  return 0;
}

/*
 * Release a memtable created by tsdbMemTableCreate(). A NULL argument is ignored.
 *
 * STbData entries, skip-list nodes and delete records are allocated from the buffer pool
 * (vnodeBufPoolMalloc), so only the bucket array and the memtable struct are freed here.
 */
void tsdbMemTableDestroy(SMemTable *pMemTable) {
  if (pMemTable == NULL) return;

  vnodeBufPoolUnRef(pMemTable->pPool);
  taosMemoryFree(pMemTable->aBucket);
  taosMemoryFree(pMemTable);
}

H
Hongze Cheng 已提交
77 78
/*
 * Find the STbData for `uid` in the memtable's hash index, or NULL if the table has no entry.
 * Takes no latch; `suid` is not used by the lookup.
 */
static FORCE_INLINE STbData *tsdbGetTbDataFromMemTableImpl(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid) {
  STbData *pCur;

  // must hash exactly like tsdbMemTableRehash / tsdbGetOrCreateTbData
  for (pCur = pMemTable->aBucket[TABS(uid) % pMemTable->nBucket]; pCur != NULL; pCur = pCur->next) {
    if (pCur->uid == uid) {
      return pCur;
    }
  }

  return NULL;
}
H
Hongze Cheng 已提交
87 88 89

/*
 * Look up the STbData for `uid` while holding the memtable's read latch. Inserting into (and
 * rehashing) the index is done under the write latch in tsdbGetOrCreateTbData().
 * Returns NULL when the table has no data in this memtable.
 */
STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid) {
  taosRLockLatch(&pMemTable->latch);
  STbData *pFound = tsdbGetTbDataFromMemTableImpl(pMemTable, suid, uid);
  taosRUnLockLatch(&pMemTable->latch);

  return pFound;
}
H
Hongze Cheng 已提交
97

H
Hongze Cheng 已提交
98 99 100 101 102 103 104
/*
 * Insert one submit block into the current memtable (pTsdb->mem).
 *
 * Checks that table `uid` exists and belongs to `suid`, reports the schema version in
 * pRsp->sver, then appends the block's rows to the table's skip list.
 * Returns TSDB_CODE_TDB_TABLE_NOT_EXIST if the table is unknown, TSDB_CODE_INVALID_MSG on a suid
 * mismatch, otherwise the result of table-data creation / insertion.
 */
int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock,
                            SSubmitBlkRsp *pRsp) {
  SMemTable *pMem = pTsdb->mem;
  tb_uid_t   tbSuid = pMsgIter->suid;
  tb_uid_t   tbUid = pMsgIter->uid;
  SMetaInfo  meta;
  STbData   *pTbData = NULL;
  int32_t    code;

  if (metaGetInfo(pTsdb->pVnode->pMeta, tbUid, &meta) != 0) return TSDB_CODE_TDB_TABLE_NOT_EXIST;
  if (meta.suid != tbSuid) return TSDB_CODE_INVALID_MSG;

  // for a child table, skmVer is taken from the super table's meta info
  if (meta.suid != 0) {
    // NOTE(review): the return value of this lookup is not checked — confirm this is intended
    metaGetInfo(pTsdb->pVnode->pMeta, meta.suid, &meta);
  }
  pRsp->sver = meta.skmVer;

  code = tsdbGetOrCreateTbData(pMem, tbSuid, tbUid, &pTbData);
  if (code != 0) return code;

  return tsdbInsertTableDataImpl(pMem, pTbData, version, pMsgIter, pBlock, pRsp);
}

/*
 * Record a delete of the time range [sKey, eKey] for table `uid` (child of `suid`) at `version`
 * in the current memtable (pTsdb->mem).
 *
 * The range is appended to the table's SDelData list and nDel is bumped. Cached last-row /
 * last entries for the table are then invalidated, according to the vnode's cache config.
 * Returns TSDB_CODE_TDB_TABLE_NOT_EXIST, TSDB_CODE_INVALID_MSG, TSDB_CODE_OUT_OF_MEMORY or the
 * error from tsdbGetOrCreateTbData(); every outcome is logged.
 */
int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) {
  int32_t    code = 0;
  SMemTable *pMemTable = pTsdb->mem;
  STbData   *pTbData = NULL;
  SVBufPool *pPool = pTsdb->pVnode->inUse;

  // check if table exists
  SMetaInfo info;
  code = metaGetInfo(pTsdb->pVnode->pMeta, uid, &info);
  if (code) {
    code = TSDB_CODE_TDB_TABLE_NOT_EXIST;
    goto _err;
  }
  if (info.suid != suid) {
    code = TSDB_CODE_INVALID_MSG;
    goto _err;
  }

  code = tsdbGetOrCreateTbData(pMemTable, suid, uid, &pTbData);
  if (code) {
    goto _err;
  }

  // do delete: append the range (allocated from the vnode buffer pool) to the table's list
  SDelData *pDelData = (SDelData *)vnodeBufPoolMalloc(pPool, sizeof(*pDelData));
  if (pDelData == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  pDelData->version = version;
  pDelData->sKey = sKey;
  pDelData->eKey = eKey;
  pDelData->pNext = NULL;
  if (pTbData->pHead == NULL) {
    ASSERT(pTbData->pTail == NULL);
    pTbData->pHead = pTbData->pTail = pDelData;
  } else {
    pTbData->pTail->pNext = pDelData;
    pTbData->pTail = pDelData;
  }

  pMemTable->nDel++;

  // Invalidate the cached last row when the deleted range reaches the table's newest timestamp.
  // pTbData->maxKey is a bare TSKEY, so compare timestamps directly. Passing &pTbData->maxKey to
  // tsdbKeyCmprFn() would reinterpret it as a two-field TSDBKEY and read past the field.
  // A table with no rows in this memtable has maxKey == TSKEY_MIN, so it is conservatively invalidated.
  if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config) && eKey >= pTbData->maxKey) {
    tsdbCacheDeleteLastrow(pTsdb->lruCache, pTbData->uid, eKey);
  }

  if (TSDB_CACHE_LAST(pMemTable->pTsdb->pVnode->config)) {
    tsdbCacheDeleteLast(pTsdb->lruCache, pTbData->uid, eKey);
  }

  tsdbInfo("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
           " since %s",
           TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, tstrerror(code));
  return code;

_err:
  tsdbError("vgId:%d, failed to delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
            " since %s",
            TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, tstrerror(code));
  return code;
}

/*
 * Heap-allocate an iterator over pTbData and position it with tsdbTbDataIterOpen().
 * *ppIter is always written: the new iterator, or NULL together with TSDB_CODE_OUT_OF_MEMORY.
 * Free with tsdbTbDataIterDestroy().
 */
int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter) {
  STbDataIter *pIter = (STbDataIter *)taosMemoryCalloc(1, sizeof(STbDataIter));

  *ppIter = pIter;
  if (pIter == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tsdbTbDataIterOpen(pTbData, pFrom, backward, pIter);
  return 0;
}

/* Free an iterator from tsdbTbDataIterCreate(); NULL is ignored. Always returns NULL. */
void *tsdbTbDataIterDestroy(STbDataIter *pIter) {
  if (pIter != NULL) taosMemoryFree(pIter);
  return NULL;
}

H
Hongze Cheng 已提交
226
/*
 * Position pIter over pTbData's skip list and clear any cached row.
 *
 * With pFrom == NULL the iterator starts at the first node in its direction. Otherwise it starts
 * at the first node whose key is >= *pFrom (forward) or the last node whose key is <= *pFrom
 * (backward). If no node qualifies, pIter->pNode lands on the end sentinel, and
 * tsdbTbDataIterGet() then yields NULL.
 */
void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter) {
  SMemSkipListNode *pHead = pTbData->sl.pHead;
  SMemSkipListNode *pTail = pTbData->sl.pTail;
  SMemSkipListNode *pos[SL_MAX_LEVEL];

  pIter->pTbData = pTbData;
  pIter->backward = backward;
  pIter->pRow = NULL;
  pIter->row.type = 0;

  if (backward) {
    if (pFrom == NULL) {
      pIter->pNode = SL_NODE_BACKWARD(pTail, 0);
    } else {
      // pos[0] becomes the first node with key > *pFrom (or the tail); step back once
      tbDataMovePosTo(pTbData, pos, pFrom, SL_MOVE_BACKWARD);
      pIter->pNode = SL_NODE_BACKWARD(pos[0], 0);
    }
  } else {
    if (pFrom == NULL) {
      pIter->pNode = SL_NODE_FORWARD(pHead, 0);
    } else {
      // pos[0] becomes the last node with key < *pFrom (or the head); step forward once
      tbDataMovePosTo(pTbData, pos, pFrom, 0);
      pIter->pNode = SL_NODE_FORWARD(pos[0], 0);
    }
  }
}

/*
 * Drop the cached row and advance the iterator one level-0 step in its direction.
 * Returns false if the iterator was already on, or has just reached, the end sentinel
 * (head when iterating backward, tail when forward).
 */
bool tsdbTbDataIterNext(STbDataIter *pIter) {
  SMemSkipListNode *pHead = pIter->pTbData->sl.pHead;
  SMemSkipListNode *pTail = pIter->pTbData->sl.pTail;
  SMemSkipListNode *pEnd = pIter->backward ? pHead : pTail;

  pIter->pRow = NULL;

  // the iterator must never rest on the sentinel it starts from
  ASSERT(pIter->pNode != (pIter->backward ? pTail : pHead));

  if (pIter->pNode == pEnd) return false;

  pIter->pNode = pIter->backward ? SL_NODE_BACKWARD(pIter->pNode, 0) : SL_NODE_FORWARD(pIter->pNode, 0);

  return pIter->pNode != pEnd;
}

H
Hongze Cheng 已提交
288
/*
 * Return the row under the iterator, decoding it into pIter->row on first access.
 * Returns NULL for a NULL iterator or when the iterator sits on its end sentinel.
 */
TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) {
  // a NULL iterator is accepted (the commit path relies on this)
  if (pIter == NULL) return NULL;

  // already decoded since the last move
  if (pIter->pRow != NULL) return pIter->pRow;

  // the end sentinel for this direction carries no row
  SMemSkipListNode *pEnd = pIter->backward ? pIter->pTbData->sl.pHead : pIter->pTbData->sl.pTail;
  if (pIter->pNode == pEnd) return NULL;

  tGetTSDBRow((uint8_t *)SL_NODE_DATA(pIter->pNode), &pIter->row);
  pIter->pRow = &pIter->row;
  return pIter->pRow;
}
H
Hongze Cheng 已提交
312

H
Hongze Cheng 已提交
313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344
/*
 * Double the number of buckets in the memtable's STbData index and relink every entry.
 * The only caller in this file (tsdbGetOrCreateTbData) holds the write latch.
 * On allocation failure the old index is left intact and TSDB_CODE_OUT_OF_MEMORY is returned.
 */
static int32_t tsdbMemTableRehash(SMemTable *pMemTable) {
  int32_t   nNewBucket = pMemTable->nBucket * 2;
  STbData **aNewBucket = (STbData **)taosMemoryCalloc(nNewBucket, sizeof(STbData *));
  if (aNewBucket == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t i = 0; i < pMemTable->nBucket; i++) {
    STbData *pNode = pMemTable->aBucket[i];

    while (pNode != NULL) {
      STbData *pRest = pNode->next;
      int32_t  slot = TABS(pNode->uid) % nNewBucket;

      // push onto the front of the destination chain
      pNode->next = aNewBucket[slot];
      aNewBucket[slot] = pNode;

      pNode = pRest;
    }
  }

  taosMemoryFree(pMemTable->aBucket);
  pMemTable->aBucket = aNewBucket;
  pMemTable->nBucket = nNewBucket;
  return 0;
}

H
Hongze Cheng 已提交
345
/*
 * Store in *ppTbData the STbData for `uid`, creating and indexing an empty one if needed.
 *
 * A new STbData is carved from the vnode buffer pool. Its two skip-list sentinels (head, then
 * tail, each sized for sl.maxLevel levels) are placed directly after the struct. The initial
 * lookup takes no latch; only linking into the hash index (and any rehash) happens under the
 * write latch. On failure *ppTbData is NULL and the error code is returned.
 */
static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData) {
  STbData *pTbData = tsdbGetTbDataFromMemTableImpl(pMemTable, suid, uid);
  if (pTbData != NULL) {
    *ppTbData = pTbData;
    return 0;
  }

  SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse;
  int8_t     maxLevel = pMemTable->pTsdb->pVnode->config.tsdbCfg.slLevel;

  pTbData = vnodeBufPoolMalloc(pPool, sizeof(*pTbData) + SL_NODE_SIZE(maxLevel) * 2);
  if (pTbData == NULL) {
    *ppTbData = NULL;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pTbData->suid = suid;
  pTbData->uid = uid;
  pTbData->minKey = TSKEY_MAX;
  pTbData->maxKey = TSKEY_MIN;
  pTbData->pHead = NULL;
  pTbData->pTail = NULL;

  SMemSkipList     *pSl = &pTbData->sl;
  SMemSkipListNode *pSlHead = (SMemSkipListNode *)&pTbData[1];
  SMemSkipListNode *pSlTail = (SMemSkipListNode *)POINTER_SHIFT(pSlHead, SL_NODE_SIZE(maxLevel));

  pSl->seed = taosRand();
  pSl->size = 0;
  pSl->maxLevel = maxLevel;
  pSl->level = 0;
  pSl->pHead = pSlHead;
  pSl->pTail = pSlTail;
  pSlHead->level = maxLevel;
  pSlTail->level = maxLevel;

  // empty list: head and tail point at each other on every level, outer links are NULL
  for (int8_t lv = 0; lv < maxLevel; lv++) {
    SL_NODE_FORWARD(pSlHead, lv) = pSlTail;
    SL_NODE_BACKWARD(pSlTail, lv) = pSlHead;

    SL_NODE_BACKWARD(pSlHead, lv) = NULL;
    SL_NODE_FORWARD(pSlTail, lv) = NULL;
  }

  taosWLockLatch(&pMemTable->latch);

  // grow the index once it holds as many tables as it has buckets
  if (pMemTable->nTbData >= pMemTable->nBucket) {
    int32_t code = tsdbMemTableRehash(pMemTable);
    if (code != 0) {
      taosWUnLockLatch(&pMemTable->latch);
      *ppTbData = NULL;
      return code;
    }
  }

  int32_t idx = TABS(uid) % pMemTable->nBucket;
  pTbData->next = pMemTable->aBucket[idx];
  pMemTable->aBucket[idx] = pTbData;
  pMemTable->nTbData++;

  taosWUnLockLatch(&pMemTable->latch);

  *ppTbData = pTbData;
  return 0;
}

/*
 * Fill pos[] with the per-level search frontier for *pKey in pTbData's skip list.
 *
 * Forward (no SL_MOVE_BACKWARD): pos[l] is the last node on level l whose key is < *pKey, or the
 * head sentinel. Backward (SL_MOVE_BACKWARD): pos[l] is the first node on level l whose key is
 * > *pKey, or the tail sentinel.
 *
 * Without SL_MOVE_FROM_POS the walk starts at the sentinel, and levels >= sl.level are set to that
 * sentinel. With SL_MOVE_FROM_POS the walk resumes from pos[sl.level - 1], and levels >= sl.level
 * are left untouched.
 */
static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags) {
  SMemSkipList     *pSl = &pTbData->sl;
  int32_t           backward = flags & SL_MOVE_BACKWARD;
  int32_t           fromPos = flags & SL_MOVE_FROM_POS;
  SMemSkipListNode *pStart = backward ? pSl->pTail : pSl->pHead;  // sentinel the walk starts from
  SMemSkipListNode *pStop = backward ? pSl->pHead : pSl->pTail;   // sentinel that ends a level walk
  SMemSkipListNode *px = pStart;

  if (!fromPos) {
    // no node is taller than sl.level, so on the upper levels only the sentinels are linked
    for (int8_t lv = pSl->level; lv < pSl->maxLevel; lv++) {
      pos[lv] = pStart;
    }
  }

  if (pSl->level == 0) return;

  if (fromPos) px = pos[pSl->level - 1];

  for (int8_t lv = pSl->level - 1; lv >= 0; lv--) {
    SMemSkipListNode *pn = backward ? SL_NODE_BACKWARD(px, lv) : SL_NODE_FORWARD(px, lv);

    while (pn != pStop) {
      int32_t c = tsdbKeyCmprFn((TSDBKEY *)SL_NODE_DATA(pn), pKey);

      // stop at the first node that is not strictly beyond *pKey in the walk direction
      if (backward ? (c <= 0) : (c >= 0)) break;

      px = pn;
      pn = backward ? SL_NODE_BACKWARD(px, lv) : SL_NODE_FORWARD(px, lv);
    }

    pos[lv] = px;
  }
}
H
Hongze Cheng 已提交
474

H
Hongze Cheng 已提交
475 476 477 478
/*
 * Draw a height for a new skip-list node. Starts at 1 and grows by one each time the generator
 * yields a multiple of 4 (about 1/4 per extra level, assuming a uniform generator). The result
 * is capped at min(sl.maxLevel, sl.level + 1). The generator is drawn once more after reaching
 * the cap, exactly as in the original loop condition.
 */
static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) {
  const uint32_t factor = 4;
  int8_t         cap = TMIN(pSl->maxLevel, pSl->level + 1);
  int8_t         level = 1;

  while ((taosRandR(&pSl->seed) % factor) == 0) {
    if (level >= cap) break;
    level++;
  }

  return level;
}
/*
 * Allocate a skip-list node for *pRow from the buffer pool and splice it in next to pos[].
 *
 * forward != 0: pos[l] is the predecessor on level l and the node goes after it.
 * forward == 0: pos[l] is the successor on level l and the node goes before it.
 * Afterwards pos[0 .. level-1] point at the new node, sl.size is bumped and sl.level is raised if
 * needed. Returns TSDB_CODE_OUT_OF_MEMORY if the pool allocation fails.
 */
static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListNode **pos, TSDBROW *pRow,
                           int8_t forward) {
  SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse;
  int8_t     level = tsdbMemSkipListRandLevel(&pTbData->sl);

  SMemSkipListNode *pNode =
      (SMemSkipListNode *)vnodeBufPoolMalloc(pPool, SL_NODE_SIZE(level) + tPutTSDBRow(NULL, pRow));
  if (pNode == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pNode->level = level;
  tPutTSDBRow((uint8_t *)SL_NODE_DATA(pNode), pRow);

  // NOTE(review): all of pNode's own links are written before it is linked into any level —
  // presumably so a concurrent reader never reaches a half-initialised node; keep the passes apart.

  // pass 1: point the new node at its neighbours on every level it occupies
  for (int8_t lv = level - 1; lv >= 0; lv--) {
    SMemSkipListNode *pPrev = forward ? pos[lv] : SL_NODE_BACKWARD(pos[lv], lv);
    SMemSkipListNode *pNext = forward ? SL_NODE_FORWARD(pos[lv], lv) : pos[lv];

    SL_NODE_BACKWARD(pNode, lv) = pPrev;
    SL_NODE_FORWARD(pNode, lv) = pNext;
  }

  // pass 2: make the neighbours point at the new node and advance pos[]
  for (int8_t lv = level - 1; lv >= 0; lv--) {
    SMemSkipListNode *pPrev = forward ? pos[lv] : SL_NODE_BACKWARD(pos[lv], lv);
    SMemSkipListNode *pNext = forward ? SL_NODE_FORWARD(pos[lv], lv) : pos[lv];

    if (forward) {
      SL_NODE_FORWARD(pPrev, lv) = pNode;
      SL_NODE_BACKWARD(pNext, lv) = pNode;
    } else {
      SL_NODE_FORWARD(pPrev, lv) = pNode;
      SL_NODE_BACKWARD(pNext, lv) = pNode;
    }

    pos[lv] = pNode;
  }

  pTbData->sl.size++;
  if (pTbData->sl.level < level) {
    pTbData->sl.level = level;
  }

  return 0;
}

H
Hongze Cheng 已提交
548 549 550 551 552 553
/*
 * Append every row of one submit block to pTbData's skip list at `version`.
 *
 * The first row is placed with a backward search from the tail. The remaining rows continue with
 * forward searches from the previous insertion point, which assumes the block's rows come in
 * ascending ts order (TODO confirm with the submit encoder). The function then updates the
 * table/memtable key ranges and row count, refreshes the last/last-row caches as configured, and
 * reports the row count in pRsp. An empty block inserts nothing and reports 0 rows.
 * Returns TSDB_CODE_OUT_OF_MEMORY if a node cannot be allocated.
 */
static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, int64_t version,
                                       SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, SSubmitBlkRsp *pRsp) {
  int32_t           code = 0;
  SSubmitBlkIter    blkIter = {0};
  TSDBKEY           key = {.version = version};
  SMemSkipListNode *pos[SL_MAX_LEVEL];
  TSDBROW           row = tsdbRowFromTSRow(version, NULL);
  int32_t           nRow = 0;
  STSRow           *pLastRow = NULL;

  tInitSubmitBlkIter(pMsgIter, pBlock, &blkIter);

  // backward put first data
  row.pTSRow = tGetSubmitBlkNext(&blkIter);
  if (row.pTSRow == NULL) {
    // empty block: nothing to insert; key ranges, counters and caches stay untouched
    pRsp->numOfRows = 0;
    pRsp->affectedRows = 0;
    return code;
  }
  key.ts = row.pTSRow->ts;
  nRow++;
  tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
  code = tbDataDoPut(pMemTable, pTbData, pos, &row, 0);
  if (code) {
    goto _err;
  }

  pTbData->minKey = TMIN(pTbData->minKey, key.ts);

  pLastRow = row.pTSRow;

  // forward put rest data
  row.pTSRow = tGetSubmitBlkNext(&blkIter);
  if (row.pTSRow) {
    // Levels below pos[0]->level already point at the node just inserted. On the higher levels
    // pos[] holds successors from the backward search; step back to get forward-search predecessors.
    for (int8_t iLevel = pos[0]->level; iLevel < pTbData->sl.maxLevel; iLevel++) {
      pos[iLevel] = SL_NODE_BACKWARD(pos[iLevel], iLevel);
    }
    do {
      key.ts = row.pTSRow->ts;
      nRow++;
      tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS);
      code = tbDataDoPut(pMemTable, pTbData, pos, &row, 1);
      if (code) {
        goto _err;
      }

      pLastRow = row.pTSRow;

      row.pTSRow = tGetSubmitBlkNext(&blkIter);
    } while (row.pTSRow);
  }

  // key.ts now holds the timestamp of the block's final row
  if (key.ts >= pTbData->maxKey) {
    if (key.ts > pTbData->maxKey) {
      pTbData->maxKey = key.ts;
    }

    if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config) && pLastRow != NULL) {
      tsdbCacheInsertLastrow(pMemTable->pTsdb->lruCache, pMemTable->pTsdb, pTbData->uid, pLastRow, true);
    }
  }

  if (TSDB_CACHE_LAST(pMemTable->pTsdb->pVnode->config)) {
    tsdbCacheInsertLast(pMemTable->pTsdb->lruCache, pTbData->uid, pLastRow, pMemTable->pTsdb);
  }

  // SMemTable
  pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey);
  pMemTable->maxKey = TMAX(pMemTable->maxKey, pTbData->maxKey);
  pMemTable->nRow += nRow;

  pRsp->numOfRows = nRow;
  pRsp->affectedRows = nRow;

  return code;

_err:
  return code;
}
H
Hongze Cheng 已提交
622

623
/* Number of nodes (rows) put into the table's skip list so far. */
int32_t tsdbGetNRowsInTbData(STbData *pTbData) {
  int32_t nNode = pTbData->sl.size;
  return nNode;
}
H
Hongze Cheng 已提交
624 625 626 627 628 629 630 631 632 633 634 635

/* Take one more reference on a memtable that is still referenced (nRef must already be > 0). */
void tsdbRefMemTable(SMemTable *pMemTable) {
  int32_t nBefore = atomic_fetch_add_32(&pMemTable->nRef, 1);
  ASSERT(nBefore > 0);
}

/* Drop one reference; the memtable is destroyed when the count reaches zero. */
void tsdbUnrefMemTable(SMemTable *pMemTable) {
  if (atomic_sub_fetch_32(&pMemTable->nRef, 1) == 0) {
    tsdbMemTableDestroy(pMemTable);
  }
}
H
Hongze Cheng 已提交
636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673

/* Comparator for arrays of STbData*: ascending by suid, then by uid. */
static FORCE_INLINE int32_t tbDataPCmprFn(const void *p1, const void *p2) {
  const STbData *pA = *(STbData **)p1;
  const STbData *pB = *(STbData **)p2;

  if (pA->suid != pB->suid) return (pA->suid < pB->suid) ? -1 : 1;
  if (pA->uid != pB->uid) return (pA->uid < pB->uid) ? -1 : 1;
  return 0;
}

/*
 * Collect every STbData in the memtable into a newly allocated SArray of STbData*, sorted by
 * (suid, uid). Returns NULL if the array cannot be created or grown, so callers never see a
 * partial list. The caller releases the returned array (taosArrayDestroy).
 * Takes no latch.
 */
SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable) {
  SArray *aTbDataP = taosArrayInit(pMemTable->nTbData, sizeof(STbData *));
  if (aTbDataP == NULL) goto _exit;

  for (int32_t iBucket = 0; iBucket < pMemTable->nBucket; iBucket++) {
    for (STbData *pTbData = pMemTable->aBucket[iBucket]; pTbData != NULL; pTbData = pTbData->next) {
      if (taosArrayPush(aTbDataP, &pTbData) == NULL) {
        // a silently truncated list would make callers skip tables; report failure instead
        taosArrayDestroy(aTbDataP);
        aTbDataP = NULL;
        goto _exit;
      }
    }
  }

  taosArraySort(aTbDataP, tbDataPCmprFn);

_exit:
  return aTbDataP;
}