/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18 19
static STbData *tsdbNewTbData(tb_uid_t uid);
static void     tsdbFreeTbData(STbData *pTbData);
H
refact  
Hongze Cheng 已提交
20
static char    *tsdbGetTsTupleKey(const void *data);
H
Hongze Cheng 已提交
21
static int      tsdbTbDataComp(const void *arg1, const void *arg2);
H
refact  
Hongze Cheng 已提交
22
static char    *tsdbTbDataGetUid(const void *arg);
23 24
static int tsdbAppendTableRowToCols(STsdb *pTsdb, STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row,
                                    bool merge);
H
Hongze Cheng 已提交
25

H
refact  
Hongze Cheng 已提交
26 27 28
/**
 * Allocate and initialize an empty in-memory table for pTsdb.
 *
 * @param pTsdb       owning TSDB instance; its vnode config supplies the skip-list level
 * @param ppMemTable  receives the new handle on success, NULL on failure
 * @return 0 on success; -1 if any allocation fails (everything built so far is released)
 */
int tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) {
  SMemTable *pNew;
  SVnode    *pOwner;

  *ppMemTable = NULL;
  pOwner = pTsdb->pVnode;

  pNew = (SMemTable *)taosMemoryCalloc(1, sizeof(*pNew));
  if (pNew == NULL) return -1;

  pNew->pTsdb = pTsdb;
  pNew->nRef = 1;
  taosInitRWLatch(&pNew->latch);

  // Start from an inverted key range so that the first insert tightens both bounds.
  pNew->minKey = (TSDBKEY){.ts = TSKEY_MAX, .version = INT64_MAX};
  pNew->maxKey = (TSDBKEY){.ts = TSKEY_MIN, .version = -1};
  pNew->nRow = 0;

  // Ordered index of per-table containers, keyed by table uid.
  pNew->pSlIdx = tSkipListCreate(pOwner->config.tsdbCfg.slLevel, TSDB_DATA_TYPE_BIGINT, sizeof(tb_uid_t),
                                 tsdbTbDataComp, SL_DISCARD_DUP_KEY, tsdbTbDataGetUid);
  if (pNew->pSlIdx == NULL) goto _fail;

  // Hash index over the same containers for direct lookup by uid.
  pNew->pHashIdx = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pNew->pHashIdx == NULL) goto _fail;

  *ppMemTable = pNew;
  return 0;

_fail:
  // Only the skip list can exist at this point; the hash is always the last thing created.
  if (pNew->pSlIdx != NULL) tSkipListDestroy(pNew->pSlIdx);
  taosMemoryFree(pNew);
  return -1;
}

H
refact  
Hongze Cheng 已提交
63
/**
 * Release a mem-table together with every per-table container it indexes.
 *
 * @param pMemTable  handle to destroy; NULL is ignored
 */
void tsdbMemTableDestroy(SMemTable *pMemTable) {
  if (pMemTable == NULL) return;

  taosHashCleanup(pMemTable->pHashIdx);

  // Free each STbData by walking the skip list, then drop the list itself.
  SSkipListIterator *pWalker = tSkipListCreateIter(pMemTable->pSlIdx);
  while (tSkipListIterNext(pWalker)) {
    SSkipListNode *pCur = tSkipListIterGet(pWalker);
    tsdbFreeTbData((STbData *)pCur->pData);
  }
  tSkipListDestroyIter(pWalker);

  tSkipListDestroy(pMemTable->pSlIdx);
  taosMemoryFree(pMemTable);
}
H
Hongze Cheng 已提交
80

H
Hongze Cheng 已提交
81 82 83 84 85 86 87 88 89 90 91
/**
 * This is an important function to load data or try to load data from memory skiplist iterator.
 *
 * This function load memory data until:
 * 1. iterator ends
 * 2. data key exceeds maxKey
 * 3. rowsIncreased = rowsInserted - rowsDeleteSucceed >= maxRowsToRead
 * 4. operations in pCols not exceeds its max capacity if pCols is given
 *
 * The function tries to procceed AS MUCH AS POSSIBLE.
 */
92 93
int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead,
                          SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo) {
H
Hongze Cheng 已提交
94 95
  ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0);
  if (pIter == NULL) return 0;
96 97 98 99
  STSchema *pSchema = NULL;
  TSKEY     rowKey = 0;
  TSKEY     fKey = 0;
  // only fetch lastKey from mem data as file data not used in this function actually
C
Cary Xu 已提交
100
  TSKEY      lastKey = TSKEY_INITIAL_VAL;
H
Hongze Cheng 已提交
101 102
  bool       isRowDel = false;
  int        filterIter = 0;
