/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
17
#include "util/tsimplehash.h"
H
Hongze Cheng 已提交
18

H
Hongze Cheng 已提交
19
#define MEM_MIN_HASH 1024
H
Hongze Cheng 已提交
20
#define SL_MAX_LEVEL 5
H
Hongze Cheng 已提交
21

H
Hongze Cheng 已提交
22
// sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l) * 2
H
Hongze Cheng 已提交
23 24 25
#define SL_NODE_SIZE(l)               (sizeof(SMemSkipListNode) + ((l) << 4))
#define SL_NODE_FORWARD(n, l)         ((n)->forwards[l])
#define SL_NODE_BACKWARD(n, l)        ((n)->forwards[(n)->level + (l)])
H
Hongze Cheng 已提交
26 27 28 29
#define SL_GET_NODE_FORWARD(n, l)     ((SMemSkipListNode *)atomic_load_ptr(&SL_NODE_FORWARD(n, l)))
#define SL_GET_NODE_BACKWARD(n, l)    ((SMemSkipListNode *)atomic_load_ptr(&SL_NODE_BACKWARD(n, l)))
#define SL_SET_NODE_FORWARD(n, l, p)  atomic_store_ptr(&SL_NODE_FORWARD(n, l), p)
#define SL_SET_NODE_BACKWARD(n, l, p) atomic_store_ptr(&SL_NODE_BACKWARD(n, l), p)
H
Hongze Cheng 已提交
30

H
Hongze Cheng 已提交
31 32 33 34 35
#define SL_MOVE_BACKWARD 0x1
#define SL_MOVE_FROM_POS 0x2

static void    tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags);
static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData);
H
Hongze Cheng 已提交
36 37 38 39
static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
                                        SSubmitTbData *pSubmitTbData, int32_t *affectedRows);
static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
                                        SSubmitTbData *pSubmitTbData, int32_t *affectedRows);
H
Hongze Cheng 已提交
40 41 42 43

// Allocate and initialise an empty memtable bound to `pTsdb`.
//
// The new table starts with one reference, MEM_MIN_HASH empty buckets and
// "empty" version/key ranges (min > max). It records the vnode's in-use
// buffer pool and calls vnodeBufPoolRef() on it.
//
// On success stores the table in *ppMemTable and returns 0. On allocation
// failure stores NULL in *ppMemTable and returns TSDB_CODE_OUT_OF_MEMORY.
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) {
  SMemTable *pNew = (SMemTable *)taosMemoryCalloc(1, sizeof(*pNew));
  if (pNew == NULL) {
    *ppMemTable = NULL;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  taosInitRWLatch(&pNew->latch);
  pNew->pTsdb = pTsdb;
  pNew->pPool = pTsdb->pVnode->inUse;
  pNew->nRef = 1;

  // Ranges start inverted so the first insert/delete always narrows them.
  pNew->minVer = VERSION_MAX;
  pNew->maxVer = VERSION_MIN;
  pNew->minKey = TSKEY_MAX;
  pNew->maxKey = TSKEY_MIN;

  pNew->nRow = 0;
  pNew->nDel = 0;
  pNew->nTbData = 0;

  pNew->nBucket = MEM_MIN_HASH;
  pNew->aBucket = (STbData **)taosMemoryCalloc(pNew->nBucket, sizeof(STbData *));
  if (pNew->aBucket == NULL) {
    taosMemoryFree(pNew);
    *ppMemTable = NULL;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // Only reference the pool once nothing else can fail.
  vnodeBufPoolRef(pNew->pPool);

  *ppMemTable = pNew;
  return 0;
}

H
Hongze Cheng 已提交
78
// Release a memtable: drop its buffer-pool reference (forwarding `proactive`
// to vnodeBufPoolUnRef), then free the bucket array and the table itself.
// A NULL `pMemTable` is a no-op.
void tsdbMemTableDestroy(SMemTable *pMemTable, bool proactive) {
  if (pMemTable == NULL) {
    return;
  }

  vnodeBufPoolUnRef(pMemTable->pPool, proactive);
  taosMemoryFree(pMemTable->aBucket);
  taosMemoryFree(pMemTable);
}

H
Hongze Cheng 已提交
86 87
// Look up the per-table data for `uid` without taking the memtable latch.
// Walks the bucket chain selected by TABS(uid) % nBucket and returns the
// matching entry, or NULL when the table has no data in this memtable.
// `suid` is accepted for signature symmetry but is not used by the lookup.
static FORCE_INLINE STbData *tsdbGetTbDataFromMemTableImpl(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid) {
  STbData *pEntry = pMemTable->aBucket[TABS(uid) % pMemTable->nBucket];

  for (; pEntry != NULL; pEntry = pEntry->next) {
    if (pEntry->uid == uid) {
      return pEntry;
    }
  }

  return NULL;
}
H
Hongze Cheng 已提交
96 97 98

// Latch-protected lookup of the per-table data for `uid`.
// Holds the memtable read latch only for the duration of the hash lookup.
// Returns the entry, or NULL if the table has no data in this memtable.
STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid) {
  taosRLockLatch(&pMemTable->latch);
  STbData *pFound = tsdbGetTbDataFromMemTableImpl(pMemTable, suid, uid);
  taosRUnLockLatch(&pMemTable->latch);

  return pFound;
}
H
Hongze Cheng 已提交
106

H
Hongze Cheng 已提交
107
// Insert one table's submitted rows into the current memtable (pTsdb->mem).
//
// Finds or creates the table's STbData, dispatches to the column-format or
// row-format insert depending on SUBMIT_REQ_COLUMN_DATA_FORMAT, and on
// success widens the memtable's [minVer, maxVer] range with `version`.
//
// Returns 0 on success. On failure returns the error code and also stores it
// in terrno.
int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitTbData *pSubmitTbData, int32_t *affectedRows) {
  SMemTable *pMem = pTsdb->mem;
  STbData   *pTable = NULL;
  tb_uid_t   suid = pSubmitTbData->suid;
  tb_uid_t   uid = pSubmitTbData->uid;

  // create/get STbData to op
  int32_t code = tsdbGetOrCreateTbData(pMem, suid, uid, &pTable);

  // do insert impl
  if (code == 0) {
    if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
      code = tsdbInsertColDataToTable(pMem, pTable, version, pSubmitTbData, affectedRows);
    } else {
      code = tsdbInsertRowDataToTable(pMem, pTable, version, pSubmitTbData, affectedRows);
    }
  }

  if (code != 0) {
    terrno = code;
    return code;
  }

  // update
  pMem->minVer = TMIN(pMem->minVer, version);
  pMem->maxVer = TMAX(pMem->maxVer, version);

  return code;
}

