tsdbMemTable.c 23.0 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
H
Hongze Cheng 已提交
14 15
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
// Initial number of hash buckets allocated for a new memtable (see tsdbMemTableCreate).
#define MEM_MIN_HASH 1024
H
Hongze Cheng 已提交
19
// Capacity of the on-stack pos[] arrays that hold one skip-list position per level.
#define SL_MAX_LEVEL 5
H
Hongze Cheng 已提交
20

H
Hongze Cheng 已提交
21
// sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l) * 2
H
Hongze Cheng 已提交
22 23 24
// Byte size of a skip-list node with `l` levels: the node header plus (l) << 4,
// i.e. 16 bytes per level (room for one forward and one backward pointer per
// level, assuming 8-byte pointers).
#define SL_NODE_SIZE(l)               (sizeof(SMemSkipListNode) + ((l) << 4))
// Forward link of node `n` at level `l`.
#define SL_NODE_FORWARD(n, l)         ((n)->forwards[l])
// Backward link of node `n` at level `l`; it lives in the same forwards[] array,
// right after the node's `level` forward links.
#define SL_NODE_BACKWARD(n, l)        ((n)->forwards[(n)->level + (l)])
H
Hongze Cheng 已提交
25 26 27 28
// Accessors that read/write a node's forward or backward link at level `l`
// through atomic_load_ptr / atomic_store_ptr instead of a plain access.
#define SL_GET_NODE_FORWARD(n, l)     ((SMemSkipListNode *)atomic_load_ptr(&SL_NODE_FORWARD(n, l)))
#define SL_GET_NODE_BACKWARD(n, l)    ((SMemSkipListNode *)atomic_load_ptr(&SL_NODE_BACKWARD(n, l)))
#define SL_SET_NODE_FORWARD(n, l, p)  atomic_store_ptr(&SL_NODE_FORWARD(n, l), p)
#define SL_SET_NODE_BACKWARD(n, l, p) atomic_store_ptr(&SL_NODE_BACKWARD(n, l), p)
H
Hongze Cheng 已提交
29

H
Hongze Cheng 已提交
30 31 32 33 34
// Flag bits for tbDataMovePosTo():
//   SL_MOVE_BACKWARD - search from the tail towards the head (default: head towards tail)
//   SL_MOVE_FROM_POS - resume from the positions already stored in pos[] instead of
//                      starting at the list end
#define SL_MOVE_BACKWARD 0x1
#define SL_MOVE_FROM_POS 0x2

// Fill pos[] with the per-level skip-list positions for pKey.
static void    tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags);
// Look up the STbData for `uid` in the memtable, creating and registering it if absent.
static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData);
H
Hongze Cheng 已提交
35 36 37 38
// Insert a submit request carried as an array of SRow pointers (row format).
static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
                                        SSubmitTbData *pSubmitTbData, int32_t *affectedRows);
// Insert a submit request carried as an array of SColData (column format).
static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
                                        SSubmitTbData *pSubmitTbData, int32_t *affectedRows);
H
Hongze Cheng 已提交
39 40 41 42

