tsdbMemTable.c 13.5 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
H
Hongze Cheng 已提交
14 15
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18 19
static STbData *tsdbNewTbData(tb_uid_t uid);
static void     tsdbFreeTbData(STbData *pTbData);
H
refact  
Hongze Cheng 已提交
20
static char    *tsdbGetTsTupleKey(const void *data);
H
Hongze Cheng 已提交
21
static int      tsdbTbDataComp(const void *arg1, const void *arg2);
H
refact  
Hongze Cheng 已提交
22
static char    *tsdbTbDataGetUid(const void *arg);
23
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row, bool merge);
H
Hongze Cheng 已提交
24

H
Hongze Cheng 已提交
25 26 27 28 29 30 31 32 33
/*
 * Allocate and initialise an empty mem-table for pTsdb.
 *
 * The new handle starts with one reference, draws memory from the vnode's in-use
 * buffer pool, and gets two empty indexes over its per-table containers: a skip list
 * ordered by uid (pSlIdx) and a uid -> STbData* hash (pHashIdx).
 *
 * On success stores the handle in *ppMemTable and returns 0. On failure releases
 * whatever was allocated so far, leaves *ppMemTable NULL and returns -1.
 */
int tsdbMemTableCreate(STsdb *pTsdb, STsdbMemTable **ppMemTable) {
  SVnode        *pVnode = pTsdb->pVnode;
  STsdbMemTable *pMem = NULL;

  *ppMemTable = NULL;

  pMem = (STsdbMemTable *)taosMemoryCalloc(1, sizeof(*pMem));
  if (pMem == NULL) goto _err;

  pMem->pPool = pVnode->inUse;
  T_REF_INIT_VAL(pMem, 1);
  taosInitRWLatch(&pMem->latch);
  // Inverted bounds: the first insert always tightens them.
  pMem->keyMin = TSKEY_MAX;
  pMem->keyMax = TSKEY_MIN;
  pMem->nRow = 0;

  pMem->pSlIdx = tSkipListCreate(pVnode->config.tsdbCfg.slLevel, TSDB_DATA_TYPE_BIGINT, sizeof(tb_uid_t),
                                 tsdbTbDataComp, SL_DISCARD_DUP_KEY, tsdbTbDataGetUid);
  if (pMem->pSlIdx == NULL) goto _err;

  pMem->pHashIdx = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pMem->pHashIdx == NULL) goto _err;

  *ppMemTable = pMem;
  return 0;

_err:
  if (pMem != NULL) {
    if (pMem->pSlIdx != NULL) {
      tSkipListDestroy(pMem->pSlIdx);
    }
    taosMemoryFree(pMem);
  }
  return -1;
}

H
Hongze Cheng 已提交
62
/*
 * Release a mem-table together with every per-table container it owns.
 *
 * A NULL pMemTable is ignored; pTsdb is not used. The hash index stores only
 * STbData pointers, so it is cleaned up first. Each STbData is then freed while
 * walking the skip list, and finally the skip list and the handle are released.
 */
void tsdbMemTableDestroy(STsdb *pTsdb, STsdbMemTable *pMemTable) {
  if (pMemTable == NULL) return;

  taosHashCleanup(pMemTable->pHashIdx);

  SSkipListIterator *pSlIter = tSkipListCreateIter(pMemTable->pSlIdx);
  while (tSkipListIterNext(pSlIter)) {
    SSkipListNode *pSlNode = tSkipListIterGet(pSlIter);
    tsdbFreeTbData((STbData *)pSlNode->pData);
  }
  tSkipListDestroyIter(pSlIter);

  tSkipListDestroy(pMemTable->pSlIdx);
  taosMemoryFree(pMemTable);
}
H
Hongze Cheng 已提交
79

H
Hongze Cheng 已提交
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
/**
 * This is an important function to load data or try to load data from memory skiplist iterator.
 *
 * This function load memory data until:
 * 1. iterator ends
 * 2. data key exceeds maxKey
 * 3. rowsIncreased = rowsInserted - rowsDeleteSucceed >= maxRowsToRead
 * 4. operations in pCols not exceeds its max capacity if pCols is given
 *
 * The function tries to procceed AS MUCH AS POSSIBLE.
 */
