tsdbMemTable.c 9.6 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18 19
static STbData *tsdbNewTbData(tb_uid_t uid);
static void     tsdbFreeTbData(STbData *pTbData);
H
refact  
Hongze Cheng 已提交
20
static char    *tsdbGetTsTupleKey(const void *data);
H
Hongze Cheng 已提交
21
static int      tsdbTbDataComp(const void *arg1, const void *arg2);
H
refact  
Hongze Cheng 已提交
22
static char    *tsdbTbDataGetUid(const void *arg);
C
Cary Xu 已提交
23
static int      tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row);
H
Hongze Cheng 已提交
24

H
Hongze Cheng 已提交
25 26 27 28 29 30 31 32 33
/*
 * Allocate and initialize an empty memtable for pTsdb.
 *
 * On success the new handle is stored in *ppMemTable and 0 is returned. On
 * failure -1 is returned, *ppMemTable stays NULL, and everything allocated so
 * far is released. The memtable starts with one reference, no rows, and an
 * empty key range (keyMin = TSKEY_MAX, keyMax = TSKEY_MIN). It draws from the
 * vnode's in-use buffer pool.
 */
int tsdbMemTableCreate(STsdb *pTsdb, STsdbMemTable **ppMemTable) {
  SVnode        *pVnode = pTsdb->pVnode;
  STsdbMemTable *pMem = NULL;

  *ppMemTable = NULL;

  pMem = (STsdbMemTable *)taosMemoryCalloc(1, sizeof(*pMem));
  if (pMem == NULL) {
    return -1;
  }

  pMem->pPool = pVnode->inUse;
  T_REF_INIT_VAL(pMem, 1);
  taosInitRWLatch(&pMem->latch);
  pMem->keyMin = TSKEY_MAX;
  pMem->keyMax = TSKEY_MIN;
  pMem->nRow = 0;

  // Per-table data ordered by uid.
  pMem->pSlIdx = tSkipListCreate(pVnode->config.tsdbCfg.slLevel, TSDB_DATA_TYPE_BIGINT, sizeof(tb_uid_t),
                                 tsdbTbDataComp, SL_DISCARD_DUP_KEY, tsdbTbDataGetUid);
  if (pMem->pSlIdx == NULL) {
    goto _err_free_handle;
  }

  // uid -> STbData* lookup.
  pMem->pHashIdx = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pMem->pHashIdx == NULL) {
    goto _err_destroy_slidx;
  }

  *ppMemTable = pMem;
  return 0;

_err_destroy_slidx:
  tSkipListDestroy(pMem->pSlIdx);
_err_free_handle:
  taosMemoryFree(pMem);
  return -1;
}

H
Hongze Cheng 已提交
62
/*
 * Release a memtable created by tsdbMemTableCreate, including every per-table
 * container (STbData) registered in it.
 *
 * tsdbInsertTableData stores raw STbData pointers in both pHashIdx and pSlIdx.
 * Neither index is given a free callback here, so the containers (and their row
 * skiplists) must be freed explicitly before the indexes are torn down.
 * Otherwise every table inserted into the memtable leaks.
 *
 * Passing NULL is a no-op. pTsdb is currently unused.
 */
void tsdbMemTableDestroy(STsdb *pTsdb, STsdbMemTable *pMemTable) {
  if (pMemTable == NULL) return;

  if (pMemTable->pHashIdx != NULL) {
    // Each hash value is a STbData* (see the taosHashPut in tsdbInsertTableData).
    void *pIter = taosHashIterate(pMemTable->pHashIdx, NULL);
    while (pIter != NULL) {
      tsdbFreeTbData(*(STbData **)pIter);
      pIter = taosHashIterate(pMemTable->pHashIdx, pIter);
    }
  }

  taosHashCleanup(pMemTable->pHashIdx);
  tSkipListDestroy(pMemTable->pSlIdx);
  taosMemoryFree(pMemTable);
}
H
Hongze Cheng 已提交
69