// Allocate and initialize an empty memtable bound to pTsdb.
//
// On success *ppMemTable receives the new table (nRef == 1, MEM_MIN_HASH empty
// buckets), the vnode's in-use buffer pool is referenced, and 0 is returned.
// On allocation failure *ppMemTable is set to NULL and TSDB_CODE_OUT_OF_MEMORY
// is returned.
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) {
  SMemTable *pNew = (SMemTable *)taosMemoryCalloc(1, sizeof(*pNew));
  if (pNew == NULL) {
    *ppMemTable = NULL;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  taosInitRWLatch(&pNew->latch);
  pNew->pTsdb = pTsdb;
  pNew->pPool = pTsdb->pVnode->inUse;
  pNew->nRef = 1;

  // Ranges start inverted so the first insert always narrows them.
  pNew->minVer = VERSION_MAX;
  pNew->maxVer = VERSION_MIN;
  pNew->minKey = TSKEY_MAX;
  pNew->maxKey = TSKEY_MIN;

  pNew->nRow = 0;
  pNew->nDel = 0;
  pNew->nTbData = 0;

  pNew->nBucket = MEM_MIN_HASH;
  pNew->aBucket = (STbData **)taosMemoryCalloc(pNew->nBucket, sizeof(STbData *));
  if (pNew->aBucket == NULL) {
    taosMemoryFree(pNew);
    *ppMemTable = NULL;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // Take the pool reference only once nothing else can fail.
  vnodeBufPoolRef(pNew->pPool);

  *ppMemTable = pNew;
  return 0;
}

H
Hongze Cheng 已提交
77
// Release a memtable: drop its buffer-pool reference, then free the bucket
// array and the table itself. A NULL pMemTable is ignored.
void tsdbMemTableDestroy(SMemTable *pMemTable, bool proactive) {
  if (pMemTable == NULL) {
    return;
  }

  vnodeBufPoolUnRef(pMemTable->pPool, proactive);
  taosMemoryFree(pMemTable->aBucket);
  taosMemoryFree(pMemTable);
}

H
Hongze Cheng 已提交
85 86
// Walk the hash chain selected by TABS(uid) % nBucket and return the entry
// whose uid matches, or NULL when there is none. Takes no lock itself; `suid`
// is not consulted by the lookup.
static FORCE_INLINE STbData *tsdbGetTbDataFromMemTableImpl(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid) {
  for (STbData *pCur = pMemTable->aBucket[TABS(uid) % pMemTable->nBucket]; pCur != NULL; pCur = pCur->next) {
    if (pCur->uid == uid) {
      return pCur;
    }
  }

  return NULL;
}
H
Hongze Cheng 已提交
95 96 97

// Look up the per-table data for `uid` while holding the memtable latch in
// read mode. Returns NULL when the table has no entry in this memtable.
STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid) {
  taosRLockLatch(&pMemTable->latch);
  STbData *pFound = tsdbGetTbDataFromMemTableImpl(pMemTable, suid, uid);
  taosRUnLockLatch(&pMemTable->latch);

  return pFound;
}
H
Hongze Cheng 已提交
105

H
Hongze Cheng 已提交
106
// Insert one table's submit payload into the current memtable (pTsdb->mem).
//
// The per-table entry is fetched or created first; the payload is then routed
// to the column-format or row-format inserter depending on
// SUBMIT_REQ_COLUMN_DATA_FORMAT in pSubmitTbData->flags. On success the
// memtable's version range is widened to include `version` and 0 is returned.
// On failure the error code is stored in terrno and returned.
int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitTbData *pSubmitTbData, int32_t *affectedRows) {
  SMemTable *pMem = pTsdb->mem;
  STbData   *pTable = NULL;

  int32_t code = tsdbGetOrCreateTbData(pMem, pSubmitTbData->suid, pSubmitTbData->uid, &pTable);
  if (code == 0) {
    if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
      code = tsdbInsertColDataToTable(pMem, pTable, version, pSubmitTbData, affectedRows);
    } else {
      code = tsdbInsertRowDataToTable(pMem, pTable, version, pSubmitTbData, affectedRows);
    }
  }

  if (code) {
    terrno = code;
    return code;
  }

  // Track the range of versions held by this memtable.
  pMem->minVer = TMIN(pMem->minVer, version);
  pMem->maxVer = TMAX(pMem->maxVer, version);

  return code;
}

// Record a delete marker for table `uid` covering the key range given by
// sKey/eKey at `version`.
//
// The table must be known to the meta store and belong to `suid`; otherwise
// TSDB_CODE_TDB_TABLE_NOT_EXIST or TSDB_CODE_INVALID_MSG is returned. The
// marker is allocated from the vnode's in-use buffer pool and appended to the
// table's delete list, the memtable's delete counter and version range are
// updated, and the last/last-row caches are invalidated where configured.
// Returns 0 on success or an error code (also logged) on failure.
int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) {
  int32_t    code = 0;
  SMemTable *pMemTable = pTsdb->mem;
  STbData   *pTbData = NULL;
  SVBufPool *pPool = pTsdb->pVnode->inUse;
  TSDBKEY    lastKey = {.version = version, .ts = eKey};

  // check if table exists
  SMetaInfo info;
  code = metaGetInfo(pTsdb->pVnode->pMeta, uid, &info, NULL);
  if (code) {
    code = TSDB_CODE_TDB_TABLE_NOT_EXIST;
    goto _err;
  }
  if (info.suid != suid) {
    code = TSDB_CODE_INVALID_MSG;
    goto _err;
  }

  code = tsdbGetOrCreateTbData(pMemTable, suid, uid, &pTbData);
  if (code) {
    goto _err;
  }

  ASSERT(pPool != NULL);
  // do delete
  SDelData *pDelData = (SDelData *)vnodeBufPoolMalloc(pPool, sizeof(*pDelData));
  if (pDelData == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  pDelData->version = version;
  pDelData->sKey = sKey;
  pDelData->eKey = eKey;
  pDelData->pNext = NULL;

  // append to the table's singly linked delete list
  if (pTbData->pHead == NULL) {
    ASSERT(pTbData->pTail == NULL);
    pTbData->pHead = pTbData->pTail = pDelData;
  } else {
    pTbData->pTail->pNext = pDelData;
    pTbData->pTail = pDelData;
  }

  pMemTable->nDel++;
  pMemTable->minVer = TMIN(pMemTable->minVer, version);
  // maxVer must grow with the newest version (TMAX); using TMIN here would
  // keep it pinned at its initial value or shrink it.
  pMemTable->maxVer = TMAX(pMemTable->maxVer, version);

  // only touch the last-row cache when the delete reaches the table's newest key
  if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config) && tsdbKeyCmprFn(&lastKey, &pTbData->maxKey) >= 0) {
    tsdbCacheDeleteLastrow(pTsdb->lruCache, pTbData->uid, eKey);
  }

  if (TSDB_CACHE_LAST(pMemTable->pTsdb->pVnode->config)) {
    tsdbCacheDeleteLast(pTsdb->lruCache, pTbData->uid, eKey);
  }

  tsdbInfo("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
           " at version %" PRId64 " since %s",
           TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, version, tstrerror(code));
  return code;