int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols,
                          TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo) {
  ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0);
  if (pIter == NULL) return 0;
95 96 97 98
  STSchema *pSchema = NULL;
  TSKEY     rowKey = 0;
  TSKEY     fKey = 0;
  // only fetch lastKey from mem data as file data not used in this function actually
C
Cary Xu 已提交
99
  TSKEY      lastKey = TSKEY_INITIAL_VAL;
H
Hongze Cheng 已提交
100 101
  bool       isRowDel = false;
  int        filterIter = 0;
H
refact  
Hongze Cheng 已提交
102
  STSRow    *row = NULL;
H
Hongze Cheng 已提交
103 104
  SMergeInfo mInfo;

105 106 107
  // TODO: support Multi-Version(the rows with the same TS keys in memory can't be merged if its version refered by
  // query handle)

H
Hongze Cheng 已提交
108 109 110 111 112 113 114 115
  if (pMergeInfo == NULL) pMergeInfo = &mInfo;

  memset(pMergeInfo, 0, sizeof(*pMergeInfo));
  pMergeInfo->keyFirst = INT64_MAX;
  pMergeInfo->keyLast = INT64_MIN;
  if (pCols) tdResetDataCols(pCols);

  row = tsdbNextIterRow(pIter);
C
Cary Xu 已提交
116
  if (row == NULL || TD_ROW_KEY(row) > maxKey) {
H
Hongze Cheng 已提交
117 118 119
    rowKey = INT64_MAX;
    isRowDel = false;
  } else {
C
Cary Xu 已提交
120 121
    rowKey = TD_ROW_KEY(row);
    isRowDel = TD_ROW_IS_DELETED(row);
H
Hongze Cheng 已提交
122 123 124 125 126 127 128
  }

  if (filterIter >= nFilterKeys) {
    fKey = INT64_MAX;
  } else {
    fKey = tdGetKey(filterKeys[filterIter]);
  }
C
Cary Xu 已提交
129 130
  // 1. fkey - no dup since merged up to maxVersion of each query handle by tsdbLoadBlockDataCols
  // 2. rowKey - would dup since Multi-Version supported
H
Hongze Cheng 已提交
131 132 133 134
  while (true) {
    if (fKey == INT64_MAX && rowKey == INT64_MAX) break;

    if (fKey < rowKey) {
dengyihao's avatar
dengyihao 已提交
135 136
      pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, fKey);
      pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, fKey);
H
Hongze Cheng 已提交
137 138 139 140 141 142 143

      filterIter++;
      if (filterIter >= nFilterKeys) {
        fKey = INT64_MAX;
      } else {
        fKey = tdGetKey(filterKeys[filterIter]);
      }
C
Cary Xu 已提交
144
#if 0
H
Hongze Cheng 已提交
145 146 147 148 149 150
    } else if (fKey > rowKey) {
      if (isRowDel) {
        pMergeInfo->rowsDeleteFailed++;
      } else {
        if (pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed >= maxRowsToRead) break;
        if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
C
Cary Xu 已提交
151

H
Hongze Cheng 已提交
152 153
        pMergeInfo->rowsInserted++;
        pMergeInfo->nOperations++;
dengyihao's avatar
dengyihao 已提交
154 155
        pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey);
        pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey);
H
Hongze Cheng 已提交
156 157 158 159 160
        tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row);
      }

      tSkipListIterNext(pIter);
      row = tsdbNextIterRow(pIter);
C
Cary Xu 已提交
161
      if (row == NULL || TD_ROW_KEY(row) > maxKey) {
H
Hongze Cheng 已提交
162 163 164
        rowKey = INT64_MAX;
        isRowDel = false;
      } else {
C
Cary Xu 已提交
165 166
        rowKey = TD_ROW_KEY(row);
        isRowDel = TD_ROW_IS_DELETED(row);
H
Hongze Cheng 已提交
167 168 169 170 171 172 173 174 175 176 177 178 179
      }
    } else {
      if (isRowDel) {
        ASSERT(!keepDup);
        if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
        pMergeInfo->rowsDeleteSucceed++;
        pMergeInfo->nOperations++;
        tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row);
      } else {
        if (keepDup) {
          if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
          pMergeInfo->rowsUpdated++;
          pMergeInfo->nOperations++;
dengyihao's avatar
dengyihao 已提交
180 181
          pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey);
          pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey);
