/*
 * tsdbMemTable2.c (11.1 KB) -- text captured from a repository blame view;
 * the blame annotations attribute the commits to Hongze Cheng.
 */
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"

// Forward declarations of the memtable types defined below.
// NOTE: "Curosr" is a long-standing misspelling of "Cursor"; kept for compatibility.
typedef struct SMemTable          SMemTable;
typedef struct SMemData           SMemData;
typedef struct SMemSkipList       SMemSkipList;
typedef struct SMemSkipListNode   SMemSkipListNode;
typedef struct SMemSkipListCurosr SMemSkipListCurosr;

struct SMemTable {
H
Hongze Cheng 已提交
25 26 27 28 29 30 31 32 33 34
  STsdb              *pTsdb;
  TSKEY               minKey;
  TSKEY               maxKey;
  int64_t             minVer;
  int64_t             maxVer;
  int64_t             nRows;
  int32_t             nHash;
  int32_t             nBucket;
  SMemData          **pBuckets;
  SMemSkipListCurosr *pSlc;
H
Hongze Cheng 已提交
35 36
};

H
Hongze Cheng 已提交
37 38 39
/*
 * Skip-list node. forwards[] holds `level` forward pointers, then `level`
 * backward pointers (SL_NODE_BACKWARD), followed by the encoded row
 * (SL_NODE_DATA); see SL_NODE_SIZE for the allocation size.
 */
struct SMemSkipListNode {
  int8_t            level;
  SMemSkipListNode *forwards[];
};

struct SMemSkipList {
H
Hongze Cheng 已提交
43
  uint32_t         seed;
H
more  
Hongze Cheng 已提交
44
  int8_t           maxLevel;
H
Hongze Cheng 已提交
45 46 47
  int8_t           level;
  int32_t          size;
  SMemSkipListNode pHead[];
H
Hongze Cheng 已提交
48 49 50 51 52 53 54 55 56 57 58 59 60 61
};

/*
 * Per-table data of a memtable, identified by (suid, uid).
 * `sl` must remain the last member: the skip list's head and tail sentinel
 * nodes are allocated directly behind the struct (see tsdbInsertData2).
 */
struct SMemData {
  SMemData    *pHashNext;  // next entry in the same SMemTable hash bucket
  tb_uid_t     suid;
  tb_uid_t     uid;
  TSKEY        minKey;     // TSKEY_MAX while empty
  TSKEY        maxKey;     // TSKEY_MIN while empty
  int64_t      minVer;     // -1 until the first insert
  int64_t      maxVer;     // -1 until the first insert
  int64_t      nRows;
  SMemSkipList sl;
};

struct SMemSkipListCurosr {
  SMemSkipList     *pSl;
H
Hongze Cheng 已提交
64
  SMemSkipListNode *pNodeC;
H
refact  
Hongze Cheng 已提交
65 66
};

H
Hongze Cheng 已提交
67 68 69 70 71 72
// A row together with the version it was written at, as stored in skip-list nodes.
typedef struct {
  int64_t       version;
  uint32_t      szRow;  // byte length of *pRow
  const STSRow *pRow;
} STsdbRow;

// Upper bound on skip-list levels for fixed-size per-level arrays.
#define SL_MAX_LEVEL 15

/*
 * Bucket index of table (SUID, UID) in a hash table of NBUCKET (> 0) buckets.
 * The sum is formed in uint64_t: adding two signed 64-bit uids could overflow
 * (undefined behavior), and so could taking the absolute value of INT64_MIN.
 * The result is always in [0, NBUCKET).
 */
#define HASH_BUCKET(SUID, UID, NBUCKET) \
  ((int32_t)(((uint64_t)(SUID) + (uint64_t)(UID)) % (uint64_t)(NBUCKET)))

// Size of a node with l levels: header plus l forward and l backward pointers
// (the encoded row data is allocated on top of this).
#define SL_NODE_SIZE(l)        (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l) * 2)
// Size of a node with only one direction of l pointers (used for the sentinels).
#define SL_NODE_HALF_SIZE(l)   (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l))
#define SL_NODE_FORWARD(n, l)  ((n)->forwards[l])
#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
// The encoded row starts right after the backward pointers.
#define SL_NODE_DATA(n)        (&SL_NODE_BACKWARD(n, (n)->level))