// Record a delete of the key range sKey..eKey at `version` for table `uid`
// in the current memtable (pTsdb->mem).
//
// Steps:
//   1. Verify through metaGetInfo() that the table exists and that its super
//      table id matches `suid`.
//   2. Find or create the table's STbData.
//   3. Allocate an SDelData from the vnode's in-use buffer pool and append it
//      to the table's delete list (pHead/pTail).
//   4. Bump nDel, widen the memtable's [minVer, maxVer] range, and call
//      tsdbCacheDel() for the same range.
//
// Returns 0 on success, otherwise:
//   TSDB_CODE_TDB_TABLE_NOT_EXIST  metaGetInfo() failed for `uid`
//   TSDB_CODE_INVALID_MSG          the table's suid differs from `suid`
//   TSDB_CODE_OUT_OF_MEMORY        the delete record could not be allocated
//   or whatever tsdbGetOrCreateTbData() returned.
int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) {
  int32_t    code = 0;
  SMemTable *pMemTable = pTsdb->mem;
  STbData   *pTbData = NULL;
  SVBufPool *pPool = pTsdb->pVnode->inUse;

  // check if table exists
  SMetaInfo info;
  code = metaGetInfo(pTsdb->pVnode->pMeta, uid, &info, NULL);
  if (code) {
    code = TSDB_CODE_TDB_TABLE_NOT_EXIST;
    goto _err;
  }
  if (info.suid != suid) {
    code = TSDB_CODE_INVALID_MSG;
    goto _err;
  }

  code = tsdbGetOrCreateTbData(pMemTable, suid, uid, &pTbData);
  if (code) {
    goto _err;
  }

  ASSERT(pPool != NULL);

  // do delete
  SDelData *pDelData = (SDelData *)vnodeBufPoolMalloc(pPool, sizeof(*pDelData));
  if (pDelData == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  pDelData->version = version;
  pDelData->sKey = sKey;
  pDelData->eKey = eKey;
  pDelData->pNext = NULL;

  // append to the table's singly linked delete list
  if (pTbData->pHead == NULL) {
    ASSERT(pTbData->pTail == NULL);
    pTbData->pHead = pTbData->pTail = pDelData;
  } else {
    pTbData->pTail->pNext = pDelData;
    pTbData->pTail = pDelData;
  }

  pMemTable->nDel++;
  pMemTable->minVer = TMIN(pMemTable->minVer, version);
  // maxVer must grow with the newest version seen (it starts at VERSION_MIN);
  // using TMIN here would leave it stuck at its initial value.
  pMemTable->maxVer = TMAX(pMemTable->maxVer, version);

  // Invalidate cached data for the deleted range unconditionally.
  tsdbCacheDel(pTsdb, suid, uid, sKey, eKey);

  tsdbTrace("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
            " at version %" PRId64,
            TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, version);
  return code;

_err:
  tsdbError("vgId:%d, failed to delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
            " at version %" PRId64 " since %s",
            TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, version, tstrerror(code));
  return code;
}

// Heap-allocate an iterator over `pTbData` and open it (see tsdbTbDataIterOpen).
// Stores the iterator in *ppIter and returns 0; on allocation failure stores
// NULL in *ppIter and returns TSDB_CODE_OUT_OF_MEMORY.
// Release the iterator with tsdbTbDataIterDestroy().
int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter) {
  STbDataIter *pNewIter = (STbDataIter *)taosMemoryCalloc(1, sizeof(STbDataIter));

  *ppIter = pNewIter;
  if (pNewIter == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tsdbTbDataIterOpen(pTbData, pFrom, backward, pNewIter);
  return 0;
}

// Free an iterator created by tsdbTbDataIterCreate(). NULL is accepted.
// Always returns NULL so callers can write `pIter = tsdbTbDataIterDestroy(pIter);`.
void *tsdbTbDataIterDestroy(STbDataIter *pIter) {
  if (pIter != NULL) {
    taosMemoryFree(pIter);
  }

  return NULL;
}

H
Hongze Cheng 已提交
231
// Initialise a caller-provided iterator over the skip list of `pTbData`.
//
//   pFrom == NULL : start at the first row (forward) or last row (backward).
//   pFrom != NULL : position relative to *pFrom using tbDataMovePosTo().
//
// pIter->pNode is set to the level-0 neighbour of the starting position in
// the iteration direction; pIter->pRow is reset to NULL.
void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter) {
  SMemSkipListNode *aPos[SL_MAX_LEVEL];
  SMemSkipListNode *pStart;

  pIter->pTbData = pTbData;
  pIter->backward = backward;
  pIter->pRow = NULL;

  if (pFrom == NULL) {
    // create from head or tail
    pStart = backward ? pTbData->sl.pTail : pTbData->sl.pHead;
  } else {
    // create from a key
    tbDataMovePosTo(pTbData, aPos, pFrom, backward ? SL_MOVE_BACKWARD : 0);
    pStart = aPos[0];
  }

  if (backward) {
    pIter->pNode = SL_GET_NODE_BACKWARD(pStart, 0);
  } else {
    pIter->pNode = SL_GET_NODE_FORWARD(pStart, 0);
  }
}