H
Hongze Cheng 已提交
182 183
          tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row);
        } else {
dengyihao's avatar
dengyihao 已提交
184 185
          pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, fKey);
          pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, fKey);
H
Hongze Cheng 已提交
186 187 188 189 190
        }
      }

      tSkipListIterNext(pIter);
      row = tsdbNextIterRow(pIter);
C
Cary Xu 已提交
191
      if (row == NULL || TD_ROW_KEY(row) > maxKey) {
H
Hongze Cheng 已提交
192 193 194
        rowKey = INT64_MAX;
        isRowDel = false;
      } else {
C
Cary Xu 已提交
195 196
        rowKey = TD_ROW_KEY(row);
        isRowDel = TD_ROW_IS_DELETED(row);
H
Hongze Cheng 已提交
197 198
      }

C
Cary Xu 已提交
199 200 201 202 203 204 205 206
      filterIter++;
      if (filterIter >= nFilterKeys) {
        fKey = INT64_MAX;
      } else {
        fKey = tdGetKey(filterKeys[filterIter]);
      }
    }
#endif
207 208
#if 1
    } else if (fKey > rowKey) {
C
Cary Xu 已提交
209
      if (isRowDel) {
210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247
        // TODO: support delete function
        pMergeInfo->rowsDeleteFailed++;
      } else {
        if (pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed >= maxRowsToRead) break;
        if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;

        if (lastKey != rowKey) {
          pMergeInfo->rowsInserted++;
          pMergeInfo->nOperations++;
          pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey);
          pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey);
          if (pCols) {
            if (lastKey != TSKEY_INITIAL_VAL) {
              ++pCols->numOfRows;
            }
            tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, false);
          }
          lastKey = rowKey;
        } else {
          if (keepDup) {
            tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, true);
          } else {
            // discard
          }
        }
      }

      tSkipListIterNext(pIter);
      row = tsdbNextIterRow(pIter);
      if (row == NULL || TD_ROW_KEY(row) > maxKey) {
        rowKey = INT64_MAX;
        isRowDel = false;
      } else {
        rowKey = TD_ROW_KEY(row);
        isRowDel = TD_ROW_IS_DELETED(row);
      }
    } else {           // fkey == rowKey
      if (isRowDel) {  // TODO: support delete function(How to stands for delete in file? rowVersion = -1?)
C
Cary Xu 已提交
248 249 250 251
        ASSERT(!keepDup);
        if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
        pMergeInfo->rowsDeleteSucceed++;
        pMergeInfo->nOperations++;
252
        tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, false);
C
Cary Xu 已提交
253 254 255
      } else {
        if (keepDup) {
          if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
256 257 258 259 260
          if (lastKey != rowKey) {
            pMergeInfo->rowsUpdated++;
            pMergeInfo->nOperations++;
            pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey);
            pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey);
C
Cary Xu 已提交
261
            if (pCols) {
C
Cary Xu 已提交
262 263 264
              if (lastKey != TSKEY_INITIAL_VAL) {
                ++pCols->numOfRows;
              }
C
Cary Xu 已提交
265 266
              tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, false);
            }
C
Cary Xu 已提交
267
            lastKey = rowKey;
268 269 270
          } else {
            tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, true);
          }
C
Cary Xu 已提交
271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286
        } else {
          pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, fKey);
          pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, fKey);
        }
      }

      tSkipListIterNext(pIter);
      row = tsdbNextIterRow(pIter);
      if (row == NULL || TD_ROW_KEY(row) > maxKey) {
        rowKey = INT64_MAX;
        isRowDel = false;
      } else {
        rowKey = TD_ROW_KEY(row);
        isRowDel = TD_ROW_IS_DELETED(row);
      }