// Head sentinel of a skip list.
#define SL_HEAD_NODE(sl)            ((sl)->pHead)
// Tail sentinel: placed right after the head sentinel's maxLevel forward slots.
#define SL_TAIL_NODE(sl)            ((SMemSkipListNode *)&SL_NODE_FORWARD(SL_HEAD_NODE(sl), (sl)->maxLevel))
// The head stores only forward pointers.
#define SL_HEAD_NODE_FORWARD(n, l)  SL_NODE_FORWARD(n, l)
// The tail reuses its forwards[] array to store backward pointers.
#define SL_TAIL_NODE_BACKWARD(n, l) SL_NODE_FORWARD(n, l)

static int8_t  tsdbMemSkipListRandLevel(SMemSkipList *pSl);
static int32_t tsdbEncodeRow(SEncoder *pEncoder, const STsdbRow *pRow);
static int32_t tsdbDecodeRow(SDecoder *pDecoder, STsdbRow *pRow);
static int32_t tsdbMemSkipListCursorCreate(int8_t maxLevel, SMemSkipListCurosr **ppSlc);
static void    tsdbMemSkipListCursorDestroy(SMemSkipListCurosr *pSlc);
static void    tsdbMemSkipListCursorInit(SMemSkipListCurosr *pSlc, SMemSkipList *pSl);
static void    tsdbMemSkipListCursorPut(SMemSkipListCurosr *pSlc, SMemSkipListNode *pNode);
H
Hongze Cheng 已提交
95 96 97
static int32_t tsdbMemSkipListCursorMoveTo(SMemSkipListCurosr *pSlc, int64_t version, TSKEY ts, int32_t flags);
static int32_t tsdbMemSkipListCursorMoveToNext(SMemSkipListCurosr *pSlc);
static int32_t tsdbMemSkipListCursorMoveToPrev(SMemSkipListCurosr *pSlc);
H
Hongze Cheng 已提交
98
static SMemSkipListNode *tsdbMemSkipListNodeCreate(SVBufPool *pPool, SMemSkipList *pSl, const STsdbRow *pTRow);
H
refact  
Hongze Cheng 已提交
99

H
Hongze Cheng 已提交
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
// SMemTable
/*
 * Allocate and initialize an empty SMemTable for pTsdb.
 *
 * On success stores the new table in *ppMemTb and returns 0. On failure sets
 * terrno to TSDB_CODE_OUT_OF_MEMORY, frees everything allocated so far,
 * leaves *ppMemTb untouched and returns -1.
 */
int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTb) {
  SMemTable *pMemTb = taosMemoryCalloc(1, sizeof(*pMemTb));
  if (pMemTb == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pMemTb->pTsdb = pTsdb;
  // empty key range: the first inserted key narrows it
  pMemTb->minKey = TSKEY_MAX;
  pMemTb->maxKey = TSKEY_MIN;
  // -1 marks "no version seen yet"
  pMemTb->minVer = -1;
  pMemTb->maxVer = -1;
  pMemTb->nRows = 0;
  pMemTb->nHash = 0;
  pMemTb->nBucket = 1024;  // initial bucket count; rehashing is still a todo in tsdbInsertData2
  pMemTb->pBuckets = taosMemoryCalloc(pMemTb->nBucket, sizeof(*pMemTb->pBuckets));
  if (pMemTb->pBuckets == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pMemTb);
    return -1;
  }

  if (tsdbMemSkipListCursorCreate(pTsdb->pVnode->config.tsdbCfg.slLevel, &pMemTb->pSlc) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pMemTb->pBuckets);
    taosMemoryFree(pMemTb);
    // Previously execution fell through here and handed the freed table back with success.
    return -1;
  }

  *ppMemTb = pMemTb;
  return 0;
}

/*
 * Release a memtable returned by tsdbMemTableCreate2(); NULL is accepted.
 *
 * Frees the cursor, the bucket array and the SMemTable itself. The SMemData
 * entries (allocated from the vnode buffer pool in tsdbInsertData2) are not
 * released here. pTsdb is currently unused. Always returns 0.
 */
int32_t tsdbMemTableDestroy2(STsdb *pTsdb, SMemTable *pMemTb) {
  if (pMemTb == NULL) return 0;

  // loop to destroy the contents (todo)
  tsdbMemSkipListCursorDestroy(pMemTb->pSlc);
  taosMemoryFree(pMemTb->pBuckets);
  taosMemoryFree(pMemTb);
  return 0;
}