// Advance the iterator one node in its direction and clear pIter->pRow.
//
// Returns true when the iterator now rests on a data node, false when it was
// already on, or has just reached, the terminating sentinel (head when moving
// backward, tail when moving forward).
bool tsdbTbDataIterNext(STbDataIter *pIter) {
  SMemSkipListNode *pHead = pIter->pTbData->sl.pHead;
  SMemSkipListNode *pTail = pIter->pTbData->sl.pTail;

  pIter->pRow = NULL;

  if (pIter->backward) {
    ASSERT(pIter->pNode != pTail);

    if (pIter->pNode == pHead) {
      return false;
    }

    pIter->pNode = SL_GET_NODE_BACKWARD(pIter->pNode, 0);
    return pIter->pNode != pHead;
  }

  ASSERT(pIter->pNode != pHead);

  if (pIter->pNode == pTail) {
    return false;
  }

  pIter->pNode = SL_GET_NODE_FORWARD(pIter->pNode, 0);
  return pIter->pNode != pTail;
}

D
dapan1121 已提交
289
// Count the nodes of the table's skip list by walking level 0 from the head
// sentinel until the tail sentinel is reached. The sentinels are not counted.
int64_t tsdbCountTbDataRows(STbData *pTbData) {
  int64_t nCounted = 0;

  // The increment runs only when the step did not land on the tail.
  for (SMemSkipListNode *pCur = pTbData->sl.pHead; pCur != NULL; nCounted++) {
    pCur = SL_GET_NODE_FORWARD(pCur, 0);
    if (pCur == pTbData->sl.pTail) {
      break;
    }
  }

  return nCounted;
}

305
// Add to *rowsNum the row count of every table in the memtable whose uid is
// a key of `pTableMap`. Tables absent from the map are skipped.
// The memtable read latch is held for the whole scan.
void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj *pTableMap, int64_t *rowsNum) {
  taosRLockLatch(&pMemTable->latch);

  for (int32_t iBucket = 0; iBucket < pMemTable->nBucket; ++iBucket) {
    for (STbData *pEntry = pMemTable->aBucket[iBucket]; pEntry != NULL; pEntry = pEntry->next) {
      void *pHit = tSimpleHashGet(pTableMap, &pEntry->uid, sizeof(pEntry->uid));
      if (pHit != NULL) {
        *rowsNum += tsdbCountTbDataRows(pEntry);
      }
    }
  }

  taosRUnLockLatch(&pMemTable->latch);
}