H
Hongze Cheng 已提交
287 288 289 290 291 292 293
      filterIter++;
      if (filterIter >= nFilterKeys) {
        fKey = INT64_MAX;
      } else {
        fKey = tdGetKey(filterKeys[filterIter]);
      }
    }
294 295
#endif
  }
C
Cary Xu 已提交
296
  if (pCols && (lastKey != TSKEY_INITIAL_VAL)) {
297
    ++pCols->numOfRows;
H
Hongze Cheng 已提交
298 299 300 301 302
  }

  return 0;
}

H
Hongze Cheng 已提交
303
/*
 * Insert one submit block into the mem-table of pTsdb.
 *
 * On first use, the per-table container (STbData) for pMsgIter->uid is created and
 * registered in both the uid hash and the uid-ordered skip list of the mem-table.
 * The block (header, schema and row data) is then copied into the mem-table's buffer
 * pool, and its rows are put into the table's row skip list. Row-count and key-range
 * statistics of the table and of the mem-table are updated, and pRsp reports the
 * block's row count.
 *
 * Returns 0 on success, including a block without rows (pRsp is then left untouched).
 * Returns -1 if the table container cannot be created or registered, or if the
 * buffer-pool copy cannot be allocated.
 */
int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, SSubmitBlkRsp *pRsp) {
  SSubmitBlkIter blkIter = {0};
  STsdbMemTable *pMemTable = pTsdb->mem;
  void          *tptr;
  STbData       *pTbData;
  TSKEY          keyMin;
  TSKEY          keyMax;
  SSubmitBlk    *pBlkCopy;

  // create the table container if needed
  tptr = taosHashGet(pMemTable->pHashIdx, &(pMsgIter->uid), sizeof(pMsgIter->uid));
  if (tptr == NULL) {
    pTbData = tsdbNewTbData(pMsgIter->uid);
    if (pTbData == NULL) {
      return -1;
    }

    // Put into hash. Nothing else references pTbData yet, so a failure only needs a free.
    if (taosHashPut(pMemTable->pHashIdx, &(pMsgIter->uid), sizeof(pMsgIter->uid), &(pTbData), sizeof(pTbData)) != 0) {
      tsdbFreeTbData(pTbData);
      return -1;
    }

    // Put into skiplist. tsdbMemTableDestroy frees containers by walking the skip list,
    // so an entry that is only in the hash would leak; undo the hash entry on failure.
    if (tSkipListPut(pMemTable->pSlIdx, pTbData) == NULL) {
      taosHashRemove(pMemTable->pHashIdx, &(pMsgIter->uid), sizeof(pMsgIter->uid));
      tsdbFreeTbData(pTbData);
      return -1;
    }
  } else {
    pTbData = *(STbData **)tptr;
  }

  // copy the whole block (header + schema + row data) into the mem-table's buffer pool
  int32_t tlen = pMsgIter->dataLen + pMsgIter->schemaLen + sizeof(*pBlock);
  pBlkCopy = (SSubmitBlk *)vnodeBufPoolMalloc(pMemTable->pPool, tlen);
  if (pBlkCopy == NULL) {
    return -1;
  }
  memcpy(pBlkCopy, pBlock, tlen);

  tInitSubmitBlkIter(pMsgIter, pBlkCopy, &blkIter);
  if (blkIter.row == NULL) return 0;
  keyMin = TD_ROW_KEY(blkIter.row);

  tSkipListPutBatchByIter(pTbData->pData, &blkIter, (iter_next_fn_t)tGetSubmitBlkNext);

#ifdef TD_DEBUG_PRINT_ROW
  printf("!!! %s:%d table %" PRIi64 " has %d rows in skiplist\n\n", __func__, __LINE__, pTbData->uid,
         SL_SIZE(pTbData->pData));
#endif

  // Set statistics.
  // NOTE(review): assumes tGetSubmitBlkNext leaves blkIter.row on the block's last row.
  keyMax = TD_ROW_KEY(blkIter.row);

  pTbData->nrows += pMsgIter->numOfRows;
  if (pTbData->keyMin > keyMin) pTbData->keyMin = keyMin;
  if (pTbData->keyMax < keyMax) pTbData->keyMax = keyMax;

  pMemTable->nRow += pMsgIter->numOfRows;
  if (pMemTable->keyMin > keyMin) pMemTable->keyMin = keyMin;
  if (pMemTable->keyMax < keyMax) pMemTable->keyMax = keyMax;

  pRsp->numOfRows = pMsgIter->numOfRows;
  pRsp->affectedRows = pMsgIter->numOfRows;

  return 0;
}

