tsdbMemTable2.c 6.9 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"

H
refact  
Hongze Cheng 已提交
18 19 20 21 22
typedef struct SMemTable          SMemTable;
typedef struct SMemData           SMemData;
typedef struct SMemSkipList       SMemSkipList;
typedef struct SMemSkipListNode   SMemSkipListNode;
typedef struct SMemSkipListCurosr SMemSkipListCurosr;
H
Hongze Cheng 已提交
23 24 25 26 27 28 29 30 31 32 33 34 35

struct SMemTable {
  STsdb     *pTsdb;
  TSKEY      minKey;
  TSKEY      maxKey;
  int64_t    minVer;
  int64_t    maxVer;
  int64_t    nRows;
  int32_t    nHash;
  int32_t    nBucket;
  SMemData **pBuckets;
};

H
Hongze Cheng 已提交
36 37 38
struct SMemSkipListNode {
  int8_t            level;
  SMemSkipListNode *forwards[];
H
Hongze Cheng 已提交
39 40 41
};

struct SMemSkipList {
H
Hongze Cheng 已提交
42
  uint32_t         seed;
H
more  
Hongze Cheng 已提交
43
  int8_t           maxLevel;
H
Hongze Cheng 已提交
44 45 46
  int8_t           level;
  int32_t          size;
  SMemSkipListNode pHead[];
H
Hongze Cheng 已提交
47 48 49 50 51 52 53 54 55 56 57 58 59 60
};

struct SMemData {
  SMemData    *pHashNext;
  tb_uid_t     suid;
  tb_uid_t     uid;
  TSKEY        minKey;
  TSKEY        maxKey;
  int64_t      minVer;
  int64_t      maxVer;
  int64_t      nRows;
  SMemSkipList sl;
};

H
refact  
Hongze Cheng 已提交
61 62 63 64 65
struct SMemSkipListCurosr {
  SMemSkipList     *pSl;
  SMemSkipListNode *pNodeC;
};

H
Hongze Cheng 已提交
66 67
#define HASH_BUCKET(SUID, UID, NBUCKET) (TABS((SUID) + (UID)) % (NBUCKET))

H
refact  
Hongze Cheng 已提交
68 69 70 71 72 73
#define SL_NODE_SIZE(l)        (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2)
#define SL_NODE_HALF_SIZE(l)   (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l))
#define SL_NODE_FORWARD(n, l)  ((n)->forwards[l])
#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
#define SL_NODE_DATA(n)        (&SL_NODE_BACKWARD(n, (n)->level))

H
Hongze Cheng 已提交
74 75 76 77
#define SL_HEAD_NODE(sl)            ((sl)->pHead)
#define SL_TAIL_NODE(sl)            ((SMemSkipListNode *)&SL_NODE_FORWARD(SL_HEAD_NODE(sl), (sl)->maxLevel))
#define SL_HEAD_NODE_FORWARD(n, l)  SL_NODE_FORWARD(n, l)
#define SL_TAIL_NODE_BACKWARD(n, l) SL_NODE_FORWARD(n, l)
H
refact  
Hongze Cheng 已提交
78