/*
 * Insert every row of one submit block into the memtable.
 *
 * The SMemData for (suid, uid) is looked up in the hash table, or created
 * from the vnode's in-use buffer pool on first use. Each row decoded from
 * pSubmitBlk->pData is then copied into a new skip-list node and put into
 * that table's skip list. Afterwards the key/version statistics of the
 * SMemData and the SMemTable are updated.
 *
 * Returns 0 on success. On failure it returns -1 with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY or TSDB_CODE_INVALID_MSG; rows put before the
 * failure remain in the list and are counted in nRows.
 */
int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *pSubmitBlk) {
  SMemData  *pMemData;
  STsdb     *pTsdb = pMemTb->pTsdb;
  SVnode    *pVnode = pTsdb->pVnode;
  SVBufPool *pPool = pVnode->inUse;
  tb_uid_t   suid = pSubmitBlk->suid;
  tb_uid_t   uid = pSubmitBlk->uid;
  int32_t    iBucket;

  // search SMemData by hash
  iBucket = HASH_BUCKET(suid, uid, pMemTb->nBucket);
  for (pMemData = pMemTb->pBuckets[iBucket]; pMemData; pMemData = pMemData->pHashNext) {
    if (pMemData->suid == suid && pMemData->uid == uid) break;
  }

  // create pMemData if need
  if (pMemData == NULL) {
    int8_t maxLevel = pVnode->config.tsdbCfg.slLevel;
    // The skip list's head and tail sentinels share this allocation, right
    // behind the SMemData (see SL_HEAD_NODE / SL_TAIL_NODE).
    int32_t           tsize = sizeof(*pMemData) + SL_NODE_HALF_SIZE(maxLevel) * 2;
    SMemSkipListNode *pHead, *pTail;

    pMemData = vnodeBufPoolMalloc(pPool, tsize);
    if (pMemData == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    pMemData->pHashNext = NULL;
    pMemData->suid = suid;
    pMemData->uid = uid;
    pMemData->minKey = TSKEY_MAX;
    pMemData->maxKey = TSKEY_MIN;
    pMemData->minVer = -1;
    pMemData->maxVer = -1;
    pMemData->nRows = 0;
    pMemData->sl.seed = taosRand();
    pMemData->sl.maxLevel = maxLevel;
    pMemData->sl.level = 0;
    pMemData->sl.size = 0;
    pHead = SL_HEAD_NODE(&pMemData->sl);
    pTail = SL_TAIL_NODE(&pMemData->sl);
    pHead->level = maxLevel;
    pTail->level = maxLevel;
    // empty list: on every level head -> tail forward, tail -> head backward
    for (int iLevel = 0; iLevel < maxLevel; iLevel++) {
      SL_HEAD_NODE_FORWARD(pHead, iLevel) = pTail;
      SL_TAIL_NODE_BACKWARD(pTail, iLevel) = pHead;
    }

    // add to hash
    if (pMemTb->nHash >= pMemTb->nBucket) {
      // rehash (todo)
    }
    iBucket = HASH_BUCKET(suid, uid, pMemTb->nBucket);
    pMemData->pHashNext = pMemTb->pBuckets[iBucket];
    pMemTb->pBuckets[iBucket] = pMemData;
    pMemTb->nHash++;

    // sort organize (todo)
  }

  // do insert data to SMemData
  int32_t  code = 0;
  STsdbRow tRow = {.version = version};
  SDecoder dc = {0};

  tDecoderInit(&dc, pSubmitBlk->pData, pSubmitBlk->nData);
  tsdbMemSkipListCursorInit(pMemTb->pSlc, &pMemData->sl);
  while (!tDecodeIsEnd(&dc)) {
    SMemSkipListNode *pNode;

    // decode row
    if (tDecodeBinary(&dc, (const uint8_t **)&tRow.pRow, &tRow.szRow) < 0) {
      terrno = TSDB_CODE_INVALID_MSG;
      code = -1;
      break;
    }

    // move cursor
    tsdbMemSkipListCursorMoveTo(pMemTb->pSlc, version, tRow.pRow->ts, 0);

    // encode row
    pNode = tsdbMemSkipListNodeCreate(pPool, &pMemData->sl, &tRow);
    if (pNode == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = -1;
      break;
    }

    // put the node
    tsdbMemSkipListCursorPut(pMemTb->pSlc, pNode);

    // update status
    if (tRow.pRow->ts < pMemData->minKey) pMemData->minKey = tRow.pRow->ts;
    if (tRow.pRow->ts > pMemData->maxKey) pMemData->maxKey = tRow.pRow->ts;
    pMemData->nRows++;
    pMemTb->nRows++;
  }
  // release the decoder on the error exits as well as on success
  tDecoderClear(&dc);
  if (code < 0) return code;

  // update status
  if (pMemData->minVer == -1 || version < pMemData->minVer) pMemData->minVer = version;
  if (pMemData->maxVer == -1 || pMemData->maxVer < version) pMemData->maxVer = version;

  // widen the memtable's key range by this table's range (was an inverted `<` for minKey)
  if (pMemTb->minKey > pMemData->minKey) pMemTb->minKey = pMemData->minKey;
  if (pMemTb->maxKey < pMemData->maxKey) pMemTb->maxKey = pMemData->maxKey;
  if (pMemTb->minVer == -1 || version < pMemTb->minVer) pMemTb->minVer = version;
  if (pMemTb->maxVer == -1 || pMemTb->maxVer < version) pMemTb->maxVer = version;

  return 0;
}

/*
 * Pick the level of a new node. Start at 1 and keep adding a level while
 * taosRandR(&pSl->seed) % factor == 0. The result is capped at one above the
 * list's current level and at its maxLevel. Advances pSl->seed.
 */
static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) {
  int8_t         level = 1;
  int8_t         tlevel = TMIN(pSl->maxLevel, pSl->level + 1);
  const uint32_t factor = 4;

  while ((taosRandR(&pSl->seed) % factor) == 0 && level < tlevel) {
    level++;
  }

  return level;
}