H
Hongze Cheng 已提交
323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354
// Double the number of hash buckets and redistribute every STbData entry.
// Each entry is pushed onto the front of its new bucket's chain.
// Returns 0 on success; on allocation failure returns
// TSDB_CODE_OUT_OF_MEMORY and leaves the existing table untouched.
// This function takes no latch itself; its caller in this file invokes it
// with the write latch held.
static int32_t tsdbMemTableRehash(SMemTable *pMemTable) {
  int32_t   nGrown = pMemTable->nBucket * 2;
  STbData **aGrown = (STbData **)taosMemoryCalloc(nGrown, sizeof(STbData *));
  if (aGrown == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t iOld = 0; iOld < pMemTable->nBucket; iOld++) {
    STbData *pEntry = pMemTable->aBucket[iOld];

    while (pEntry != NULL) {
      STbData *pRest = pEntry->next;  // remember the tail of the old chain before relinking
      int32_t  iNew = TABS(pEntry->uid) % nGrown;

      pEntry->next = aGrown[iNew];
      aGrown[iNew] = pEntry;

      pEntry = pRest;
    }
  }

  taosMemoryFree(pMemTable->aBucket);
  pMemTable->nBucket = nGrown;
  pMemTable->aBucket = aGrown;

  return 0;
}

H
Hongze Cheng 已提交
355
// Return the STbData for `uid`, creating and registering it if missing.
//
// A new entry is carved from the vnode's in-use buffer pool in a single
// allocation laid out as [STbData][head sentinel][tail sentinel]. Both
// sentinels have `slLevel` levels (from the vnode's tsdb config). The empty
// skip list links head -> tail at every level. The entry is then inserted
// into the hash table under the write latch, growing the table first when
// nTbData has reached nBucket.
//
// The initial lookup is done without taking the latch.
// NOTE(review): presumably safe because only one writer mutates the table — confirm.
//
// On success stores the entry in *ppTbData and returns 0. On failure stores
// NULL and returns TSDB_CODE_OUT_OF_MEMORY or the rehash error.
static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData) {
  // get
  STbData *pEntry = tsdbGetTbDataFromMemTableImpl(pMemTable, suid, uid);
  if (pEntry != NULL) {
    *ppTbData = pEntry;
    return 0;
  }

  // create
  SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse;
  int8_t     nLevel = pMemTable->pTsdb->pVnode->config.tsdbCfg.slLevel;

  ASSERT(pPool != NULL);
  pEntry = vnodeBufPoolMallocAligned(pPool, sizeof(*pEntry) + SL_NODE_SIZE(nLevel) * 2);
  if (pEntry == NULL) {
    *ppTbData = NULL;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pEntry->suid = suid;
  pEntry->uid = uid;
  pEntry->minKey = TSKEY_MAX;
  pEntry->maxKey = TSKEY_MIN;
  pEntry->pHead = NULL;
  pEntry->pTail = NULL;

  SMemSkipList *pSl = &pEntry->sl;
  pSl->seed = taosRand();
  pSl->size = 0;
  pSl->maxLevel = nLevel;
  pSl->level = 0;

  // The two sentinels live directly behind the STbData header.
  pSl->pHead = (SMemSkipListNode *)&pEntry[1];
  pSl->pTail = (SMemSkipListNode *)POINTER_SHIFT(pSl->pHead, SL_NODE_SIZE(nLevel));
  pSl->pHead->level = nLevel;
  pSl->pTail->level = nLevel;
  for (int8_t iLevel = 0; iLevel < nLevel; iLevel++) {
    SL_NODE_FORWARD(pSl->pHead, iLevel) = pSl->pTail;
    SL_NODE_BACKWARD(pSl->pTail, iLevel) = pSl->pHead;

    SL_NODE_BACKWARD(pSl->pHead, iLevel) = NULL;
    SL_NODE_FORWARD(pSl->pTail, iLevel) = NULL;
  }

  // publish the new entry in the hash table
  taosWLockLatch(&pMemTable->latch);

  if (pMemTable->nTbData >= pMemTable->nBucket) {
    int32_t code = tsdbMemTableRehash(pMemTable);
    if (code) {
      taosWUnLockLatch(&pMemTable->latch);
      *ppTbData = NULL;
      return code;
    }
  }

  int32_t idx = TABS(uid) % pMemTable->nBucket;
  pEntry->next = pMemTable->aBucket[idx];
  pMemTable->aBucket[idx] = pEntry;
  pMemTable->nTbData++;

  taosWUnLockLatch(&pMemTable->latch);

  *ppTbData = pEntry;
  return 0;
}

static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags) {
  SMemSkipListNode *px;
  SMemSkipListNode *pn;
H
Hongze Cheng 已提交
423
  TSDBKEY           tKey = {0};
H
Hongze Cheng 已提交
424 425
  int32_t           backward = flags & SL_MOVE_BACKWARD;
  int32_t           fromPos = flags & SL_MOVE_FROM_POS;
H
Hongze Cheng 已提交
426 427 428 429

  if (backward) {
    px = pTbData->sl.pTail;

H
Hongze Cheng 已提交
430 431 432 433
    if (!fromPos) {
      for (int8_t iLevel = pTbData->sl.level; iLevel < pTbData->sl.maxLevel; iLevel++) {
        pos[iLevel] = px;
      }
H
Hongze Cheng 已提交
434 435
    }

H
Hongze Cheng 已提交
436 437
    if (pTbData->sl.level) {
      if (fromPos) px = pos[pTbData->sl.level - 1];
H
Hongze Cheng 已提交
438

H
Hongze Cheng 已提交
439
      for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) {
H
Hongze Cheng 已提交
440
        pn = SL_GET_NODE_BACKWARD(px, iLevel);
H
Hongze Cheng 已提交
441
        while (pn != pTbData->sl.pHead) {
H
Hongze Cheng 已提交
442 443 444 445 446 447 448
          if (pn->flag == TSDBROW_ROW_FMT) {
            tKey.version = pn->version;
            tKey.ts = ((SRow *)pn->pData)->ts;
          } else if (pn->flag == TSDBROW_COL_FMT) {
            tKey.version = ((SBlockData *)pn->pData)->aVersion[pn->iRow];
            tKey.ts = ((SBlockData *)pn->pData)->aTSKEY[pn->iRow];
          }
H
Hongze Cheng 已提交
449

H
Hongze Cheng 已提交
450
          int32_t c = tsdbKeyCmprFn(&tKey, pKey);
H
Hongze Cheng 已提交
451 452 453 454
          if (c <= 0) {
            break;
          } else {
            px = pn;
H
Hongze Cheng 已提交
455
            pn = SL_GET_NODE_BACKWARD(px, iLevel);
H
Hongze Cheng 已提交
456 457 458 459 460 461
          }
        }

        pos[iLevel] = px;
      }
    }
H
Hongze Cheng 已提交
462
  } else {
H
Hongze Cheng 已提交
463 464
    px = pTbData->sl.pHead;

H
Hongze Cheng 已提交
465 466 467 468
    if (!fromPos) {
      for (int8_t iLevel = pTbData->sl.level; iLevel < pTbData->sl.maxLevel; iLevel++) {
        pos[iLevel] = px;
      }
H
Hongze Cheng 已提交
469 470 471 472 473 474
    }

    if (pTbData->sl.level) {
      if (fromPos) px = pos[pTbData->sl.level - 1];

      for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) {
H
Hongze Cheng 已提交
475
        pn = SL_GET_NODE_FORWARD(px, iLevel);
H
Hongze Cheng 已提交
476
        while (pn != pTbData->sl.pTail) {
H
Hongze Cheng 已提交
477 478 479 480 481 482 483
          if (pn->flag == TSDBROW_ROW_FMT) {
            tKey.version = pn->version;
            tKey.ts = ((SRow *)pn->pData)->ts;
          } else if (pn->flag == TSDBROW_COL_FMT) {
            tKey.version = ((SBlockData *)pn->pData)->aVersion[pn->iRow];
            tKey.ts = ((SBlockData *)pn->pData)->aTSKEY[pn->iRow];
          }
H
Hongze Cheng 已提交
484 485

          int32_t c = tsdbKeyCmprFn(&tKey, pKey);
H
Hongze Cheng 已提交
486 487 488 489
          if (c >= 0) {
            break;
          } else {
            px = pn;
H
Hongze Cheng 已提交
490
            pn = SL_GET_NODE_FORWARD(px, iLevel);
H
Hongze Cheng 已提交
491 492 493 494 495 496
          }
        }

        pos[iLevel] = px;
      }
    }