H
Hongze Cheng 已提交
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
// SMemTable
int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTb) {
  SMemTable *pMemTb = NULL;

  pMemTb = taosMemoryCalloc(1, sizeof(*pMemTb));
  if (pMemTb == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pMemTb->pTsdb = pTsdb;
  pMemTb->minKey = TSKEY_MAX;
  pMemTb->maxKey = TSKEY_MIN;
  pMemTb->minVer = -1;
  pMemTb->maxVer = -1;
  pMemTb->nRows = 0;
  pMemTb->nHash = 0;
  pMemTb->nBucket = 1024;
  pMemTb->pBuckets = taosMemoryCalloc(pMemTb->nBucket, sizeof(*pMemTb->pBuckets));
  if (pMemTb->pBuckets == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
100
    taosMemoryFree(pMemTb);
H
Hongze Cheng 已提交
101 102 103 104 105 106 107
    return -1;
  }

  *ppMemTb = pMemTb;
  return 0;
}

H
Hongze Cheng 已提交
108 109 110 111 112 113 114 115 116 117
int32_t tsdbMemTableDestroy2(STsdb *pTsdb, SMemTable *pMemTb) {
  if (pMemTb) {
    // loop to destroy the contents (todo)
    taosMemoryFree(pMemTb->pBuckets);
    taosMemoryFree(pMemTb);
  }
  return 0;
}

int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *pSubmitBlk) {
H
Hongze Cheng 已提交
118 119 120 121 122 123 124 125 126 127 128 129
  SMemData  *pMemData;
  STsdb     *pTsdb = pMemTb->pTsdb;
  SVnode    *pVnode = pTsdb->pVnode;
  SVBufPool *pPool = pVnode->inUse;
  tb_uid_t   suid = pSubmitBlk->suid;
  tb_uid_t   uid = pSubmitBlk->uid;
  int32_t    iBucket;

  // search SMemData by hash
  iBucket = HASH_BUCKET(suid, uid, pMemTb->nBucket);
  for (pMemData = pMemTb->pBuckets[iBucket]; pMemData; pMemData = pMemData->pHashNext) {
    if (pMemData->suid == suid && pMemData->uid == uid) break;
H
Hongze Cheng 已提交
130 131 132 133
  }

  // create pMemData if need
  if (pMemData == NULL) {
H
refact  
Hongze Cheng 已提交
134 135 136 137 138
    int8_t            maxLevel = pVnode->config.tsdbCfg.slLevel;
    int32_t           tsize = sizeof(*pMemData) + SL_NODE_HALF_SIZE(maxLevel) * 2;
    SMemSkipListNode *pHead, *pTail;

    pMemData = vnodeBufPoolMalloc(pPool, tsize);
H
Hongze Cheng 已提交
139 140 141 142 143 144
    if (pMemData == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    pMemData->pHashNext = NULL;
H
Hongze Cheng 已提交
145 146
    pMemData->suid = suid;
    pMemData->uid = uid;
H
Hongze Cheng 已提交
147 148 149 150 151 152
    pMemData->minKey = TSKEY_MAX;
    pMemData->maxKey = TSKEY_MIN;
    pMemData->minVer = -1;
    pMemData->maxVer = -1;
    pMemData->nRows = 0;
    pMemData->sl.seed = taosRand();
H
refact  
Hongze Cheng 已提交
153
    pMemData->sl.maxLevel = maxLevel;
H
more  
Hongze Cheng 已提交
154
    pMemData->sl.level = 0;
H
Hongze Cheng 已提交
155
    pMemData->sl.size = 0;
H
refact  
Hongze Cheng 已提交
156 157 158 159 160
    pHead = SL_HEAD_NODE(&pMemData->sl);
    pTail = SL_TAIL_NODE(&pMemData->sl);
    pHead->level = maxLevel;
    pTail->level = maxLevel;
    for (int iLevel = 0; iLevel < maxLevel; iLevel++) {
H
Hongze Cheng 已提交
161 162
      SL_HEAD_NODE_FORWARD(pHead, iLevel) = pTail;
      SL_TAIL_NODE_BACKWARD(pTail, iLevel) = pHead;
H
refact  
Hongze Cheng 已提交
163
    }
H
Hongze Cheng 已提交
164

H
Hongze Cheng 已提交
165 166 167 168 169 170 171
    // add to hash
    if (pMemTb->nHash >= pMemTb->nBucket) {
      // rehash (todo)
    }
    iBucket = HASH_BUCKET(suid, uid, pMemTb->nBucket);
    pMemData->pHashNext = pMemTb->pBuckets[iBucket];
    pMemTb->pBuckets[iBucket] = pMemData;
H
refact  
Hongze Cheng 已提交
172
    pMemTb->nHash++;
H
Hongze Cheng 已提交
173 174

    // sort organize (todo)
H
Hongze Cheng 已提交
175 176
  }

H
Hongze Cheng 已提交
177 178 179
  // do insert data to SMemData
  SMemSkipListCurosr slc = {0};
  const uint8_t     *p = pSubmitBlk->pData;
H
Hongze Cheng 已提交
180 181 182
  const uint8_t     *pt;
  const STSRow      *pRow;
  uint64_t           szRow;
H
Hongze Cheng 已提交
183
  SDecoder           decoder = {0};
H
Hongze Cheng 已提交
184

H
Hongze Cheng 已提交
185 186 187
  // tCoderInit(&coder, TD_LITTLE_ENDIAN, pSubmitBlk->pData, pSubmitBlk->nData, TD_DECODER);
  for (;;) {
    // if (tDecodeIsEnd(&coder)) break;
H
more  
Hongze Cheng 已提交
188

H
Hongze Cheng 已提交
189 190 191 192 193
    // if (tDecodeBinary(&coder, (const uint8_t **)&pRow, &szRow) < 0) {
    //   terrno = TSDB_CODE_INVALID_MSG;
    //   return -1;
    // }
    // check the row (todo)
H
Hongze Cheng 已提交
194

H
Hongze Cheng 已提交
195 196 197 198
    //   // move the cursor to position to write (todo)
    //   int32_t c;
    //   tsdbMemSkipListCursorMoveTo(&slc, pTSRow, version, &c);
    //   ASSERT(c);
H
refact  
Hongze Cheng 已提交
199

H
Hongze Cheng 已提交
200 201 202 203 204
    //   // encode row
    //   int8_t  level = tsdbMemSkipListRandLevel(&pMemData->sl);
    //   int32_t tsize = SL_NODE_SIZE(level) + sizeof(version) + (p - pt);
    //   pSlNode = vnodeBufPoolMalloc(pPool, tsize);
    //   pSlNode->level = level;
H
refact  
Hongze Cheng 已提交
205

H
Hongze Cheng 已提交
206 207 208 209
    //   uint8_t *pData = SL_NODE_DATA(pSlNode);
    //   *(int64_t *)pData = version;
    //   pData += sizeof(version);
    //   memcpy(pData, pt, p - pt);
H
refact  
Hongze Cheng 已提交
210

H
Hongze Cheng 已提交
211 212
    //   // insert row
    //   tsdbMemSkipListCursorPut(&slc, pSlNode);
H
refact  
Hongze Cheng 已提交
213

H
Hongze Cheng 已提交
214 215 216
    // update status
    if (pRow->ts < pMemData->minKey) pMemData->minKey = pRow->ts;
    if (pRow->ts > pMemData->maxKey) pMemData->maxKey = pRow->ts;
H
Hongze Cheng 已提交
217
  }
H
Hongze Cheng 已提交
218
  // tCoderClear(&coder);
H
Hongze Cheng 已提交
219
  // tsdbMemSkipListCursorClose(&slc);
H
Hongze Cheng 已提交
220

H
Hongze Cheng 已提交
221
  // update status
H
refact  
Hongze Cheng 已提交
222 223
  if (pMemData->minVer == -1) pMemData->minVer = version;
  if (pMemData->maxVer == -1 || pMemData->maxVer < version) pMemData->maxVer = version;
H
Hongze Cheng 已提交
224

H
refact  
Hongze Cheng 已提交
225 226 227 228
  if (pMemTb->minKey < pMemData->minKey) pMemTb->minKey = pMemData->minKey;
  if (pMemTb->maxKey < pMemData->maxKey) pMemTb->maxKey = pMemData->maxKey;
  if (pMemTb->minVer == -1) pMemTb->minVer = version;
  if (pMemTb->maxVer == -1 || pMemTb->maxVer < version) pMemTb->maxVer = version;
H
Hongze Cheng 已提交
229

H
refact  
Hongze Cheng 已提交
230
  return 0;
H
Hongze Cheng 已提交
231
}