H
refact  
Hongze Cheng 已提交
103
  STSRow    *row = NULL;
H
Hongze Cheng 已提交
104 105
  SMergeInfo mInfo;

106 107 108
  // TODO: support Multi-Version(the rows with the same TS keys in memory can't be merged if its version refered by
  // query handle)

H
Hongze Cheng 已提交
109 110 111 112 113 114 115 116
  if (pMergeInfo == NULL) pMergeInfo = &mInfo;

  memset(pMergeInfo, 0, sizeof(*pMergeInfo));
  pMergeInfo->keyFirst = INT64_MAX;
  pMergeInfo->keyLast = INT64_MIN;
  if (pCols) tdResetDataCols(pCols);

  row = tsdbNextIterRow(pIter);
C
Cary Xu 已提交
117
  if (row == NULL || TD_ROW_KEY(row) > maxKey) {
H
Hongze Cheng 已提交
118 119 120
    rowKey = INT64_MAX;
    isRowDel = false;
  } else {
C
Cary Xu 已提交
121 122
    rowKey = TD_ROW_KEY(row);
    isRowDel = TD_ROW_IS_DELETED(row);
H
Hongze Cheng 已提交
123 124 125 126 127 128 129
  }

  if (filterIter >= nFilterKeys) {
    fKey = INT64_MAX;
  } else {
    fKey = tdGetKey(filterKeys[filterIter]);
  }
C
Cary Xu 已提交
130 131
  // 1. fkey - no dup since merged up to maxVersion of each query handle by tsdbLoadBlockDataCols
  // 2. rowKey - would dup since Multi-Version supported
H
Hongze Cheng 已提交
132 133 134 135
  while (true) {
    if (fKey == INT64_MAX && rowKey == INT64_MAX) break;

    if (fKey < rowKey) {
dengyihao's avatar
dengyihao 已提交
136 137
      pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, fKey);
      pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, fKey);
H
Hongze Cheng 已提交
138 139 140 141 142 143 144

      filterIter++;
      if (filterIter >= nFilterKeys) {
        fKey = INT64_MAX;
      } else {
        fKey = tdGetKey(filterKeys[filterIter]);
      }
145 146
#if 1
    } else if (fKey > rowKey) {
C
Cary Xu 已提交
147
      if (isRowDel) {
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
        // TODO: support delete function
        pMergeInfo->rowsDeleteFailed++;
      } else {
        if (pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed >= maxRowsToRead) break;
        if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;

        if (lastKey != rowKey) {
          pMergeInfo->rowsInserted++;
          pMergeInfo->nOperations++;
          pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey);
          pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey);
          if (pCols) {
            if (lastKey != TSKEY_INITIAL_VAL) {
              ++pCols->numOfRows;
            }
163
            tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, false);
164 165 166 167
          }
          lastKey = rowKey;
        } else {
          if (keepDup) {
168
            tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, true);
169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185
          } else {
            // discard
          }
        }
      }

      tSkipListIterNext(pIter);
      row = tsdbNextIterRow(pIter);
      if (row == NULL || TD_ROW_KEY(row) > maxKey) {
        rowKey = INT64_MAX;
        isRowDel = false;
      } else {
        rowKey = TD_ROW_KEY(row);
        isRowDel = TD_ROW_IS_DELETED(row);
      }
    } else {           // fkey == rowKey
      if (isRowDel) {  // TODO: support delete function(How to stands for delete in file? rowVersion = -1?)
C
Cary Xu 已提交
186 187 188 189
        ASSERT(!keepDup);
        if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
        pMergeInfo->rowsDeleteSucceed++;
        pMergeInfo->nOperations++;
190
        tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, false);
C
Cary Xu 已提交
191 192 193
      } else {
        if (keepDup) {
          if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
194 195 196 197 198
          if (lastKey != rowKey) {
            pMergeInfo->rowsUpdated++;
            pMergeInfo->nOperations++;
            pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey);
            pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey);
C
Cary Xu 已提交
199
            if (pCols) {
C
Cary Xu 已提交
200 201 202
              if (lastKey != TSKEY_INITIAL_VAL) {
                ++pCols->numOfRows;
              }
203
              tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, false);
C
Cary Xu 已提交
204
            }
C
Cary Xu 已提交
205
            lastKey = rowKey;
206
          } else {
207
            tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, true);
208
          }
C
Cary Xu 已提交
209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224
        } else {
          pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, fKey);
          pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, fKey);
        }
      }

      tSkipListIterNext(pIter);
      row = tsdbNextIterRow(pIter);
      if (row == NULL || TD_ROW_KEY(row) > maxKey) {
        rowKey = INT64_MAX;
        isRowDel = false;
      } else {
        rowKey = TD_ROW_KEY(row);
        isRowDel = TD_ROW_IS_DELETED(row);
      }