/*
 * Serialize a row as its version (tEncodeI64) followed by the row bytes
 * (tEncodeBinary). Returns 0 on success, -1 if either step fails.
 */
static FORCE_INLINE int32_t tsdbEncodeRow(SEncoder *pEncoder, const STsdbRow *pRow) {
  int32_t failed = tEncodeI64(pEncoder, pRow->version) < 0 ||
                   tEncodeBinary(pEncoder, (const uint8_t *)pRow->pRow, pRow->szRow) < 0;
  return failed ? -1 : 0;
}

/*
 * Inverse of tsdbEncodeRow: read the version, then the row bytes. pRow->pRow
 * is set by tDecodeBinary. Returns 0 on success, -1 if either step fails.
 */
static FORCE_INLINE int32_t tsdbDecodeRow(SDecoder *pDecoder, STsdbRow *pRow) {
  int32_t failed = tDecodeI64(pDecoder, &pRow->version) < 0 ||
                   tDecodeBinary(pDecoder, (const uint8_t **)&pRow->pRow, &pRow->szRow) < 0;
  return failed ? -1 : 0;
}

/*
 * Allocate a zeroed cursor with maxLevel node-pointer slots appended right
 * after the struct. Stores the result (or NULL) in *ppSlc; returns 0 on
 * success and -1 if the allocation fails.
 */
static int32_t tsdbMemSkipListCursorCreate(int8_t maxLevel, SMemSkipListCurosr **ppSlc) {
  SMemSkipListCurosr *pCursor =
      (SMemSkipListCurosr *)taosMemoryCalloc(1, sizeof(*pCursor) + maxLevel * sizeof(SMemSkipListNode *));

  *ppSlc = pCursor;
  return (pCursor != NULL) ? 0 : -1;
}

// Free a cursor obtained from tsdbMemSkipListCursorCreate().
static void tsdbMemSkipListCursorDestroy(SMemSkipListCurosr *pSlc) {
  taosMemoryFree(pSlc);
}

/*
 * Bind the cursor to skip list pSl. Only the list pointer is recorded here;
 * the insert path positions the cursor afterwards with
 * tsdbMemSkipListCursorMoveTo().
 */