_err:
  tsdbError("vgId:%d, failed to delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
            " at version %" PRId64 " since %s",
            TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, version, tstrerror(code));
  return code;
}

// Heap-allocate an iterator over pTbData and open it at pFrom (or at the list
// end when pFrom is NULL). *ppIter receives the iterator, or NULL together
// with TSDB_CODE_OUT_OF_MEMORY when the allocation fails.
int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter) {
  STbDataIter *pNewIter = (STbDataIter *)taosMemoryCalloc(1, sizeof(STbDataIter));

  *ppIter = pNewIter;
  if (pNewIter == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tsdbTbDataIterOpen(pTbData, pFrom, backward, pNewIter);
  return 0;
}

// Free an iterator obtained from tsdbTbDataIterCreate (NULL is accepted).
// Always returns NULL so callers can write `p = tsdbTbDataIterDestroy(p)`.
void *tsdbTbDataIterDestroy(STbDataIter *pIter) {
  if (pIter == NULL) {
    return NULL;
  }

  taosMemoryFree(pIter);
  return NULL;
}

H
Hongze Cheng 已提交
227
// Initialize a caller-provided iterator over pTbData's skip list.
//
// With pFrom == NULL the iterator starts at the first node after the head
// (forward) or the first node before the tail (backward). Otherwise the
// starting node is located with tbDataMovePosTo() for the given key and
// direction. pIter->pRow is reset to NULL.
void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter) {
  SMemSkipListNode *pos[SL_MAX_LEVEL];
  SMemSkipListNode *pStart;

  pIter->pTbData = pTbData;
  pIter->backward = backward;
  pIter->pRow = NULL;

  if (pFrom != NULL) {
    // position relative to a key: level-0 slot holds the neighbour to step from
    tbDataMovePosTo(pTbData, pos, pFrom, backward ? SL_MOVE_BACKWARD : 0);
    pStart = pos[0];
  } else {
    // no key: step in from the matching sentinel
    pStart = backward ? pTbData->sl.pTail : pTbData->sl.pHead;
  }

  if (backward) {
    pIter->pNode = SL_GET_NODE_BACKWARD(pStart, 0);
  } else {
    pIter->pNode = SL_GET_NODE_FORWARD(pStart, 0);
  }
}

// Advance the iterator one node in its direction. Returns false when the
// iterator already sits on, or has just reached, the terminating sentinel
// (head for backward iteration, tail for forward); true otherwise.
// pIter->pRow is cleared on every call.
bool tsdbTbDataIterNext(STbDataIter *pIter) {
  SMemSkipListNode *pStop;

  pIter->pRow = NULL;

  if (pIter->backward) {
    ASSERT(pIter->pNode != pIter->pTbData->sl.pTail);

    pStop = pIter->pTbData->sl.pHead;
    if (pIter->pNode == pStop) {
      return false;
    }
    pIter->pNode = SL_GET_NODE_BACKWARD(pIter->pNode, 0);
  } else {
    ASSERT(pIter->pNode != pIter->pTbData->sl.pHead);

    pStop = pIter->pTbData->sl.pTail;
    if (pIter->pNode == pStop) {
      return false;
    }
    pIter->pNode = SL_GET_NODE_FORWARD(pIter->pNode, 0);
  }

  return pIter->pNode != pStop;
}