H
Hongze Cheng 已提交
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
/**
 * This is an important function to load data or try to load data from memory skiplist iterator.
 *
 * This function load memory data until:
 * 1. iterator ends
 * 2. data key exceeds maxKey
 * 3. rowsIncreased = rowsInserted - rowsDeleteSucceed >= maxRowsToRead
 * 4. operations in pCols not exceeds its max capacity if pCols is given
 *
 * The function tries to procceed AS MUCH AS POSSIBLE.
 */
int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols,
                          TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo) {
  ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0);
  if (pIter == NULL) return 0;
H
refact  
Hongze Cheng 已提交
85
  STSchema  *pSchema = NULL;
H
Hongze Cheng 已提交
86 87 88 89
  TSKEY      rowKey = 0;
  TSKEY      fKey = 0;
  bool       isRowDel = false;
  int        filterIter = 0;
H
refact  
Hongze Cheng 已提交
90
  STSRow    *row = NULL;
H
Hongze Cheng 已提交
91 92 93 94 95 96 97 98 99 100
  SMergeInfo mInfo;

  if (pMergeInfo == NULL) pMergeInfo = &mInfo;

  memset(pMergeInfo, 0, sizeof(*pMergeInfo));
  pMergeInfo->keyFirst = INT64_MAX;
  pMergeInfo->keyLast = INT64_MIN;
  if (pCols) tdResetDataCols(pCols);

  row = tsdbNextIterRow(pIter);
C
Cary Xu 已提交
101
  if (row == NULL || TD_ROW_KEY(row) > maxKey) {
H
Hongze Cheng 已提交
102 103 104
    rowKey = INT64_MAX;
    isRowDel = false;
  } else {
C
Cary Xu 已提交
105 106
    rowKey = TD_ROW_KEY(row);
    isRowDel = TD_ROW_IS_DELETED(row);
H
Hongze Cheng 已提交
107 108 109 110 111 112 113 114 115 116 117 118
  }

  if (filterIter >= nFilterKeys) {
    fKey = INT64_MAX;
  } else {
    fKey = tdGetKey(filterKeys[filterIter]);
  }

  while (true) {
    if (fKey == INT64_MAX && rowKey == INT64_MAX) break;

    if (fKey < rowKey) {
dengyihao's avatar
dengyihao 已提交
119 120
      pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, fKey);
      pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, fKey);
H
Hongze Cheng 已提交
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135

      filterIter++;
      if (filterIter >= nFilterKeys) {
        fKey = INT64_MAX;
      } else {
        fKey = tdGetKey(filterKeys[filterIter]);
      }
    } else if (fKey > rowKey) {
      if (isRowDel) {
        pMergeInfo->rowsDeleteFailed++;
      } else {
        if (pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed >= maxRowsToRead) break;
        if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
        pMergeInfo->rowsInserted++;
        pMergeInfo->nOperations++;
dengyihao's avatar
dengyihao 已提交
136 137
        pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey);
        pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey);
H
Hongze Cheng 已提交
138 139 140 141 142
        tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row);
      }

      tSkipListIterNext(pIter);
      row = tsdbNextIterRow(pIter);
C
Cary Xu 已提交
143
      if (row == NULL || TD_ROW_KEY(row) > maxKey) {
H
Hongze Cheng 已提交
144 145 146
        rowKey = INT64_MAX;
        isRowDel = false;
      } else {
C
Cary Xu 已提交
147 148
        rowKey = TD_ROW_KEY(row);
        isRowDel = TD_ROW_IS_DELETED(row);
H
Hongze Cheng 已提交
149 150 151 152 153 154 155 156 157 158 159 160 161
      }
    } else {
      if (isRowDel) {
        ASSERT(!keepDup);
        if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
        pMergeInfo->rowsDeleteSucceed++;
        pMergeInfo->nOperations++;
        tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row);
      } else {
        if (keepDup) {
          if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
          pMergeInfo->rowsUpdated++;
          pMergeInfo->nOperations++;
dengyihao's avatar
dengyihao 已提交
162 163
          pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey);
          pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey);