H
Hongze Cheng 已提交
225 226 227 228 229 230 231
      filterIter++;
      if (filterIter >= nFilterKeys) {
        fKey = INT64_MAX;
      } else {
        fKey = tdGetKey(filterKeys[filterIter]);
      }
    }
232 233
#endif
  }
C
Cary Xu 已提交
234
  if (pCols && (lastKey != TSKEY_INITIAL_VAL)) {
235
    ++pCols->numOfRows;
H
Hongze Cheng 已提交
236 237 238 239 240
  }

  return 0;
}

H
Hongze Cheng 已提交
241
/**
 * Insert one submit block into the in-memory table of pTsdb.
 *
 * The target table is looked up in the meta store, a per-table container is
 * created on first use, the block is copied into the vnode buffer pool and
 * its rows are batch-inserted into the table's skip list. Table-level and
 * mem-table-level statistics and the response are then updated.
 *
 * @param pTsdb     TSDB instance; rows go into pTsdb->mem
 * @param pMsgIter  iterator state describing the block (uid, lengths, row count)
 * @param pBlock    source block; dataLen + schemaLen + header bytes are copied
 * @param pRsp      response to fill (row counts, schema version, optional table name)
 * @return 0 on success (including a block with no rows); -1 on failure. terrno is
 *         set to TSDB_CODE_PAR_TABLE_NOT_EXIST when a meta lookup fails.
 */
int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, SSubmitBlkRsp *pRsp) {
  SSubmitBlkIter blkIter = {0};
  SMemTable     *pMemTable = pTsdb->mem;
  void          *tptr;
  STbData       *pTbData;
  TSKEY          keyMin;
  TSKEY          keyMax;
  SSubmitBlk    *pBlkCopy;
  int64_t        sverNew;

  // check if table exists
  SMetaReader mr = {0};
  metaReaderInit(&mr, pTsdb->pVnode->pMeta, 0);
  if (metaGetTableEntryByUid(&mr, pMsgIter->uid) < 0) {
    metaReaderClear(&mr);
    terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
    return -1;
  }
  // NOTE(review): unbounded append; assumes the caller sized tblFName to hold the table name - confirm.
  if (pRsp->tblFName) strcat(pRsp->tblFName, mr.me.name);

  if (mr.me.type == TSDB_NORMAL_TABLE) {
    sverNew = mr.me.ntbEntry.schemaRow.version;
  } else {
    // Any other table type takes its schema version from the entry looked up by ctbEntry.suid.
    // A failed lookup must not be ignored, otherwise sverNew would be read from a stale entry.
    if (metaGetTableEntryByUid(&mr, mr.me.ctbEntry.suid) < 0) {
      metaReaderClear(&mr);
      terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
      return -1;
    }
    sverNew = mr.me.stbEntry.schemaRow.version;
  }
  metaReaderClear(&mr);

  // create container if needed
  tptr = taosHashGet(pMemTable->pHashIdx, &(pMsgIter->uid), sizeof(pMsgIter->uid));
  if (tptr == NULL) {
    pTbData = tsdbNewTbData(pMsgIter->uid);
    if (pTbData == NULL) {
      return -1;
    }

    // Put into hash. The container is not yet referenced anywhere else, so it can be dropped on failure.
    if (taosHashPut(pMemTable->pHashIdx, &(pMsgIter->uid), sizeof(pMsgIter->uid), &(pTbData), sizeof(pTbData)) != 0) {
      tsdbFreeTbData(pTbData);
      return -1;
    }

    // Put into skiplist
    tSkipListPut(pMemTable->pSlIdx, pTbData);
  } else {
    pTbData = *(STbData **)tptr;
  }

  // copy data to buffer pool
  int32_t tlen = pMsgIter->dataLen + pMsgIter->schemaLen + sizeof(*pBlock);
  pBlkCopy = (SSubmitBlk *)vnodeBufPoolMalloc(pTsdb->pVnode->inUse, tlen);
  if (pBlkCopy == NULL) {
    // Out of buffer-pool memory: nothing has been inserted yet, so just report the failure.
    return -1;
  }
  memcpy(pBlkCopy, pBlock, tlen);

  tInitSubmitBlkIter(pMsgIter, pBlkCopy, &blkIter);
  if (blkIter.row == NULL) return 0;
  keyMin = TD_ROW_KEY(blkIter.row);

  tSkipListPutBatchByIter(pTbData->pData, &blkIter, (iter_next_fn_t)tGetSubmitBlkNext);

#ifdef TD_DEBUG_PRINT_ROW
  printf("!!! %s:%d vgId:%d dir:%s table:%" PRIi64 " has %d rows in skiplist\n\n", __func__, __LINE__,
         TD_VID(pTsdb->pVnode), pTsdb->dir, pTbData->uid, SL_SIZE(pTbData->pData));
#endif

  // Set statistics
  // NOTE(review): relies on the batch insert leaving blkIter.row on the block's last row - confirm.
  keyMax = TD_ROW_KEY(blkIter.row);

  pTbData->nrows += pMsgIter->numOfRows;
  if (pTbData->minKey.ts > keyMin) pTbData->minKey.ts = keyMin;
  if (pTbData->maxKey.ts < keyMax) pTbData->maxKey.ts = keyMax;

  pMemTable->nRow += pMsgIter->numOfRows;
  if (pMemTable->minKey.ts > keyMin) pMemTable->minKey.ts = keyMin;
  if (pMemTable->maxKey.ts < keyMax) pMemTable->maxKey.ts = keyMax;

  pRsp->numOfRows = pMsgIter->numOfRows;
  pRsp->affectedRows = pMsgIter->numOfRows;
  pRsp->sver = sverNew;

  return 0;
}

