tsdbMemTable2.c 11.7 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"

H
refact  
Hongze Cheng 已提交
18 19 20 21 22
typedef struct SMemTable          SMemTable;
typedef struct SMemData           SMemData;
typedef struct SMemSkipList       SMemSkipList;
typedef struct SMemSkipListNode   SMemSkipListNode;
typedef struct SMemSkipListCurosr SMemSkipListCurosr;
H
Hongze Cheng 已提交
23

H
Hongze Cheng 已提交
24 25
#define SL_MAX_LEVEL 5

H
Hongze Cheng 已提交
26
struct SMemTable {
H
Hongze Cheng 已提交
27 28 29 30 31 32 33 34 35 36
  STsdb              *pTsdb;
  TSKEY               minKey;
  TSKEY               maxKey;
  int64_t             minVer;
  int64_t             maxVer;
  int64_t             nRows;
  int32_t             nHash;
  int32_t             nBucket;
  SMemData          **pBuckets;
  SMemSkipListCurosr *pSlc;
H
Hongze Cheng 已提交
37 38
};

H
Hongze Cheng 已提交
39 40 41
struct SMemSkipListNode {
  int8_t            level;
  SMemSkipListNode *forwards[];
H
Hongze Cheng 已提交
42 43 44
};

struct SMemSkipList {
H
Hongze Cheng 已提交
45
  uint32_t         seed;
H
more  
Hongze Cheng 已提交
46
  int8_t           maxLevel;
H
Hongze Cheng 已提交
47 48 49
  int8_t           level;
  int32_t          size;
  SMemSkipListNode pHead[];
H
Hongze Cheng 已提交
50 51 52 53 54 55 56 57 58 59 60 61 62 63
};

struct SMemData {
  SMemData    *pHashNext;
  tb_uid_t     suid;
  tb_uid_t     uid;
  TSKEY        minKey;
  TSKEY        maxKey;
  int64_t      minVer;
  int64_t      maxVer;
  int64_t      nRows;
  SMemSkipList sl;
};

H
refact  
Hongze Cheng 已提交
64 65
struct SMemSkipListCurosr {
  SMemSkipList     *pSl;
H
Hongze Cheng 已提交
66
  SMemSkipListNode *pNodes[SL_MAX_LEVEL];
H
refact  
Hongze Cheng 已提交
67 68
};

H
Hongze Cheng 已提交
69 70 71 72 73 74
typedef struct {
  int64_t       version;
  uint32_t      szRow;
  const STSRow *pRow;
} STsdbRow;

H
Hongze Cheng 已提交
75 76
#define HASH_BUCKET(SUID, UID, NBUCKET) (TABS((SUID) + (UID)) % (NBUCKET))

H
refact  
Hongze Cheng 已提交
77 78 79 80 81 82
#define SL_NODE_SIZE(l)        (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2)
#define SL_NODE_HALF_SIZE(l)   (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l))
#define SL_NODE_FORWARD(n, l)  ((n)->forwards[l])
#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
#define SL_NODE_DATA(n)        (&SL_NODE_BACKWARD(n, (n)->level))

H
Hongze Cheng 已提交
83 84 85 86
#define SL_HEAD_NODE(sl)            ((sl)->pHead)
#define SL_TAIL_NODE(sl)            ((SMemSkipListNode *)&SL_NODE_FORWARD(SL_HEAD_NODE(sl), (sl)->maxLevel))
#define SL_HEAD_NODE_FORWARD(n, l)  SL_NODE_FORWARD(n, l)
#define SL_TAIL_NODE_BACKWARD(n, l) SL_NODE_FORWARD(n, l)
H
refact  
Hongze Cheng 已提交
87