H
Hongze Cheng 已提交
164 165
          tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row);
        } else {
dengyihao's avatar
dengyihao 已提交
166 167
          pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, fKey);
          pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, fKey);
H
Hongze Cheng 已提交
168 169 170 171 172
        }
      }

      tSkipListIterNext(pIter);
      row = tsdbNextIterRow(pIter);
C
Cary Xu 已提交
173
      if (row == NULL || TD_ROW_KEY(row) > maxKey) {
H
Hongze Cheng 已提交
174 175 176
        rowKey = INT64_MAX;
        isRowDel = false;
      } else {
C
Cary Xu 已提交
177 178
        rowKey = TD_ROW_KEY(row);
        isRowDel = TD_ROW_IS_DELETED(row);
H
Hongze Cheng 已提交
179 180 181 182 183 184 185 186 187 188 189 190 191 192
      }

      filterIter++;
      if (filterIter >= nFilterKeys) {
        fKey = INT64_MAX;
      } else {
        fKey = tdGetKey(filterKeys[filterIter]);
      }
    }
  }

  return 0;
}

C
Cary Xu 已提交
193
/*
 * Insert the rows of one submit block into the current memtable (pTsdb->mem).
 *
 * Steps:
 *   1. On first use of a table, its per-table container (STbData) is created
 *      and registered in both memtable indexes.
 *   2. The block (header + dataLen payload bytes) is copied into the memtable's
 *      buffer pool.
 *   3. Rows are iterated from that copy into the table's row skiplist.
 *   4. The row count and key range of the table and the memtable are updated,
 *      and *pAffectedRows is increased by pMsgIter->numOfRows.
 *
 * Returns 0 on success (including a block without rows) and -1 if the container
 * or the block copy cannot be allocated, or the container cannot be indexed.
 */
int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, int32_t *pAffectedRows) {
  SSubmitBlkIter blkIter = {0};
  STsdbMemTable *pMemTable = pTsdb->mem;
  void          *tptr;
  STbData       *pTbData;
  TSKEY          keyMin;
  TSKEY          keyMax;
  SSubmitBlk    *pBlkCopy;
  size_t         blkSize;

  // Find the table's data container, creating and indexing it if needed.
  tptr = taosHashGet(pMemTable->pHashIdx, &(pMsgIter->uid), sizeof(pMsgIter->uid));
  if (tptr == NULL) {
    pTbData = tsdbNewTbData(pMsgIter->uid);
    if (pTbData == NULL) {
      return -1;
    }

    // Put into hash. If this fails the container would be unreachable (and leaked), so release it.
    if (taosHashPut(pMemTable->pHashIdx, &(pMsgIter->uid), sizeof(pMsgIter->uid), &(pTbData), sizeof(pTbData)) != 0) {
      tsdbFreeTbData(pTbData);
      return -1;
    }

    // Put into skiplist
    tSkipListPut(pMemTable->pSlIdx, pTbData);
  } else {
    pTbData = *(STbData **)tptr;
  }

  // Copy the block into the memtable's buffer pool; rows are iterated from the copy below.
  blkSize = pMsgIter->dataLen + sizeof(*pBlock);
  pBlkCopy = (SSubmitBlk *)vnodeBufPoolMalloc(pMemTable->pPool, blkSize);
  if (pBlkCopy == NULL) {
    return -1;
  }
  memcpy(pBlkCopy, pBlock, blkSize);

  tInitSubmitBlkIter(pMsgIter, pBlkCopy, &blkIter);
  if (blkIter.row == NULL) return 0;
  keyMin = TD_ROW_KEY(blkIter.row);

  tSkipListPutBatchByIter(pTbData->pData, &blkIter, (iter_next_fn_t)tGetSubmitBlkNext);

  // Set statistics.
  // NOTE(review): relies on blkIter.row being left on the last row after the batch put;
  // confirm against tGetSubmitBlkNext.
  keyMax = TD_ROW_KEY(blkIter.row);

  pTbData->nrows += pMsgIter->numOfRows;
  if (pTbData->keyMin > keyMin) pTbData->keyMin = keyMin;
  if (pTbData->keyMax < keyMax) pTbData->keyMax = keyMax;

  pMemTable->nRow += pMsgIter->numOfRows;
  if (pMemTable->keyMin > keyMin) pMemTable->keyMin = keyMin;
  if (pMemTable->keyMax < keyMax) pMemTable->keyMax = keyMax;

  (*pAffectedRows) += pMsgIter->numOfRows;

  return 0;
}