H
Hongze Cheng 已提交
285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316
// Double the bucket array and re-link every STbData into its new chain.
// Returns TSDB_CODE_OUT_OF_MEMORY (leaving the table untouched) when the new
// array cannot be allocated, 0 otherwise. Does no locking of its own.
static int32_t tsdbMemTableRehash(SMemTable *pMemTable) {
  int32_t   nNew = pMemTable->nBucket * 2;
  STbData **aNew = (STbData **)taosMemoryCalloc(nNew, sizeof(STbData *));
  if (aNew == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t iOld = 0; iOld < pMemTable->nBucket; iOld++) {
    STbData *pNext;

    for (STbData *pCur = pMemTable->aBucket[iOld]; pCur != NULL; pCur = pNext) {
      // remember the old chain successor before the node is re-linked
      pNext = pCur->next;

      STbData **ppSlot = &aNew[TABS(pCur->uid) % nNew];
      pCur->next = *ppSlot;
      *ppSlot = pCur;
    }
  }

  taosMemoryFree(pMemTable->aBucket);
  pMemTable->nBucket = nNew;
  pMemTable->aBucket = aNew;

  return 0;
}

H
Hongze Cheng 已提交
317
// Return the STbData for `uid`, creating it when the memtable has none.
//
// A new entry is allocated from the vnode's in-use buffer pool as one chunk:
// the STbData followed by the skip list's head and tail sentinel nodes, each
// sized for the configured maximum level. The entry is then inserted into the
// hash table under the write latch, growing the table first when the entry
// count has reached the bucket count.
//
// On success *ppTbData is set and 0 is returned; on failure *ppTbData is NULL
// and an error code is returned.
static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData) {
  // existing entry: nothing to create
  STbData *pEntry = tsdbGetTbDataFromMemTableImpl(pMemTable, suid, uid);
  if (pEntry != NULL) {
    *ppTbData = pEntry;
    return 0;
  }

  SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse;
  int8_t     nLevel = pMemTable->pTsdb->pVnode->config.tsdbCfg.slLevel;

  ASSERT(pPool != NULL);
  pEntry = vnodeBufPoolMallocAligned(pPool, sizeof(*pEntry) + SL_NODE_SIZE(nLevel) * 2);
  if (pEntry == NULL) {
    *ppTbData = NULL;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // sentinels live directly behind the STbData, head first
  SMemSkipListNode *pHeadNode = (SMemSkipListNode *)(pEntry + 1);
  SMemSkipListNode *pTailNode = (SMemSkipListNode *)POINTER_SHIFT(pHeadNode, SL_NODE_SIZE(nLevel));

  pEntry->suid = suid;
  pEntry->uid = uid;
  pEntry->minKey = TSKEY_MAX;
  pEntry->maxKey = TSKEY_MIN;
  pEntry->pHead = NULL;
  pEntry->pTail = NULL;
  pEntry->sl.seed = taosRand();
  pEntry->sl.size = 0;
  pEntry->sl.maxLevel = nLevel;
  pEntry->sl.level = 0;
  pEntry->sl.pHead = pHeadNode;
  pEntry->sl.pTail = pTailNode;
  pHeadNode->level = nLevel;
  pTailNode->level = nLevel;

  // empty list: head <-> tail on every level, outer links are NULL
  for (int8_t lvl = 0; lvl < nLevel; lvl++) {
    SL_NODE_FORWARD(pHeadNode, lvl) = pTailNode;
    SL_NODE_BACKWARD(pTailNode, lvl) = pHeadNode;

    SL_NODE_BACKWARD(pHeadNode, lvl) = NULL;
    SL_NODE_FORWARD(pTailNode, lvl) = NULL;
  }

  taosWLockLatch(&pMemTable->latch);

  if (pMemTable->nTbData >= pMemTable->nBucket) {
    int32_t rc = tsdbMemTableRehash(pMemTable);
    if (rc) {
      taosWUnLockLatch(&pMemTable->latch);
      *ppTbData = NULL;
      return rc;
    }
  }

  // push onto the front of the bucket chain
  STbData **ppBucket = &pMemTable->aBucket[TABS(uid) % pMemTable->nBucket];
  pEntry->next = *ppBucket;
  *ppBucket = pEntry;
  pMemTable->nTbData++;

  taosWUnLockLatch(&pMemTable->latch);

  *ppTbData = pEntry;
  return 0;
}

// Compute, for every skip-list level, the node next to which a row with key
// *pKey belongs, and store it in pos[level].
//
// flags:
//   SL_MOVE_BACKWARD - walk from the tail via backward links; pos[l] ends up
//                      as the last visited node whose key is > *pKey (or the
//                      tail). Without it, walk from the head via forward
//                      links; pos[l] is the last node whose key is < *pKey
//                      (or the head).
//   SL_MOVE_FROM_POS - start from the node already stored in
//                      pos[sl.level - 1] and leave the slots at and above
//                      sl.level untouched; without it those slots are set to
//                      the starting sentinel.
static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags) {
  SMemSkipListNode *pCur;
  SMemSkipListNode *pProbe;
  TSDBKEY           nodeKey = {0};
  int32_t           goBackward = flags & SL_MOVE_BACKWARD;
  int32_t           resume = flags & SL_MOVE_FROM_POS;

  if (goBackward) {
    pCur = pTbData->sl.pTail;

    if (!resume) {
      // levels not yet in use all point at the tail sentinel
      for (int8_t lvl = pTbData->sl.level; lvl < pTbData->sl.maxLevel; lvl++) {
        pos[lvl] = pCur;
      }
    }

    if (pTbData->sl.level) {
      if (resume) {
        pCur = pos[pTbData->sl.level - 1];
      }

      for (int8_t lvl = pTbData->sl.level - 1; lvl >= 0; lvl--) {
        for (pProbe = SL_GET_NODE_BACKWARD(pCur, lvl); pProbe != pTbData->sl.pHead;
             pProbe = SL_GET_NODE_BACKWARD(pCur, lvl)) {
          // key of the probed node depends on how its row is stored
          if (pProbe->flag == TSDBROW_ROW_FMT) {
            nodeKey.version = pProbe->version;
            nodeKey.ts = ((SRow *)pProbe->pData)->ts;
          } else if (pProbe->flag == TSDBROW_COL_FMT) {
            nodeKey.version = ((SBlockData *)pProbe->pData)->aVersion[pProbe->iRow];
            nodeKey.ts = ((SBlockData *)pProbe->pData)->aTSKEY[pProbe->iRow];
          }

          if (tsdbKeyCmprFn(&nodeKey, pKey) <= 0) {
            break;
          }
          pCur = pProbe;
        }

        pos[lvl] = pCur;
      }
    }
  } else {
    pCur = pTbData->sl.pHead;

    if (!resume) {
      // levels not yet in use all point at the head sentinel
      for (int8_t lvl = pTbData->sl.level; lvl < pTbData->sl.maxLevel; lvl++) {
        pos[lvl] = pCur;
      }
    }

    if (pTbData->sl.level) {
      if (resume) {
        pCur = pos[pTbData->sl.level - 1];
      }

      for (int8_t lvl = pTbData->sl.level - 1; lvl >= 0; lvl--) {
        for (pProbe = SL_GET_NODE_FORWARD(pCur, lvl); pProbe != pTbData->sl.pTail;
             pProbe = SL_GET_NODE_FORWARD(pCur, lvl)) {
          if (pProbe->flag == TSDBROW_ROW_FMT) {
            nodeKey.version = pProbe->version;
            nodeKey.ts = ((SRow *)pProbe->pData)->ts;
          } else if (pProbe->flag == TSDBROW_COL_FMT) {
            nodeKey.version = ((SBlockData *)pProbe->pData)->aVersion[pProbe->iRow];
            nodeKey.ts = ((SBlockData *)pProbe->pData)->aTSKEY[pProbe->iRow];
          }

          if (tsdbKeyCmprFn(&nodeKey, pKey) >= 0) {
            break;
          }
          pCur = pProbe;
        }

        pos[lvl] = pCur;
      }
    }
  }
}
H
Hongze Cheng 已提交
461