static void tsdbMemSkipListCursorInit(SMemSkipListCurosr *pSlc, SMemSkipList *pSl) {
  pSlc->pSl = pSl;
  // TODO: reset the per-level positions to SL_HEAD_NODE(pSl) once the cursor
  // exposes them (the struct has no `forwards` member yet).
}

/*
 * Link pNode into the cursor's skip list, then raise the list's level if
 * needed and count the node.
 *
 * NOTE(review): per-level linking is not implemented; the loop body is an
 * ASSERT(0) placeholder. tsdbMemSkipListRandLevel always returns >= 1, so any
 * node built by tsdbMemSkipListNodeCreate reaches it.
 */
static void tsdbMemSkipListCursorPut(SMemSkipListCurosr *pSlc, SMemSkipListNode *pNode) {
  SMemSkipList *pSl = pSlc->pSl;

  for (int8_t iLevel = 0; iLevel < pNode->level; iLevel++) {
    // todo: splice pNode in at iLevel
    ASSERT(0);
  }

  // the list's level tracks the highest level of any node it holds
  if (pSl->level < pNode->level) {
    pSl->level = pNode->level;
  }

  pSl->size += 1;
}

/*
 * Position the cursor where a row with key ts / version would be inserted.
 *
 * Only the empty-list case is handled so far: the predecessor on every level
 * is the head sentinel. version, ts and flags are not used yet. Returns 0.
 */
static int32_t tsdbMemSkipListCursorMoveTo(SMemSkipListCurosr *pSlc, int64_t version, TSKEY ts, int32_t flags) {
  SMemSkipList     *pSl = pSlc->pSl;
  SMemSkipListNode *pHead = SL_HEAD_NODE(pSl);
  // Per-level slots are the maxLevel node pointers that
  // tsdbMemSkipListCursorCreate() allocates right after the cursor struct
  // (this was a NULL pointer before, crashing on the first insert).
  // NOTE(review): assumes the cursor has at least pSl->maxLevel slots; both
  // currently come from tsdbCfg.slLevel -- confirm if that ever changes.
  SMemSkipListNode **pForwards = (SMemSkipListNode **)(pSlc + 1);

  if (pSl->size == 0) {
    for (int8_t iLevel = 0; iLevel < pSl->maxLevel; iLevel++) {
      pForwards[iLevel] = pHead;
    }
  }

  return 0;
}

// Cursor navigation: not implemented yet. Each stub leaves the cursor unchanged and returns 0.
static int32_t tsdbMemSkipListCursorMoveToFirst(SMemSkipListCurosr *pSlc) {
  // TODO
  return 0;
}

static int32_t tsdbMemSkipListCursorMoveToLast(SMemSkipListCurosr *pSlc) {
  // TODO
  return 0;
}

static int32_t tsdbMemSkipListCursorMoveToNext(SMemSkipListCurosr *pSlc) {
  // TODO
  return 0;
}

static int32_t tsdbMemSkipListCursorMoveToPrev(SMemSkipListCurosr *pSlc) {
  // TODO
  return 0;
}

/*
 * Allocate a skip-list node from pPool and encode *pTRow into its data area.
 *
 * The node's level is drawn with tsdbMemSkipListRandLevel (advancing
 * pSl->seed). Returns NULL if the encoded size cannot be computed or the pool
 * allocation fails. The caller currently reports any NULL as out-of-memory.
 */
static SMemSkipListNode *tsdbMemSkipListNodeCreate(SVBufPool *pPool, SMemSkipList *pSl, const STsdbRow *pTRow) {
  int32_t           tsize = 0;
  int32_t           ret = 0;
  int8_t            level = tsdbMemSkipListRandLevel(pSl);
  SMemSkipListNode *pNode = NULL;
  SEncoder          ec = {0};

  // tEncodeSize reports failure through ret; tsize must not be used then.
  tEncodeSize(tsdbEncodeRow, pTRow, tsize, ret);
  if (ret < 0) {
    return NULL;
  }

  // pointer area for `level` levels plus exactly tsize bytes of encoded row
  pNode = vnodeBufPoolMalloc(pPool, tsize + SL_NODE_SIZE(level));
  if (pNode) {
    pNode->level = level;
    tEncoderInit(&ec, (uint8_t *)SL_NODE_DATA(pNode), tsize);
    tsdbEncodeRow(&ec, pTRow);
    tEncoderClear(&ec);
  }

  return pNode;
}