/*
 * Allocate an empty per-table container for table uid.
 *
 * The container starts with no rows and an empty key range (keyMin = TSKEY_MAX,
 * keyMax = TSKEY_MIN). Its rows are held in a skiplist keyed by the row
 * timestamp (tsdbGetTsTupleKey) that accepts duplicate keys (SL_ALLOW_DUP_KEY).
 * Returns NULL if any allocation fails; nothing is leaked in that case.
 * Release with tsdbFreeTbData().
 */
static STbData *tsdbNewTbData(tb_uid_t uid) {
  STbData *pTbData = (STbData *)taosMemoryCalloc(1, sizeof(*pTbData));
  if (pTbData == NULL) {
    return NULL;
  }

  pTbData->uid = uid;
  pTbData->keyMin = TSKEY_MAX;
  pTbData->keyMax = TSKEY_MIN;
  pTbData->nrows = 0;

  // NOTE(review): the first argument (5) is presumably the skiplist's max level; confirm in tskiplist.h.
  pTbData->pData = tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), tkeyComparFn, SL_ALLOW_DUP_KEY,
                                   tsdbGetTsTupleKey);
  if (pTbData->pData == NULL) {
    taosMemoryFree(pTbData);
    return NULL;
  }

  return pTbData;
}

/* Release a container from tsdbNewTbData together with its row skiplist. NULL is ignored. */
static void tsdbFreeTbData(STbData *pTbData) {
  if (pTbData == NULL) return;

  tSkipListDestroy(pTbData->pData);
  taosMemoryFree(pTbData);
}

C
Cary Xu 已提交
280
static char *tsdbGetTsTupleKey(const void *data) { return (char *)TD_ROW_KEY_ADDR((STSRow *)data); }
H
Hongze Cheng 已提交
281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296

static int tsdbTbDataComp(const void *arg1, const void *arg2) {
  STbData *pTbData1 = (STbData *)arg1;
  STbData *pTbData2 = (STbData *)arg2;

  if (pTbData1->uid > pTbData2->uid) {
    return 1;
  } else if (pTbData1->uid == pTbData2->uid) {
    return 0;
  } else {
    return -1;
  }
}

static char *tsdbTbDataGetUid(const void *arg) {
  STbData *pTbData = (STbData *)arg;
H
Hongze Cheng 已提交
297
  return (char *)(&(pTbData->uid));
H
Hongze Cheng 已提交
298
}
C
Cary Xu 已提交
299
/*
 * Append row to pCols.
 *
 * *ppSchema is reloaded from pTable when it is missing or its version differs
 * from the row's schema version. Does nothing and returns 0 when pCols is NULL.
 * Returns -1 (after ASSERT) if the schema cannot be loaded.
 */
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row) {
  if (pCols == NULL) return 0;

  bool schemaStale = (*ppSchema == NULL) || (schemaVersion(*ppSchema) != TD_ROW_SVER(row));
  if (schemaStale) {
    *ppSchema = tsdbGetTableSchemaImpl(pTable, false, false, TD_ROW_SVER(row));
    if (*ppSchema == NULL) {
      ASSERT(false);
      return -1;
    }
  }

  tdAppendSTSRowToDataCol(row, *ppSchema, pCols, false);
  return 0;
}