H
Hongze Cheng 已提交
497
  }
H
Hongze Cheng 已提交
498
}
H
Hongze Cheng 已提交
499

H
Hongze Cheng 已提交
500
// Pick the level for a new skip-list node.
//
// Starts at 1 and climbs one level each time the low two bits of the next
// random draw are zero (a 1-in-4 chance). The result is capped at
// min(maxLevel, current level + 1), so the list grows by at most one level
// per insert. A random number is drawn before each cap check, so the seed
// advances even on the draw that is rejected by the cap.
static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) {
  int8_t cap = TMIN(pSl->maxLevel, pSl->level + 1);
  int8_t picked = 1;

  for (;;) {
    if ((taosRandR(&pSl->seed) & 0x3) != 0) {
      break;
    }
    if (picked >= cap) {
      break;
    }
    picked++;
  }

  return picked;
}
// Create a skip-list node for *pRow and splice it in next to pos[].
//
//   forward != 0 : pos[l] is the predecessor at level l; the node goes after it.
//   forward == 0 : pos[l] is the successor at level l; the node goes before it.
//
// Row-format rows are copied into the node's own allocation (right behind the
// link array). Column-format rows only store the block pointer and row index.
// The node's own links are filled in with plain stores first. The neighbours
// are then updated from the top level down with atomic stores.
// NOTE(review): presumably this ordering lets a concurrent reader follow a
// new link only once the node is fully set up — confirm.
//
// After the insert pos[l] points at the new node for every level it occupies.
// Returns 0, or TSDB_CODE_OUT_OF_MEMORY when no node could be allocated.
static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListNode **pos, TSDBROW *pRow,
                           int8_t forward) {
  SVBufPool        *pPool = pMemTable->pTsdb->pVnode->inUse;
  SMemSkipListNode *pNew = NULL;

  // create node
  int8_t  nodeLevel = tsdbMemSkipListRandLevel(&pTbData->sl);
  int64_t linkSize = SL_NODE_SIZE(nodeLevel);

  if (pRow->type == TSDBROW_ROW_FMT) {
    pNew = (SMemSkipListNode *)vnodeBufPoolMallocAligned(pPool, linkSize + pRow->pTSRow->len);
  } else if (pRow->type == TSDBROW_COL_FMT) {
    pNew = (SMemSkipListNode *)vnodeBufPoolMallocAligned(pPool, linkSize);
  } else {
    ASSERT(0);
  }
  if (pNew == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pNew->level = nodeLevel;
  pNew->flag = pRow->type;
  if (pRow->type == TSDBROW_ROW_FMT) {
    pNew->version = pRow->version;
    pNew->pData = (char *)pNew + linkSize;
    memcpy(pNew->pData, pRow->pTSRow, pRow->pTSRow->len);
  } else {
    // only TSDBROW_COL_FMT can reach here with a non-NULL node
    pNew->iRow = pRow->iRow;
    pNew->pData = pRow->pBlockData;
  }

  if (forward) {
    // set node
    for (int8_t iLevel = 0; iLevel < nodeLevel; iLevel++) {
      SL_NODE_FORWARD(pNew, iLevel) = SL_NODE_FORWARD(pos[iLevel], iLevel);
      SL_NODE_BACKWARD(pNew, iLevel) = pos[iLevel];
    }

    // set forward and backward
    for (int8_t iLevel = nodeLevel - 1; iLevel >= 0; iLevel--) {
      SMemSkipListNode *pAfter = SL_NODE_FORWARD(pos[iLevel], iLevel);

      SL_SET_NODE_FORWARD(pos[iLevel], iLevel, pNew);
      SL_SET_NODE_BACKWARD(pAfter, iLevel, pNew);

      pos[iLevel] = pNew;
    }
  } else {
    // set node
    for (int8_t iLevel = 0; iLevel < nodeLevel; iLevel++) {
      SL_NODE_FORWARD(pNew, iLevel) = pos[iLevel];
      SL_NODE_BACKWARD(pNew, iLevel) = SL_NODE_BACKWARD(pos[iLevel], iLevel);
    }

    // set forward and backward
    for (int8_t iLevel = nodeLevel - 1; iLevel >= 0; iLevel--) {
      SMemSkipListNode *pBefore = SL_NODE_BACKWARD(pos[iLevel], iLevel);

      SL_SET_NODE_FORWARD(pBefore, iLevel, pNew);
      SL_SET_NODE_BACKWARD(pos[iLevel], iLevel, pNew);

      pos[iLevel] = pNew;
    }
  }

  pTbData->sl.size++;
  if (pTbData->sl.level < pNew->level) {
    pTbData->sl.level = pNew->level;
  }

  return 0;
}

H
Hongze Cheng 已提交
589 590 591
static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
                                        SSubmitTbData *pSubmitTbData, int32_t *affectedRows) {
  int32_t code = 0;
H
Hongze Cheng 已提交
592

H
Hongze Cheng 已提交
593 594 595
  SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse;
  int32_t    nColData = TARRAY_SIZE(pSubmitTbData->aCol);
  SColData  *aColData = (SColData *)TARRAY_DATA(pSubmitTbData->aCol);
H
Hongze Cheng 已提交
596

L
Liu Jicong 已提交
597 598 599
  ASSERT(aColData[0].cid == PRIMARYKEY_TIMESTAMP_COL_ID);
  ASSERT(aColData[0].type == TSDB_DATA_TYPE_TIMESTAMP);
  ASSERT(aColData[0].flag == HAS_VALUE);
H
Hongze Cheng 已提交
600

H
Hongze Cheng 已提交
601 602 603 604 605
  // copy and construct block data
  SBlockData *pBlockData = vnodeBufPoolMalloc(pPool, sizeof(*pBlockData));
  if (pBlockData == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
H
Hongze Cheng 已提交
606 607
  }

H
Hongze Cheng 已提交
608 609 610 611 612 613 614 615 616 617 618
  pBlockData->suid = pTbData->suid;
  pBlockData->uid = pTbData->uid;
  pBlockData->nRow = aColData[0].nVal;
  pBlockData->aUid = NULL;
  pBlockData->aVersion = vnodeBufPoolMalloc(pPool, aColData[0].nData);
  if (pBlockData->aVersion == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }
  for (int32_t i = 0; i < pBlockData->nRow; i++) {  // todo: here can be optimized
    pBlockData->aVersion[i] = version;
H
Hongze Cheng 已提交
619 620
  }

H
Hongze Cheng 已提交
621 622 623 624 625 626 627 628
  pBlockData->aTSKEY = vnodeBufPoolMalloc(pPool, aColData[0].nData);
  if (pBlockData->aTSKEY == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }
  memcpy(pBlockData->aTSKEY, aColData[0].pData, aColData[0].nData);

  pBlockData->nColData = nColData - 1;
H
Hongze Cheng 已提交
629 630 631
  pBlockData->aColData = vnodeBufPoolMalloc(pPool, sizeof(SColData) * pBlockData->nColData);
  if (pBlockData->aColData == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Haojun Liao 已提交
632
    goto _exit;
H
Hongze Cheng 已提交
633
  }
H
Haojun Liao 已提交
634

H
Hongze Cheng 已提交
635 636 637
  for (int32_t iColData = 0; iColData < pBlockData->nColData; ++iColData) {
    code = tColDataCopy(&aColData[iColData + 1], &pBlockData->aColData[iColData], (xMallocFn)vnodeBufPoolMalloc, pPool);
    if (code) goto _exit;
H
Hongze Cheng 已提交
638 639
  }

H
Hongze Cheng 已提交
640 641
  // loop to add each row to the skiplist
  SMemSkipListNode *pos[SL_MAX_LEVEL];
H
Hongze Cheng 已提交
642 643 644
  TSDBROW           tRow = tsdbRowFromBlockData(pBlockData, 0);
  TSDBKEY           key = {.version = version, .ts = pBlockData->aTSKEY[0]};
  TSDBROW           lRow;  // last row
H
Hongze Cheng 已提交
645

H
Hongze Cheng 已提交
646
  // first row
H
Hongze Cheng 已提交
647
  tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
H
Hongze Cheng 已提交
648
  if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0))) goto _exit;