H
Hongze Cheng 已提交
462
// Draw a random level for a new node. Starting at 1, the level is raised each
// time the low two bits of taosRandR() are zero, and is capped at
// min(maxLevel, current list level + 1). The random draw happens before the
// cap is checked, so the seed always advances at least once.
static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) {
  int8_t cap = TMIN(pSl->maxLevel, pSl->level + 1);
  int8_t picked = 1;

  for (;;) {
    if ((taosRandR(&pSl->seed) & 0x3) != 0) {
      break;
    }
    if (picked >= cap) {
      break;
    }
    picked++;
  }

  return picked;
}
// Create a skip-list node for *pRow and link it into pTbData's list next to
// the positions in pos[].
//
//   forward != 0: the node is inserted right after pos[l] on each level.
//   forward == 0: the node is inserted right before pos[l] on each level.
//
// Row-format rows are copied into the node's own allocation; column-format
// rows only store the block pointer and row index. After linking, pos[l] is
// moved onto the new node for every level the node takes part in, and the
// list's size/level are updated.
//
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when no node could be
// obtained from the buffer pool.
static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListNode **pos, TSDBROW *pRow,
                           int8_t forward) {
  int32_t           code = 0;
  int8_t            level;
  // Start from NULL so an unexpected row type (when ASSERT does not abort) is
  // caught by the NULL check below instead of reading an indeterminate pointer.
  SMemSkipListNode *pNode = NULL;
  SVBufPool        *pPool = pMemTable->pTsdb->pVnode->inUse;
  int64_t           nSize;

  // create node
  level = tsdbMemSkipListRandLevel(&pTbData->sl);
  nSize = SL_NODE_SIZE(level);
  if (pRow->type == TSDBROW_ROW_FMT) {
    // row payload is stored inline, directly behind the link array
    pNode = (SMemSkipListNode *)vnodeBufPoolMallocAligned(pPool, nSize + pRow->pTSRow->len);
  } else if (pRow->type == TSDBROW_COL_FMT) {
    pNode = (SMemSkipListNode *)vnodeBufPoolMallocAligned(pPool, nSize);
  } else {
    ASSERT(0);
  }
  if (pNode == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  pNode->level = level;
  pNode->flag = pRow->type;
  if (pRow->type == TSDBROW_ROW_FMT) {
    pNode->version = pRow->version;
    pNode->pData = (char *)pNode + nSize;
    memcpy(pNode->pData, pRow->pTSRow, pRow->pTSRow->len);
  } else if (pRow->type == TSDBROW_COL_FMT) {
    pNode->iRow = pRow->iRow;
    pNode->pData = pRow->pBlockData;
  } else {
    ASSERT(0);
  }

  // set node: fill in the new node's own links first (plain stores, the node
  // is not reachable from the list yet)
  if (forward) {
    for (int8_t iLevel = 0; iLevel < level; iLevel++) {
      SL_NODE_FORWARD(pNode, iLevel) = SL_NODE_FORWARD(pos[iLevel], iLevel);
      SL_NODE_BACKWARD(pNode, iLevel) = pos[iLevel];
    }
  } else {
    for (int8_t iLevel = 0; iLevel < level; iLevel++) {
      SL_NODE_FORWARD(pNode, iLevel) = pos[iLevel];
      SL_NODE_BACKWARD(pNode, iLevel) = SL_NODE_BACKWARD(pos[iLevel], iLevel);
    }
  }

  // set forward and backward: hook the neighbours onto the node with atomic
  // stores, from the top level down
  if (forward) {
    for (int8_t iLevel = level - 1; iLevel >= 0; iLevel--) {
      SMemSkipListNode *pNext = pos[iLevel]->forwards[iLevel];

      SL_SET_NODE_FORWARD(pos[iLevel], iLevel, pNode);
      SL_SET_NODE_BACKWARD(pNext, iLevel, pNode);

      pos[iLevel] = pNode;
    }
  } else {
    for (int8_t iLevel = level - 1; iLevel >= 0; iLevel--) {
      SMemSkipListNode *pPrev = pos[iLevel]->forwards[pos[iLevel]->level + iLevel];

      SL_SET_NODE_FORWARD(pPrev, iLevel, pNode);
      SL_SET_NODE_BACKWARD(pos[iLevel], iLevel, pNode);

      pos[iLevel] = pNode;
    }
  }

  pTbData->sl.size++;
  if (pTbData->sl.level < pNode->level) {
    pTbData->sl.level = pNode->level;
  }

_exit:
  return code;
}

H
Hongze Cheng 已提交
551 552 553
// Insert a column-format submit payload into pTbData.
//
// The columns are first copied into an SBlockData allocated from the vnode's
// in-use buffer pool (column 0 must be the timestamp primary key; every row
// gets `version`). Each row of that block is then linked into the table's
// skip list: the first row is placed by a backward search from the tail, the
// remaining rows by forward searches that resume from the previous position.
// Finally the table/memtable key ranges, row counters and the last/last-row
// caches are updated, and *affectedRows (if non-NULL) receives the row count.
//
// Returns 0 on success or an error code (TSDB_CODE_OUT_OF_MEMORY, or whatever
// tColDataCopy / tbDataDoPut report).
static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
                                        SSubmitTbData *pSubmitTbData, int32_t *affectedRows) {
  int32_t code = 0;

  SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse;
  int32_t    nColData = TARRAY_SIZE(pSubmitTbData->aCol);
  SColData  *aColData = (SColData *)TARRAY_DATA(pSubmitTbData->aCol);

  ASSERT(aColData[0].cid == PRIMARYKEY_TIMESTAMP_COL_ID);
  ASSERT(aColData[0].type == TSDB_DATA_TYPE_TIMESTAMP);
  ASSERT(aColData[0].flag == HAS_VALUE);

  // copy and construct block data
  SBlockData *pBlockData = vnodeBufPoolMalloc(pPool, sizeof(*pBlockData));
  if (pBlockData == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  pBlockData->suid = pTbData->suid;
  pBlockData->uid = pTbData->uid;
  pBlockData->nRow = aColData[0].nVal;
  pBlockData->aUid = NULL;
  // sized like the timestamp column's data buffer
  pBlockData->aVersion = vnodeBufPoolMalloc(pPool, aColData[0].nData);
  if (pBlockData->aVersion == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }
  for (int32_t i = 0; i < pBlockData->nRow; i++) {  // todo: here can be optimized
    pBlockData->aVersion[i] = version;
  }

  pBlockData->aTSKEY = vnodeBufPoolMalloc(pPool, aColData[0].nData);
  if (pBlockData->aTSKEY == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }
  memcpy(pBlockData->aTSKEY, aColData[0].pData, aColData[0].nData);

  // the timestamp column is kept in aTSKEY, so only the remaining columns are copied
  pBlockData->nColData = nColData - 1;
  pBlockData->aColData = vnodeBufPoolMalloc(pPool, sizeof(SColData) * pBlockData->nColData);
  if (pBlockData->aColData == NULL && pBlockData->nColData > 0) {
    // bail out here: the copy loop below would otherwise write through NULL
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }
  for (int32_t iColData = 0; iColData < pBlockData->nColData; ++iColData) {
    code = tColDataCopy(&aColData[iColData + 1], &pBlockData->aColData[iColData], (xMallocFn)vnodeBufPoolMalloc, pPool);
    if (code) goto _exit;
  }

  // loop to add each row to the skiplist
  SMemSkipListNode *pos[SL_MAX_LEVEL];
  TSDBROW           tRow = tsdbRowFromBlockData(pBlockData, 0);
  TSDBKEY           key = {.version = version, .ts = pBlockData->aTSKEY[0]};
  TSDBROW           lRow;  // last row

  // first row
  tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
  if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0))) goto _exit;
  pTbData->minKey = TMIN(pTbData->minKey, key.ts);
  lRow = tRow;

  // remain row
  ++tRow.iRow;
  if (tRow.iRow < pBlockData->nRow) {
    // levels above the node just inserted still hold "after" positions from the
    // backward search; step them back so every slot is a "before" position
    for (int8_t iLevel = pos[0]->level; iLevel < pTbData->sl.maxLevel; iLevel++) {
      pos[iLevel] = SL_NODE_BACKWARD(pos[iLevel], iLevel);
    }

    while (tRow.iRow < pBlockData->nRow) {
      key.ts = pBlockData->aTSKEY[tRow.iRow];

      // no search needed when the previous row is already last in the list
      if (SL_NODE_FORWARD(pos[0], 0) != pTbData->sl.pTail) {
        tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS);
      }

      if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 1))) goto _exit;
      lRow = tRow;

      ++tRow.iRow;
    }
  }

  if (key.ts >= pTbData->maxKey) {
    pTbData->maxKey = key.ts;

    if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config)) {
      tsdbCacheInsertLastrow(pMemTable->pTsdb->lruCache, pMemTable->pTsdb, pTbData->uid, &lRow, true);
    }
  }

  if (TSDB_CACHE_LAST(pMemTable->pTsdb->pVnode->config)) {
    tsdbCacheInsertLast(pMemTable->pTsdb->lruCache, pTbData->uid, &lRow, pMemTable->pTsdb);
  }

  // SMemTable
  pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey);
  pMemTable->maxKey = TMAX(pMemTable->maxKey, pTbData->maxKey);
  pMemTable->nRow += pBlockData->nRow;

  if (affectedRows) *affectedRows = pBlockData->nRow;