/**
 * Create an empty per-table container for the table identified by uid.
 *
 * @param uid  table uid stored in the container
 * @return the new container, or NULL if either allocation fails
 */
static STbData *tsdbNewTbData(tb_uid_t uid) {
  STbData *pNew = (STbData *)taosMemoryCalloc(1, sizeof(*pNew));

  if (pNew != NULL) {
    pNew->uid = uid;
    // Inverted range: the first insert tightens both bounds.
    pNew->minKey.ts = TSKEY_MAX;
    pNew->maxKey.ts = TSKEY_MIN;
    pNew->nrows = 0;

    // Rows are ordered by timestamp key; duplicate keys are kept in the list.
    pNew->pData = tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), tkeyComparFn, SL_ALLOW_DUP_KEY,
                                  tsdbGetTsTupleKey);
    if (pNew->pData == NULL) {
      taosMemoryFree(pNew);
      pNew = NULL;
    }
  }

  return pNew;
}

/**
 * Destroy a per-table container and the row skip list it owns.
 *
 * @param pTbData  container to free; NULL is ignored
 */
static void tsdbFreeTbData(STbData *pTbData) {
  if (pTbData == NULL) return;

  tSkipListDestroy(pTbData->pData);
  taosMemoryFree(pTbData);
}

C
Cary Xu 已提交
353
static char *tsdbGetTsTupleKey(const void *data) { return (char *)TD_ROW_KEY_ADDR((STSRow *)data); }
H
Hongze Cheng 已提交
354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369

static int tsdbTbDataComp(const void *arg1, const void *arg2) {
  STbData *pTbData1 = (STbData *)arg1;
  STbData *pTbData2 = (STbData *)arg2;

  if (pTbData1->uid > pTbData2->uid) {
    return 1;
  } else if (pTbData1->uid == pTbData2->uid) {
    return 0;
  } else {
    return -1;
  }
}

static char *tsdbTbDataGetUid(const void *arg) {
  STbData *pTbData = (STbData *)arg;
H
Hongze Cheng 已提交
370
  return (char *)(&(pTbData->uid));
H
Hongze Cheng 已提交
371
}
372 373 374

/**
 * Append (or merge) one row into pCols, using the schema that matches the row's schema version.
 *
 * *ppSchema acts as a one-entry cache across calls: it is replaced only when it is NULL or its
 * version differs from the row's.
 *
 * @param pTsdb     TSDB instance, used for the schema lookup
 * @param pTable    table the row belongs to, used for the schema lookup
 * @param pCols     destination columns; when NULL the call does nothing
 * @param ppSchema  in/out cached schema pointer
 * @param row       row to append
 * @param merge     forwarded to tdAppendSTSRowToDataCol
 * @return 0 on success or when pCols is NULL; -1 if no schema could be obtained
 */
static int tsdbAppendTableRowToCols(STsdb *pTsdb, STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row,
                                    bool merge) {
  if (pCols == NULL) return 0;

  STSchema *pCached = *ppSchema;
  if (pCached == NULL || schemaVersion(pCached) != TD_ROW_SVER(row)) {
    *ppSchema = tsdbGetTableSchemaImpl(pTsdb, pTable, false, false, TD_ROW_SVER(row));
    if (*ppSchema == NULL) {
      ASSERT(false);
      return -1;
    }
  }

  tdAppendSTSRowToDataCol(row, *ppSchema, pCols, merge);
  return 0;
}