H
Hongze Cheng 已提交
649
  pTbData->minKey = TMIN(pTbData->minKey, key.ts);
H
Hongze Cheng 已提交
650
  lRow = tRow;
H
Hongze Cheng 已提交
651

H
Hongze Cheng 已提交
652 653
  // remain row
  ++tRow.iRow;
H
Hongze Cheng 已提交
654
  if (tRow.iRow < pBlockData->nRow) {
H
Hongze Cheng 已提交
655 656 657 658
    for (int8_t iLevel = pos[0]->level; iLevel < pTbData->sl.maxLevel; iLevel++) {
      pos[iLevel] = SL_NODE_BACKWARD(pos[iLevel], iLevel);
    }

H
Hongze Cheng 已提交
659 660 661
    while (tRow.iRow < pBlockData->nRow) {
      key.ts = pBlockData->aTSKEY[tRow.iRow];

H
Hongze Cheng 已提交
662 663 664 665
      if (SL_NODE_FORWARD(pos[0], 0) != pTbData->sl.pTail) {
        tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS);
      }

H
Hongze Cheng 已提交
666
      if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 1))) goto _exit;
H
Hongze Cheng 已提交
667
      lRow = tRow;
H
Hongze Cheng 已提交
668

H
Hongze Cheng 已提交
669
      ++tRow.iRow;