_exit:
  return code;
}
H
Hongze Cheng 已提交
655

H
Hongze Cheng 已提交
656 657
// Insert a row-format submit payload (array of SRow pointers) into pTbData.
//
// The first row is placed by a backward search from the tail of the skip
// list; the remaining rows are placed by forward searches resuming from the
// previous insert position. Afterwards the table/memtable key ranges, the row
// counter and the last/last-row caches are updated, and *affectedRows (if
// non-NULL) receives the number of rows. Returns 0 or the first error from
// tbDataDoPut().
static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
                                        SSubmitTbData *pSubmitTbData, int32_t *affectedRows) {
  int32_t           code = 0;
  int32_t           nRow = TARRAY_SIZE(pSubmitTbData->aRowP);
  SRow            **aRow = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP);
  SMemSkipListNode *pos[SL_MAX_LEVEL];
  TSDBKEY           key = {.version = version};
  TSDBROW           tRow = {.type = TSDBROW_ROW_FMT, .version = version};
  TSDBROW           lRow;

  // first row: search backward from the tail, insert before the found position
  tRow.pTSRow = aRow[0];
  key.ts = tRow.pTSRow->ts;
  tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
  code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0);
  if (code) {
    return code;
  }
  lRow = tRow;

  pTbData->minKey = TMIN(pTbData->minKey, key.ts);

  // remaining rows: search forward, insert after the found position
  if (nRow > 1) {
    // slots above the new node's level still hold "after" positions; step them back
    for (int8_t lvl = pos[0]->level; lvl < pTbData->sl.maxLevel; lvl++) {
      pos[lvl] = SL_NODE_BACKWARD(pos[lvl], lvl);
    }

    for (int32_t iRow = 1; iRow < nRow; iRow++) {
      tRow.pTSRow = aRow[iRow];
      key.ts = tRow.pTSRow->ts;

      // skip the search when the previous row is already the last list element
      if (SL_NODE_FORWARD(pos[0], 0) != pTbData->sl.pTail) {
        tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS);
      }

      code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 1);
      if (code) {
        return code;
      }

      lRow = tRow;
    }
  }

  // key.ts now holds the timestamp of the last submitted row
  if (key.ts >= pTbData->maxKey) {
    pTbData->maxKey = key.ts;

    if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config)) {
      tsdbCacheInsertLastrow(pMemTable->pTsdb->lruCache, pMemTable->pTsdb, pTbData->uid, &lRow, true);
    }
  }

  if (TSDB_CACHE_LAST(pMemTable->pTsdb->pVnode->config)) {
    tsdbCacheInsertLast(pMemTable->pTsdb->lruCache, pTbData->uid, &lRow, pMemTable->pTsdb);
  }

  // memtable-wide statistics
  pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey);
  pMemTable->maxKey = TMAX(pMemTable->maxKey, pTbData->maxKey);
  pMemTable->nRow += nRow;

  if (affectedRows) {
    *affectedRows = nRow;
  }

  return code;
}
H
Hongze Cheng 已提交
723