H
Hongze Cheng 已提交
88 89 90 91 92 93 94
static int8_t  tsdbMemSkipListRandLevel(SMemSkipList *pSl);
static int32_t tsdbEncodeRow(SEncoder *pEncoder, const STsdbRow *pRow);
static int32_t tsdbDecodeRow(SDecoder *pDecoder, STsdbRow *pRow);
static int32_t tsdbMemSkipListCursorCreate(int8_t maxLevel, SMemSkipListCurosr **ppSlc);
static void    tsdbMemSkipListCursorDestroy(SMemSkipListCurosr *pSlc);
static void    tsdbMemSkipListCursorInit(SMemSkipListCurosr *pSlc, SMemSkipList *pSl);
static void    tsdbMemSkipListCursorPut(SMemSkipListCurosr *pSlc, SMemSkipListNode *pNode);
H
Hongze Cheng 已提交
95
static int32_t tsdbMemSkipListCursorMoveTo(SMemSkipListCurosr *pSlc, int64_t version, TSKEY ts, int32_t flags);
H
Hongze Cheng 已提交
96 97
static void    tsdbMemSkipListCursorMoveToFirst(SMemSkipListCurosr *pSlc);
static void    tsdbMemSkipListCursorMoveToLast(SMemSkipListCurosr *pSlc);
H
Hongze Cheng 已提交
98 99
static int32_t tsdbMemSkipListCursorMoveToNext(SMemSkipListCurosr *pSlc);
static int32_t tsdbMemSkipListCursorMoveToPrev(SMemSkipListCurosr *pSlc);
H
Hongze Cheng 已提交
100
static SMemSkipListNode *tsdbMemSkipListNodeCreate(SVBufPool *pPool, SMemSkipList *pSl, const STsdbRow *pTRow);
H
refact  
Hongze Cheng 已提交
101

H
Hongze Cheng 已提交
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
// SMemTable
int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTb) {
  SMemTable *pMemTb = NULL;

  pMemTb = taosMemoryCalloc(1, sizeof(*pMemTb));
  if (pMemTb == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pMemTb->pTsdb = pTsdb;
  pMemTb->minKey = TSKEY_MAX;
  pMemTb->maxKey = TSKEY_MIN;
  pMemTb->minVer = -1;
  pMemTb->maxVer = -1;
  pMemTb->nRows = 0;
  pMemTb->nHash = 0;
  pMemTb->nBucket = 1024;
  pMemTb->pBuckets = taosMemoryCalloc(pMemTb->nBucket, sizeof(*pMemTb->pBuckets));
  if (pMemTb->pBuckets == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
123
    taosMemoryFree(pMemTb);
H
Hongze Cheng 已提交
124 125
    return -1;
  }
H
Hongze Cheng 已提交
126 127 128 129 130
  if (tsdbMemSkipListCursorCreate(pTsdb->pVnode->config.tsdbCfg.slLevel, &pMemTb->pSlc) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pMemTb->pBuckets);
    taosMemoryFree(pMemTb);
  }
H
Hongze Cheng 已提交
131 132 133 134 135

  *ppMemTb = pMemTb;
  return 0;
}

H
Hongze Cheng 已提交
136 137 138
int32_t tsdbMemTableDestroy2(STsdb *pTsdb, SMemTable *pMemTb) {
  if (pMemTb) {
    // loop to destroy the contents (todo)
H
Hongze Cheng 已提交
139
    tsdbMemSkipListCursorDestroy(pMemTb->pSlc);
H
Hongze Cheng 已提交
140 141 142 143 144 145 146
    taosMemoryFree(pMemTb->pBuckets);
    taosMemoryFree(pMemTb);
  }
  return 0;
}