H
Hongze Cheng 已提交
670 671
    }
  }
H
Hongze Cheng 已提交
672

H
Hongze Cheng 已提交
673 674 675
  if (key.ts >= pTbData->maxKey) {
    pTbData->maxKey = key.ts;
  }
676 677 678 679

  if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) {
    tsdbCacheUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, &lRow);
  }
H
Hongze Cheng 已提交
680 681 682 683 684 685 686 687

  // SMemTable
  pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey);
  pMemTable->maxKey = TMAX(pMemTable->maxKey, pTbData->maxKey);
  pMemTable->nRow += pBlockData->nRow;

  if (affectedRows) *affectedRows = pBlockData->nRow;

H
Hongze Cheng 已提交
688 689 690
_exit:
  return code;
}
H
Hongze Cheng 已提交
691

H
Hongze Cheng 已提交
692 693
// Insert a row-format submit (an array of SRow pointers) into the table's
// skip list.
//
// The first row is located with a backward search from the tail. The
// remaining rows are inserted with forward searches that resume from the
// previous insert position.
// NOTE(review): this looks like it assumes the rows arrive in ascending
// timestamp order and that the array is non-empty — confirm against the caller.
//
// On success updates the table's and memtable's key ranges and the memtable
// row count, updates the cache with the last inserted row unless caching is
// disabled, and writes the row count to *affectedRows when it is non-NULL.
// Returns 0 or the first tbDataDoPut() error.
static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
                                        SSubmitTbData *pSubmitTbData, int32_t *affectedRows) {
  int32_t           nRow = TARRAY_SIZE(pSubmitTbData->aRowP);
  SRow            **aRow = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP);
  TSDBKEY           key = {.version = version};
  SMemSkipListNode *pos[SL_MAX_LEVEL];
  TSDBROW           tRow = {.type = TSDBROW_ROW_FMT, .version = version};

  // backward put first data
  tRow.pTSRow = aRow[0];
  key.ts = tRow.pTSRow->ts;
  tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
  int32_t code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0);
  if (code) {
    return code;
  }
  TSDBROW lRow = tRow;  // last row inserted so far

  pTbData->minKey = TMIN(pTbData->minKey, key.ts);

  // forward put rest data
  if (nRow > 1) {
    // Levels above the new node still hold successors from the backward
    // search; turn them into predecessors for the forward inserts below.
    for (int8_t iLevel = pos[0]->level; iLevel < pTbData->sl.maxLevel; iLevel++) {
      pos[iLevel] = SL_NODE_BACKWARD(pos[iLevel], iLevel);
    }

    for (int32_t iRow = 1; iRow < nRow; iRow++) {
      tRow.pTSRow = aRow[iRow];
      key.ts = tRow.pTSRow->ts;

      // when the previous node is already last, no search is needed
      if (SL_NODE_FORWARD(pos[0], 0) != pTbData->sl.pTail) {
        tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS);
      }

      code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 1);
      if (code) {
        return code;
      }

      lRow = tRow;
    }
  }

  if (key.ts >= pTbData->maxKey) {
    pTbData->maxKey = key.ts;
  }
  if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) {
    tsdbCacheUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, &lRow);
  }

  // SMemTable
  pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey);
  pMemTable->maxKey = TMAX(pMemTable->maxKey, pTbData->maxKey);
  pMemTable->nRow += nRow;

  if (affectedRows) {
    *affectedRows = nRow;
  }

  return 0;
}
H
Hongze Cheng 已提交
754