724
// Number of nodes currently linked into the table's skip list.
int32_t tsdbGetNRowsInTbData(STbData *pTbData) {
  return pTbData->sl.size;
}
H
Hongze Cheng 已提交
725

H
Hongze Cheng 已提交
726
// Take one reference on the memtable and register pQNode with the memtable's
// buffer pool. The table must already be referenced (asserted). Always
// returns 0.
int32_t tsdbRefMemTable(SMemTable *pMemTable, SQueryNode *pQNode) {
  int32_t nRef = atomic_fetch_add_32(&pMemTable->nRef, 1);
  ASSERT(nRef > 0);

  vnodeBufPoolRegisterQuery(pMemTable->pPool, pQNode);

  return 0;
}

H
Hongze Cheng 已提交
738
// Drop one reference on the memtable. When pNode is non-NULL it is first
// deregistered from the memtable's buffer pool. The table is destroyed when
// the reference count reaches zero. Always returns 0.
int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode, bool proactive) {
  if (pNode != NULL) {
    vnodeBufPoolDeregisterQuery(pMemTable->pPool, pNode, proactive);
  }

  int32_t nLeft = atomic_sub_fetch_32(&pMemTable->nRef, 1);
  if (nLeft == 0) {
    tsdbMemTableDestroy(pMemTable, proactive);
  }

  return 0;
}
H
Hongze Cheng 已提交
751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787

// Comparator over elements of type `STbData *`: orders by suid, then by uid.
// Returns -1, 0 or 1.
static FORCE_INLINE int32_t tbDataPCmprFn(const void *p1, const void *p2) {
  STbData *pLhs = *(STbData **)p1;
  STbData *pRhs = *(STbData **)p2;

  if (pLhs->suid != pRhs->suid) {
    return (pLhs->suid < pRhs->suid) ? -1 : 1;
  }

  if (pLhs->uid != pRhs->uid) {
    return (pLhs->uid < pRhs->uid) ? -1 : 1;
  }

  return 0;
}

// Collect every STbData pointer held by the memtable into a new array sorted
// by (suid, uid). Returns NULL when the array cannot be created.
SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable) {
  SArray *aTbDataP = taosArrayInit(pMemTable->nTbData, sizeof(STbData *));

  if (aTbDataP != NULL) {
    for (int32_t iBucket = 0; iBucket < pMemTable->nBucket; iBucket++) {
      for (STbData *pCur = pMemTable->aBucket[iBucket]; pCur != NULL; pCur = pCur->next) {
        taosArrayPush(aTbDataP, &pCur);
      }
    }

    taosArraySort(aTbDataP, tbDataPCmprFn);
  }

  return aTbDataP;
}