int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *pSubmitBlk) {
H
Hongze Cheng 已提交
147 148 149 150 151 152 153 154 155 156 157 158
  SMemData  *pMemData;
  STsdb     *pTsdb = pMemTb->pTsdb;
  SVnode    *pVnode = pTsdb->pVnode;
  SVBufPool *pPool = pVnode->inUse;
  tb_uid_t   suid = pSubmitBlk->suid;
  tb_uid_t   uid = pSubmitBlk->uid;
  int32_t    iBucket;

  // search SMemData by hash
  iBucket = HASH_BUCKET(suid, uid, pMemTb->nBucket);
  for (pMemData = pMemTb->pBuckets[iBucket]; pMemData; pMemData = pMemData->pHashNext) {
    if (pMemData->suid == suid && pMemData->uid == uid) break;
H
Hongze Cheng 已提交
159 160 161 162
  }

  // create pMemData if need
  if (pMemData == NULL) {
H
refact  
Hongze Cheng 已提交
163 164 165 166 167
    int8_t            maxLevel = pVnode->config.tsdbCfg.slLevel;
    int32_t           tsize = sizeof(*pMemData) + SL_NODE_HALF_SIZE(maxLevel) * 2;
    SMemSkipListNode *pHead, *pTail;

    pMemData = vnodeBufPoolMalloc(pPool, tsize);
H
Hongze Cheng 已提交
168 169 170 171 172 173
    if (pMemData == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    pMemData->pHashNext = NULL;
H
Hongze Cheng 已提交
174 175
    pMemData->suid = suid;
    pMemData->uid = uid;
H
Hongze Cheng 已提交
176 177 178 179 180 181
    pMemData->minKey = TSKEY_MAX;
    pMemData->maxKey = TSKEY_MIN;
    pMemData->minVer = -1;
    pMemData->maxVer = -1;
    pMemData->nRows = 0;
    pMemData->sl.seed = taosRand();
H
refact  
Hongze Cheng 已提交
182
    pMemData->sl.maxLevel = maxLevel;
H
more  
Hongze Cheng 已提交
183
    pMemData->sl.level = 0;
H
Hongze Cheng 已提交
184
    pMemData->sl.size = 0;
H
refact  
Hongze Cheng 已提交
185 186 187 188 189
    pHead = SL_HEAD_NODE(&pMemData->sl);
    pTail = SL_TAIL_NODE(&pMemData->sl);
    pHead->level = maxLevel;
    pTail->level = maxLevel;
    for (int iLevel = 0; iLevel < maxLevel; iLevel++) {
H
Hongze Cheng 已提交
190 191
      SL_HEAD_NODE_FORWARD(pHead, iLevel) = pTail;
      SL_TAIL_NODE_BACKWARD(pTail, iLevel) = pHead;
H
refact  
Hongze Cheng 已提交
192
    }
H
Hongze Cheng 已提交
193

H
Hongze Cheng 已提交
194 195 196 197 198 199 200
    // add to hash
    if (pMemTb->nHash >= pMemTb->nBucket) {
      // rehash (todo)
    }
    iBucket = HASH_BUCKET(suid, uid, pMemTb->nBucket);
    pMemData->pHashNext = pMemTb->pBuckets[iBucket];
    pMemTb->pBuckets[iBucket] = pMemData;
H
refact  
Hongze Cheng 已提交
201
    pMemTb->nHash++;
H
Hongze Cheng 已提交
202 203

    // sort organize (todo)
H
Hongze Cheng 已提交
204 205
  }

H
Hongze Cheng 已提交
206
  // do insert data to SMemData
H
Hongze Cheng 已提交
207 208
  SMemSkipListNode *forwards[SL_MAX_LEVEL];
  SMemSkipListNode *pNode;
H
Hongze Cheng 已提交
209
  int32_t           iRow;
H
Hongze Cheng 已提交
210 211 212
  STsdbRow          tRow = {.version = version};
  SEncoder          ec = {0};
  SDecoder          dc = {0};
H
Hongze Cheng 已提交
213

H
Hongze Cheng 已提交
214 215
  tDecoderInit(&dc, pSubmitBlk->pData, pSubmitBlk->nData);
  tsdbMemSkipListCursorInit(pMemTb->pSlc, &pMemData->sl);
H
Hongze Cheng 已提交
216
  for (iRow = 0;; iRow++) {
H
Hongze Cheng 已提交
217
    if (tDecodeIsEnd(&dc)) break;
H
refact  
Hongze Cheng 已提交
218

H
Hongze Cheng 已提交
219
    // decode row
H
Hongze Cheng 已提交
220
    if (tDecodeBinary(&dc, (const uint8_t **)&tRow.pRow, &tRow.szRow) < 0) {
H
refact  
Hongze Cheng 已提交
221 222 223
      terrno = TSDB_CODE_INVALID_MSG;
      return -1;
    }
H
more  
Hongze Cheng 已提交
224

H
Hongze Cheng 已提交
225
    // move cursor
H
Hongze Cheng 已提交
226
    tsdbMemSkipListCursorMoveTo(pMemTb->pSlc, version, tRow.pRow->ts, 0);
H
refact  
Hongze Cheng 已提交
227

H
refact  
Hongze Cheng 已提交
228
    // encode row
H
Hongze Cheng 已提交
229
    pNode = tsdbMemSkipListNodeCreate(pPool, &pMemData->sl, &tRow);
H
Hongze Cheng 已提交
230 231 232 233
    if (pNode == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
H
refact  
Hongze Cheng 已提交
234

H
Hongze Cheng 已提交
235 236
    // put the node
    tsdbMemSkipListCursorPut(pMemTb->pSlc, pNode);
H
refact  
Hongze Cheng 已提交
237

H
Hongze Cheng 已提交
238
    // update status
H
Hongze Cheng 已提交
239 240
    if (tRow.pRow->ts < pMemData->minKey) pMemData->minKey = tRow.pRow->ts;
    if (tRow.pRow->ts > pMemData->maxKey) pMemData->maxKey = tRow.pRow->ts;
H
Hongze Cheng 已提交
241
  }
H
Hongze Cheng 已提交
242
  tDecoderClear(&dc);
H
Hongze Cheng 已提交
243

H
Hongze Cheng 已提交
244
  // update status
H
refact  
Hongze Cheng 已提交
245 246
  if (pMemData->minVer == -1) pMemData->minVer = version;
  if (pMemData->maxVer == -1 || pMemData->maxVer < version) pMemData->maxVer = version;
H
Hongze Cheng 已提交
247

H
refact  
Hongze Cheng 已提交
248 249 250 251
  if (pMemTb->minKey < pMemData->minKey) pMemTb->minKey = pMemData->minKey;
  if (pMemTb->maxKey < pMemData->maxKey) pMemTb->maxKey = pMemData->maxKey;
  if (pMemTb->minVer == -1) pMemTb->minVer = version;
  if (pMemTb->maxVer == -1 || pMemTb->maxVer < version) pMemTb->maxVer = version;
H
Hongze Cheng 已提交
252

H
refact  
Hongze Cheng 已提交
253
  return 0;
H
refact  
Hongze Cheng 已提交
254 255
}

H
Hongze Cheng 已提交
256
static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) {
H
refact  
Hongze Cheng 已提交
257
  int8_t         level = 1;
H
Hongze Cheng 已提交
258
  int8_t         tlevel = TMIN(pSl->maxLevel, pSl->level + 1);
H
refact  
Hongze Cheng 已提交
259 260
  const uint32_t factor = 4;

H
Hongze Cheng 已提交
261 262
  while ((taosRandR(&pSl->seed) % factor) == 0 && level < tlevel) {
    level++;
H
refact  
Hongze Cheng 已提交
263 264 265
  }

  return level;
H
Hongze Cheng 已提交
266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292
}

static FORCE_INLINE int32_t tsdbEncodeRow(SEncoder *pEncoder, const STsdbRow *pRow) {
  if (tEncodeI64(pEncoder, pRow->version) < 0) return -1;
  if (tEncodeBinary(pEncoder, (const uint8_t *)pRow->pRow, pRow->szRow) < 0) return -1;
  return 0;
}

static FORCE_INLINE int32_t tsdbDecodeRow(SDecoder *pDecoder, STsdbRow *pRow) {
  if (tDecodeI64(pDecoder, &pRow->version) < 0) return -1;
  if (tDecodeBinary(pDecoder, (const uint8_t **)&pRow->pRow, &pRow->szRow) < 0) return -1;
  return 0;
}

static int32_t tsdbMemSkipListCursorCreate(int8_t maxLevel, SMemSkipListCurosr **ppSlc) {
  *ppSlc = (SMemSkipListCurosr *)taosMemoryCalloc(1, sizeof(**ppSlc) + sizeof(SMemSkipListNode *) * maxLevel);
  if (*ppSlc == NULL) {
    return -1;
  }
  return 0;
}

static void tsdbMemSkipListCursorDestroy(SMemSkipListCurosr *pSlc) { taosMemoryFree(pSlc); }

static void tsdbMemSkipListCursorInit(SMemSkipListCurosr *pSlc, SMemSkipList *pSl) {
  SMemSkipListNode *pHead = SL_HEAD_NODE(pSl);
  pSlc->pSl = pSl;
H
Hongze Cheng 已提交
293 294 295
  // for (int8_t iLevel = 0; iLevel < pSl->maxLevel; iLevel++) {
  //   pSlc->forwards[iLevel] = pHead;
  // }
H
Hongze Cheng 已提交
296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312
}

static void tsdbMemSkipListCursorPut(SMemSkipListCurosr *pSlc, SMemSkipListNode *pNode) {
  SMemSkipList     *pSl = pSlc->pSl;
  SMemSkipListNode *pNodeNext;

  for (int8_t iLevel = 0; iLevel < pNode->level; iLevel++) {
    // todo

    ASSERT(0);
  }

  if (pSl->level < pNode->level) {
    pSl->level = pNode->level;
  }

  pSl->size += 1;
H
Hongze Cheng 已提交
313 314 315 316 317 318 319 320 321
}

static int32_t tsdbMemSkipListCursorMoveTo(SMemSkipListCurosr *pSlc, int64_t version, TSKEY ts, int32_t flags) {
  SMemSkipListNode **pForwards = NULL;
  SMemSkipList      *pSl = pSlc->pSl;
  int8_t             maxLevel = pSl->maxLevel;
  SMemSkipListNode  *pHead = SL_HEAD_NODE(pSl);
  SMemSkipListNode  *pTail = SL_TAIL_NODE(pSl);

H
Hongze Cheng 已提交
322 323 324
  if (pSl->size == 0) {
    for (int8_t iLevel = 0; iLevel < pSl->maxLevel; iLevel++) {
      pForwards[iLevel] = pHead;
H
Hongze Cheng 已提交
325 326
    }
  }
H
Hongze Cheng 已提交
327

H
Hongze Cheng 已提交
328 329 330
  return 0;
}

H
Hongze Cheng 已提交
331 332 333 334 335 336 337 338 339
static void tsdbMemSkipListCursorMoveToFirst(SMemSkipListCurosr *pSlc) {
  SMemSkipList     *pSl = pSlc->pSl;
  SMemSkipListNode *pHead = SL_HEAD_NODE(pSl);

  for (int8_t iLevel = 0; iLevel < pSl->maxLevel; iLevel++) {
    pSlc->pNodes[iLevel] = pHead;
  }

  tsdbMemSkipListCursorMoveToNext(pSlc);
H
refact  
Hongze Cheng 已提交
340 341
}

H
Hongze Cheng 已提交
342 343 344 345 346 347 348 349 350
static void tsdbMemSkipListCursorMoveToLast(SMemSkipListCurosr *pSlc) {
  SMemSkipList     *pSl = pSlc->pSl;
  SMemSkipListNode *pTail = SL_TAIL_NODE(pSl);

  for (int8_t iLevel = 0; iLevel < pSl->maxLevel; iLevel++) {
    pSlc->pNodes[iLevel] = pTail;
  }

  tsdbMemSkipListCursorMoveToPrev(pSlc);
H
refact  
Hongze Cheng 已提交
351 352
}

H
Hongze Cheng 已提交
353 354 355 356 357 358 359 360
static int32_t tsdbMemSkipListCursorMoveToNext(SMemSkipListCurosr *pSlc) {
  // TODO
  return 0;
}

static int32_t tsdbMemSkipListCursorMoveToPrev(SMemSkipListCurosr *pSlc) {
  // TODO
  return 0;
H
Hongze Cheng 已提交
361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379
}

static SMemSkipListNode *tsdbMemSkipListNodeCreate(SVBufPool *pPool, SMemSkipList *pSl, const STsdbRow *pTRow) {
  int32_t           tsize;
  int32_t           ret;
  int8_t            level = tsdbMemSkipListRandLevel(pSl);
  SMemSkipListNode *pNode = NULL;
  SEncoder          ec = {0};

  tEncodeSize(tsdbEncodeRow, pTRow, tsize, ret);
  pNode = vnodeBufPoolMalloc(pPool, tsize + SL_NODE_SIZE(level));
  if (pNode) {
    pNode->level = level;
    tEncoderInit(&ec, (uint8_t *)SL_NODE_DATA(pNode), tsize);
    tsdbEncodeRow(&ec, pTRow);
    tEncoderClear(&ec);
  }

  return pNode;
H
Hongze Cheng 已提交
380
}