/*
 * Allocate an empty per-table container for uid.
 *
 * Rows are kept in a skip list keyed by row timestamp (tsdbGetTsTupleKey /
 * tkeyComparFn). The list is created with SL_ALLOW_DUP_KEY, so several versions of
 * the same timestamp can coexist in memory. tsdbLoadDataFromCache later merges or
 * discards the extra versions, depending on its keepDup argument.
 *
 * Returns the new container, or NULL if an allocation fails.
 */
static STbData *tsdbNewTbData(tb_uid_t uid) {
  STbData *pTbData = (STbData *)taosMemoryCalloc(1, sizeof(*pTbData));
  if (pTbData == NULL) {
    return NULL;
  }

  pTbData->uid = uid;
  // Inverted bounds: the first insert always tightens them.
  pTbData->keyMin = TSKEY_MAX;
  pTbData->keyMax = TSKEY_MIN;
  pTbData->nrows = 0;
  // 5 is the maximum skip-list level (same parameter slot as tsdbCfg.slLevel for the table index).
  pTbData->pData =
      tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), tkeyComparFn, SL_ALLOW_DUP_KEY, tsdbGetTsTupleKey);
  if (pTbData->pData == NULL) {
    taosMemoryFree(pTbData);
    return NULL;
  }

  return pTbData;
}

/*
 * Release a per-table container together with its row skip list.
 * A NULL pointer is ignored.
 */
static void tsdbFreeTbData(STbData *pTbData) {
  if (pTbData == NULL) return;

  tSkipListDestroy(pTbData->pData);
  taosMemoryFree(pTbData);
}

C
Cary Xu 已提交
394
static char *tsdbGetTsTupleKey(const void *data) { return (char *)TD_ROW_KEY_ADDR((STSRow *)data); }
H
Hongze Cheng 已提交
395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410

static int tsdbTbDataComp(const void *arg1, const void *arg2) {
  STbData *pTbData1 = (STbData *)arg1;
  STbData *pTbData2 = (STbData *)arg2;

  if (pTbData1->uid > pTbData2->uid) {
    return 1;
  } else if (pTbData1->uid == pTbData2->uid) {
    return 0;
  } else {
    return -1;
  }
}

static char *tsdbTbDataGetUid(const void *arg) {
  STbData *pTbData = (STbData *)arg;
H
Hongze Cheng 已提交
411
  return (char *)(&(pTbData->uid));
H
Hongze Cheng 已提交
412
}
413
/*
 * Append one in-memory row to pCols, decoding it with the schema of the row's version.
 *
 * *ppSchema caches the schema across calls. It is (re)fetched from pTable when it is
 * unset or its version differs from TD_ROW_SVER(row). `merge` is forwarded unchanged
 * to tdAppendSTSRowToDataCol; tsdbLoadDataFromCache passes true for further versions
 * of a key it has already appended. A NULL pCols makes this a no-op.
 *
 * Returns 0, or -1 when no schema of the row's version can be found.
 */
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row, bool merge) {
  if (pCols == NULL) return 0;

  bool schemaStale = (*ppSchema == NULL) || (schemaVersion(*ppSchema) != TD_ROW_SVER(row));
  if (schemaStale) {
    *ppSchema = tsdbGetTableSchemaImpl(pTable, false, false, TD_ROW_SVER(row));
    if (*ppSchema == NULL) {
      ASSERT(false);
      return -1;
    }
  }

  tdAppendSTSRowToDataCol(row, *ppSchema, pCols, merge);
  return 0;
}