755
// Number of nodes currently stored in the table's skip list (sl.size).
int32_t tsdbGetNRowsInTbData(STbData *pTbData) {
  return pTbData->sl.size;
}
H
Hongze Cheng 已提交
756

H
Hongze Cheng 已提交
757
// Take an additional reference on the memtable and register `pQNode` with the
// memtable's buffer pool. The table must already be referenced (asserted).
// Always returns 0.
int32_t tsdbRefMemTable(SMemTable *pMemTable, SQueryNode *pQNode) {
  int32_t prevRef = atomic_fetch_add_32(&pMemTable->nRef, 1);
  ASSERT(prevRef > 0);

  vnodeBufPoolRegisterQuery(pMemTable->pPool, pQNode);

  return 0;
}

H
Hongze Cheng 已提交
769
// Drop one reference on the memtable.
// When `pNode` is non-NULL it is first deregistered from the memtable's
// buffer pool. The memtable is destroyed when the count reaches zero.
// `proactive` is forwarded to both calls. Always returns 0.
int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode, bool proactive) {
  if (pNode != NULL) {
    vnodeBufPoolDeregisterQuery(pMemTable->pPool, pNode, proactive);
  }

  int32_t remaining = atomic_sub_fetch_32(&pMemTable->nRef, 1);
  if (remaining == 0) {
    tsdbMemTableDestroy(pMemTable, proactive);
  }

  return 0;
}
H
Hongze Cheng 已提交
782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818

// Sort comparator over elements of type `STbData *`.
// Orders by suid first, then by uid. Returns -1, 0 or 1.
static FORCE_INLINE int32_t tbDataPCmprFn(const void *p1, const void *p2) {
  STbData *pLhs = *(STbData **)p1;
  STbData *pRhs = *(STbData **)p2;

  if (pLhs->suid != pRhs->suid) {
    return (pLhs->suid < pRhs->suid) ? -1 : 1;
  }

  if (pLhs->uid != pRhs->uid) {
    return (pLhs->uid < pRhs->uid) ? -1 : 1;
  }

  return 0;
}

// Collect every STbData pointer in the memtable into a new array sorted by
// (suid, uid). Returns NULL if the array could not be created.
// The memtable latch is not taken here, and the results of the individual
// taosArrayPush() calls are not checked.
SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable) {
  SArray *aSorted = taosArrayInit(pMemTable->nTbData, sizeof(STbData *));
  if (aSorted == NULL) {
    return NULL;
  }

  for (int32_t iBucket = 0; iBucket < pMemTable->nBucket; iBucket++) {
    for (STbData *pEntry = pMemTable->aBucket[iBucket]; pEntry != NULL; pEntry = pEntry->next) {
      taosArrayPush(aSorted, &pEntry);
    }
  }

  taosArraySort(aSorted, tbDataPCmprFn);

  return aSorted;
}