tsdbMemTable.c 23.5 KB
Newer Older
H
TD-353  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"
#include "tsdbMain.h"

#define TSDB_DATA_SKIPLIST_LEVEL 5

H
TD-353  
Hongze Cheng 已提交
21 22 23
static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo);

static void        tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes);
H
TD-987  
Hongze Cheng 已提交
24
static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo);
H
TD-353  
Hongze Cheng 已提交
25 26 27 28
static void        tsdbFreeMemTable(SMemTable *pMemTable);
static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable);
static void        tsdbFreeTableData(STableData *pTableData);
static char *      tsdbGetTsTupleKey(const void *data);
H
TD-353  
Hongze Cheng 已提交
29
static void *      tsdbCommitData(void *arg);
H
TD-353  
Hongze Cheng 已提交
30
static int         tsdbCommitMeta(STsdbRepo *pRepo);
H
TD-353  
Hongze Cheng 已提交
31 32
static void        tsdbEndCommit(STsdbRepo *pRepo);
static int         tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey);
H
TD-987  
Hongze Cheng 已提交
33
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols);
H
Hongze Cheng 已提交
34 35
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo);
static void         tsdbDestroyCommitIters(SCommitIter *iters, int maxTables);
H
TD-987  
Hongze Cheng 已提交
36
static int          tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables);
H
TD-353  
Hongze Cheng 已提交
37

H
TD-353  
Hongze Cheng 已提交
38 39
// ---------------- INTERNAL FUNCTIONS ----------------
/*
 * Insert one data row of pTable into the active memtable (pRepo->mem).
 *
 * The row is copied into a skiplist node whose memory is reserved in the memtable's buffer
 * blocks through tsdbAllocBytes(). That call may hand the memtable to the commit thread and/or
 * install a new one, so pRepo->mem is re-read afterwards. If the memtable has no STableData for
 * the table's tid, or the slot holds data for a different uid, a new STableData is created.
 *
 * Returns 0 on success. This includes the case where tSkipListPut() returns NULL: the node's
 * bytes are handed back and no counters are updated.
 * Returns -1 on failure. Any bytes already reserved for the node are handed back first.
 */
int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
  STsdbCfg *  pCfg = &pRepo->config;
  STsdbMeta * pMeta = pRepo->tsdbMeta;
  int32_t     level = 0;
  int32_t     headSize = 0;
  TSKEY       key = dataRowKey(row);
  SMemTable * pMemTable = pRepo->mem;
  STableData *pTableData = NULL;
  SSkipList * pSList = NULL;
  int         bytes = 0;

  // Size the node against the table's current skiplist, if the memtable already has one for it.
  if (pMemTable != NULL && TABLE_TID(pTable) < pMemTable->maxTables && pMemTable->tData[TABLE_TID(pTable)] != NULL &&
      pMemTable->tData[TABLE_TID(pTable)]->uid == TABLE_UID(pTable)) {
    pTableData = pMemTable->tData[TABLE_TID(pTable)];
    pSList = pTableData->pData;
  }

  tSkipListNewNodeInfo(pSList, &level, &headSize);

  bytes = headSize + dataRowLen(row);
  SSkipListNode *pNode = tsdbAllocBytes(pRepo, bytes);
  if (pNode == NULL) {
    tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while allocate %d bytes since %s",
              REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), bytes, tstrerror(terrno));
    return -1;
  }
  pNode->level = level;
  dataRowCpy(SL_GET_NODE_DATA(pNode), row);

  // Operations above may change pRepo->mem, retake those values
  ASSERT(pRepo->mem != NULL);
  pMemTable = pRepo->mem;

  if (TABLE_TID(pTable) >= pMemTable->maxTables) {
    if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) {
      tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while adjust max tables since %s",
                REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), tstrerror(terrno));
      // Give the node's bytes back, as the other failure path below does; they used to stay
      // reserved in the buffer block.
      tsdbFreeBytes(pRepo, (void *)pNode, bytes);
      return -1;
    }
  }

  pTableData = pMemTable->tData[TABLE_TID(pTable)];

  if (pTableData == NULL || pTableData->uid != TABLE_UID(pTable)) {
    if (pTableData != NULL) {  // destroy the table skiplist (may have race condition problem)
      taosWLockLatch(&(pMemTable->latch));
      pMemTable->tData[TABLE_TID(pTable)] = NULL;
      tsdbFreeTableData(pTableData);
      taosWUnLockLatch(&(pMemTable->latch));
    }

    pTableData = tsdbNewTableData(pCfg, pTable);
    if (pTableData == NULL) {
      tsdbError("vgId:%d failed to insert row with key %" PRId64
                " to table %s while create new table data object since %s",
                REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), tstrerror(terrno));
      tsdbFreeBytes(pRepo, (void *)pNode, bytes);
      return -1;
    }

    pMemTable->tData[TABLE_TID(pTable)] = pTableData;
  }

  ASSERT((pTableData != NULL) && pTableData->uid == TABLE_UID(pTable));

  if (tSkipListPut(pTableData->pData, pNode) == NULL) {
    tsdbFreeBytes(pRepo, (void *)pNode, bytes);
  } else {
    if (TABLE_LASTKEY(pTable) < key) TABLE_LASTKEY(pTable) = key;

    if (pMemTable->keyFirst > key) pMemTable->keyFirst = key;
    if (pMemTable->keyLast < key) pMemTable->keyLast = key;
    pMemTable->numOfRows++;

    if (pTableData->keyFirst > key) pTableData->keyFirst = key;
    if (pTableData->keyLast < key) pTableData->keyLast = key;
    pTableData->numOfRows++;

    ASSERT(pTableData->numOfRows == tSkipListGetSize(pTableData->pData));
  }

  // key is a signed TSKEY, so print it with PRId64 (it used PRIu64).
  tsdbTrace("vgId:%d a row is inserted to table %s tid %d uid %" PRIu64 " key %" PRId64, REPO_ID(pRepo),
            TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), key);

  return 0;
}

/*
 * Take one more reference on pMemTable and log the new count.
 * A NULL memtable is ignored. Always returns 0.
 */
int tsdbRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
  if (pMemTable != NULL) {
    int refCount = T_REF_INC(pMemTable);
    tsdbDebug("vgId:%d ref memtable %p ref %d", REPO_ID(pRepo), pMemTable, refCount);
  }
  return 0;
}

/*
 * Drop one reference to pMemTable (NULL is ignored).
 *
 * When the last reference goes away:
 * - The memtable's buffer blocks are moved back to the repository buffer pool under the
 *   repository lock. This function takes that lock itself via tsdbLockRepo().
 * - One waiter on pBufPool->poolNotEmpty is signalled.
 * - The per-table data, the list containers and the memtable itself are freed.
 *
 * Returns 0 on success and -1 on failure; terrno is set when signalling fails. If the
 * repository lock cannot be taken, the memtable is left untouched.
 */
int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
  if (pMemTable == NULL) return 0;

  int ref = T_REF_DEC(pMemTable);
  tsdbDebug("vgId:%d unref memtable %p ref %d", REPO_ID(pRepo), pMemTable, ref);
  if (ref != 0) return 0;

  STsdbBufPool *pBufPool = pRepo->pPool;
  SListNode *   pNode = NULL;
  int           ret = 0;

  // Without the lock the blocks cannot be handed to the pool safely, so give up here.
  if (tsdbLockRepo(pRepo) < 0) return -1;

  while ((pNode = tdListPopHead(pMemTable->bufBlockList)) != NULL) {
    tdListAppendNode(pBufPool->bufBlockList, pNode);
  }

  int code = pthread_cond_signal(&pBufPool->poolNotEmpty);
  if (code != 0) {
    tsdbUnlockRepo(pRepo);
    tsdbError("vgId:%d failed to signal pool not empty since %s", REPO_ID(pRepo), strerror(code));
    terrno = TAOS_SYSTEM_ERROR(code);
    ret = -1;
  } else if (tsdbUnlockRepo(pRepo) < 0) {
    ret = -1;
  }

  // The buffer blocks now belong to the pool and no reference to this memtable remains.
  // Release the rest of it even if signalling or unlocking failed; previously it was leaked.
  for (int i = 0; i < pMemTable->maxTables; i++) {
    if (pMemTable->tData[i] != NULL) {
      tsdbFreeTableData(pMemTable->tData[i]);
    }
  }

  tdListDiscard(pMemTable->actList);
  tdListDiscard(pMemTable->bufBlockList);
  tsdbFreeMemTable(pMemTable);

  return ret;
}

H
TD-353  
Hongze Cheng 已提交
163 164 165 166
int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemTable **pMem, SMemTable **pIMem) {
  if (tsdbLockRepo(pRepo) < 0) return -1;

  *pMem = pRepo->mem;
H
TD-353  
Hongze Cheng 已提交
167
  *pIMem = pRepo->imem;
H
TD-353  
Hongze Cheng 已提交
168 169 170 171
  tsdbRefMemTable(pRepo, *pMem);
  tsdbRefMemTable(pRepo, *pIMem);

  if (tsdbUnlockRepo(pRepo) < 0) return -1;
H
TD-353  
Hongze Cheng 已提交
172

H
TD-987  
Hongze Cheng 已提交
173 174 175
  if (*pMem != NULL) taosRLockLatch(&((*pMem)->latch));

  tsdbDebug("vgId:%d take memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), *pMem, *pIMem);
H
TD-353  
Hongze Cheng 已提交
176
  return 0;
H
TD-353  
Hongze Cheng 已提交
177 178
}

H
TD-987  
Hongze Cheng 已提交
179 180 181 182 183 184 185 186 187 188 189
void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemTable *pMem, SMemTable *pIMem) {
  if (pMem != NULL) {
    taosRUnLockLatch(&(pMem->latch));
    tsdbUnRefMemTable(pRepo, pMem);
  }

  if (pIMem != NULL) {
    tsdbUnRefMemTable(pRepo, pIMem);
  }
}

H
TD-353  
Hongze Cheng 已提交
190
void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
H
TD-353  
Hongze Cheng 已提交
191 192 193 194
  STsdbCfg *     pCfg = &pRepo->config;
  STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo);

  if (pBufBlock != NULL && pBufBlock->remain < bytes) {
195
    if (listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) {  // need to commit mem
H
TD-353  
Hongze Cheng 已提交
196
      if (tsdbAsyncCommit(pRepo) < 0) return NULL;
H
TD-353  
Hongze Cheng 已提交
197 198
    } else {
      if (tsdbLockRepo(pRepo) < 0) return NULL;
H
TD-353  
Hongze Cheng 已提交
199
      SListNode *pNode = tsdbAllocBufBlockFromPool(pRepo);
H
TD-353  
Hongze Cheng 已提交
200
      tdListAppendNode(pRepo->mem->bufBlockList, pNode);
H
TD-353  
Hongze Cheng 已提交
201
      if (tsdbUnlockRepo(pRepo) < 0) return NULL;
H
TD-353  
Hongze Cheng 已提交
202 203 204 205
    }
  }

  if (pRepo->mem == NULL) {
H
TD-987  
Hongze Cheng 已提交
206
    SMemTable *pMemTable = tsdbNewMemTable(pRepo);
H
TD-353  
Hongze Cheng 已提交
207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226
    if (pMemTable == NULL) return NULL;

    if (tsdbLockRepo(pRepo) < 0) {
      tsdbFreeMemTable(pMemTable);
      return NULL;
    }

    SListNode *pNode = tsdbAllocBufBlockFromPool(pRepo);
    tdListAppendNode(pMemTable->bufBlockList, pNode);
    pRepo->mem = pMemTable;

    if (tsdbUnlockRepo(pRepo) < 0) return NULL;
  }

  pBufBlock = tsdbGetCurrBufBlock(pRepo);
  ASSERT(pBufBlock->remain >= bytes);
  void *ptr = POINTER_SHIFT(pBufBlock->data, pBufBlock->offset);
  pBufBlock->offset += bytes;
  pBufBlock->remain -= bytes;

H
Hongze Cheng 已提交
227 228 229
  tsdbTrace("vgId:%d allocate %d bytes from buffer block, nBlocks %d offset %d remain %d", REPO_ID(pRepo), bytes,
            listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain);

H
TD-353  
Hongze Cheng 已提交
230 231 232
  return ptr;
}

H
TD-353  
Hongze Cheng 已提交
233 234 235 236 237 238 239 240 241 242 243 244
int tsdbAsyncCommit(STsdbRepo *pRepo) {
  SMemTable *pIMem = pRepo->imem;
  int        code = 0;

  if (pIMem != NULL) {
    ASSERT(pRepo->commit);
    code = pthread_join(pRepo->commitThread, NULL);
    if (code != 0) {
      tsdbError("vgId:%d failed to thread join since %s", REPO_ID(pRepo), strerror(errno));
      terrno = TAOS_SYSTEM_ERROR(errno);
      return -1;
    }
H
Hongze Cheng 已提交
245
    pRepo->commit = 0;
H
TD-353  
Hongze Cheng 已提交
246 247 248 249
  }

  ASSERT(pRepo->commit == 0);
  if (pRepo->mem != NULL) {
H
Hongze Cheng 已提交
250
    if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START);
H
TD-353  
Hongze Cheng 已提交
251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269
    if (tsdbLockRepo(pRepo) < 0) return -1;
    pRepo->imem = pRepo->mem;
    pRepo->mem = NULL;
    pRepo->commit = 1;
    code = pthread_create(&pRepo->commitThread, NULL, tsdbCommitData, (void *)pRepo);
    if (code != 0) {
      tsdbError("vgId:%d failed to create commit thread since %s", REPO_ID(pRepo), strerror(errno));
      terrno = TAOS_SYSTEM_ERROR(code);
      tsdbUnlockRepo(pRepo);
      return -1;
    }
    if (tsdbUnlockRepo(pRepo) < 0) return -1;
  }

  if (pIMem && tsdbUnRefMemTable(pRepo, pIMem) < 0) return -1;

  return 0;
}

H
Hongze Cheng 已提交
270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291
int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols,
                          TSKEY *filterKeys, int nFilterKeys) {
  ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0);
  if (pIter == NULL) return 0;
  STSchema *pSchema = NULL;
  int       numOfRows = 0;
  TSKEY     keyNext = 0;
  int       filterIter = 0;

  if (nFilterKeys != 0) { // for filter purpose
    ASSERT(filterKeys != NULL);
    keyNext = tsdbNextIterKey(pIter);
    if (keyNext < 0 || keyNext > maxKey) return numOfRows;
    void *ptr = taosbsearch((void *)(&keyNext), (void *)filterKeys, nFilterKeys, sizeof(TSKEY), compTSKEY, TD_GE);
    filterIter = (ptr == NULL) ? nFilterKeys : (POINTER_DISTANCE(ptr, filterKeys) / sizeof(TSKEY));
  }

  do {
    SDataRow row = tsdbNextIterRow(pIter);
    if (row == NULL) break;

    keyNext = dataRowKey(row);
H
TD-982  
Hongze Cheng 已提交
292
    if (keyNext > maxKey) break;
H
Hongze Cheng 已提交
293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310

    bool keyFiltered = false;
    if (nFilterKeys != 0) {
      while (true) {
        if (filterIter >= nFilterKeys) break;
        if (keyNext == filterKeys[filterIter]) {
          keyFiltered = true;
          filterIter++;
          break;
        } else if (keyNext < filterKeys[filterIter]) {
          break;
        } else {
          filterIter++;
        }
      }
    }

    if (!keyFiltered) {
H
TD-982  
Hongze Cheng 已提交
311
      if (numOfRows >= maxRowsToRead) break;
H
Hongze Cheng 已提交
312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328
      if (pCols) {
        if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) {
          pSchema = tsdbGetTableSchemaImpl(pTable, false, false, dataRowVersion(row));
          if (pSchema == NULL) {
            ASSERT(0);
          }
        }

        tdAppendDataRowToDataCol(row, pSchema, pCols);
      }
      numOfRows++;
    }
  } while (tSkipListIterNext(pIter));

  return numOfRows;
}

H
TD-353  
Hongze Cheng 已提交
329 330 331 332 333 334 335 336 337 338 339 340 341 342
// ---------------- LOCAL FUNCTIONS ----------------
static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo) {
  ASSERT(pRepo != NULL);
  if (pRepo->mem == NULL) return NULL;

  SListNode *pNode = listTail(pRepo->mem->bufBlockList);
  if (pNode == NULL) return NULL;

  STsdbBufBlock *pBufBlock = NULL;
  tdListNodeGetData(pRepo->mem->bufBlockList, pNode, (void *)(&pBufBlock));

  return pBufBlock;
}

H
TD-353  
Hongze Cheng 已提交
343 344 345 346 347 348
static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes) {
  STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo);
  ASSERT(pBufBlock != NULL);
  pBufBlock->offset -= bytes;
  pBufBlock->remain += bytes;
  ASSERT(ptr == POINTER_SHIFT(pBufBlock->data, pBufBlock->offset));
H
Hongze Cheng 已提交
349 350
  tsdbTrace("vgId:%d return %d bytes to buffer block, nBlocks %d offset %d remain %d", REPO_ID(pRepo), bytes,
            listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain);
H
TD-353  
Hongze Cheng 已提交
351 352
}

H
TD-987  
Hongze Cheng 已提交
353 354 355
static SMemTable* tsdbNewMemTable(STsdbRepo *pRepo) {
  STsdbMeta *pMeta = pRepo->tsdbMeta;

H
TD-353  
Hongze Cheng 已提交
356 357 358 359 360 361 362 363 364 365
  SMemTable *pMemTable = (SMemTable *)calloc(1, sizeof(*pMemTable));
  if (pMemTable == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _err;
  }

  pMemTable->keyFirst = INT64_MAX;
  pMemTable->keyLast = 0;
  pMemTable->numOfRows = 0;

H
TD-987  
Hongze Cheng 已提交
366 367
  pMemTable->maxTables = pMeta->maxTables;
  pMemTable->tData = (STableData **)calloc(pMemTable->maxTables, sizeof(STableData *));
H
TD-353  
Hongze Cheng 已提交
368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412
  if (pMemTable->tData == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _err;
  }

  pMemTable->actList = tdListNew(0);
  if (pMemTable->actList == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _err;
  }

  pMemTable->bufBlockList = tdListNew(sizeof(STsdbBufBlock*));
  if (pMemTable->bufBlockList == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _err;
  }

  T_REF_INC(pMemTable);

  return pMemTable;

_err:
  tsdbFreeMemTable(pMemTable);
  return NULL;
}

/*
 * Free a memtable's list containers, its tData array and the memtable itself. NULL is ignored.
 * Both lists must already be empty (checked by ASSERT). The STableData objects referenced from
 * tData are NOT freed here.
 */
static void tsdbFreeMemTable(SMemTable *pMemTable) {
  if (pMemTable == NULL) return;

  ASSERT(pMemTable->bufBlockList == NULL || listNEles(pMemTable->bufBlockList) == 0);
  ASSERT(pMemTable->actList == NULL || listNEles(pMemTable->actList) == 0);

  tdListFree(pMemTable->bufBlockList);
  tdListFree(pMemTable->actList);
  tfree(pMemTable->tData);
  free(pMemTable);
}

/*
 * Create the in-memory data holder for pTable: an empty timestamp-keyed skiplist tagged with
 * the table's uid. keyFirst starts at INT64_MAX and keyLast at 0. pCfg is currently unused.
 * Returns NULL with terrno = TSDB_CODE_TDB_OUT_OF_MEMORY on failure.
 */
static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) {
  STableData *pTableData = (STableData *)calloc(1, sizeof(*pTableData));
  if (pTableData == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return NULL;
  }

  pTableData->uid = TABLE_UID(pTable);
  pTableData->keyFirst = INT64_MAX;
  pTableData->keyLast = 0;
  pTableData->numOfRows = 0;
  pTableData->pData = tSkipListCreate(TSDB_DATA_SKIPLIST_LEVEL, TSDB_DATA_TYPE_TIMESTAMP,
                                      TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, 0, tsdbGetTsTupleKey);
  if (pTableData->pData != NULL) return pTableData;

  terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
  tsdbFreeTableData(pTableData);
  return NULL;
}

/*
 * Destroy the table's skiplist with tSkipListDestroy() and free the STableData.
 * NULL is ignored.
 */
static void tsdbFreeTableData(STableData *pTableData) {
  if (pTableData == NULL) return;

  tSkipListDestroy(pTableData->pData);
  free(pTableData);
}

/*
 * Key extractor handed to tSkipListCreate() in tsdbNewTableData(). It returns the tuple part
 * of the data row stored in a skiplist node.
 */
static char *tsdbGetTsTupleKey(const void *data) {
  return dataRowTuple(data);
}

static void *tsdbCommitData(void *arg) {
H
TD-353  
Hongze Cheng 已提交
442 443 444 445
  STsdbRepo *  pRepo = (STsdbRepo *)arg;
  SMemTable *  pMem = pRepo->imem;
  STsdbCfg *   pCfg = &pRepo->config;
  SDataCols *  pDataCols = NULL;
H
TD-353  
Hongze Cheng 已提交
446
  STsdbMeta *  pMeta = pRepo->tsdbMeta;
H
TD-353  
Hongze Cheng 已提交
447
  SCommitIter *iters = NULL;
H
TD-353  
Hongze Cheng 已提交
448
  SRWHelper    whelper = {0};
H
TD-353  
Hongze Cheng 已提交
449
  ASSERT(pRepo->commit == 1);
H
TD-353  
Hongze Cheng 已提交
450
  ASSERT(pMem != NULL);
H
TD-353  
Hongze Cheng 已提交
451

S
Shengliang Guan 已提交
452
  tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64, REPO_ID(pRepo),
H
TD-353  
Hongze Cheng 已提交
453
            pMem->keyFirst, pMem->keyLast, pMem->numOfRows);
H
TD-353  
Hongze Cheng 已提交
454 455

  // Create the iterator to read from cache
H
TD-353  
Hongze Cheng 已提交
456
  if (pMem->numOfRows > 0) {
H
Hongze Cheng 已提交
457
    iters = tsdbCreateCommitIters(pRepo);
H
TD-353  
Hongze Cheng 已提交
458 459 460 461
    if (iters == NULL) {
      tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno));
      goto _exit;
    }
H
TD-353  
Hongze Cheng 已提交
462

H
TD-353  
Hongze Cheng 已提交
463 464 465 466
    if (tsdbInitWriteHelper(&whelper, pRepo) < 0) {
      tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno));
      goto _exit;
    }
H
TD-353  
Hongze Cheng 已提交
467

H
TD-353  
Hongze Cheng 已提交
468 469 470 471 472 473
    if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) {
      terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
      tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s",
                REPO_ID(pRepo), pMeta->maxCols, pMeta->maxRowBytes, pCfg->maxRowsPerFileBlock, tstrerror(terrno));
      goto _exit;
    }
H
TD-353  
Hongze Cheng 已提交
474

H
TD-353  
Hongze Cheng 已提交
475 476
    int sfid = TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision);
    int efid = TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision);
H
TD-353  
Hongze Cheng 已提交
477

H
TD-353  
Hongze Cheng 已提交
478 479 480 481 482 483
    // Loop to commit to each file
    for (int fid = sfid; fid <= efid; fid++) {
      if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) {
        tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
        goto _exit;
      }
H
TD-353  
Hongze Cheng 已提交
484 485 486
    }
  }

H
TD-353  
Hongze Cheng 已提交
487 488 489 490 491
  // Commit to update meta file
  if (tsdbCommitMeta(pRepo) < 0) {
    tsdbError("vgId:%d failed to commit data while committing meta data since %s", REPO_ID(pRepo), tstrerror(terrno));
    goto _exit;
  }
H
TD-353  
Hongze Cheng 已提交
492

H
TD-353  
Hongze Cheng 已提交
493
  tsdbFitRetention(pRepo);
H
TD-353  
Hongze Cheng 已提交
494 495 496

_exit:
  tdFreeDataCols(pDataCols);
H
TD-987  
Hongze Cheng 已提交
497
  tsdbDestroyCommitIters(iters, pMem->maxTables);
H
TD-353  
Hongze Cheng 已提交
498
  tsdbDestroyHelper(&whelper);
H
TD-353  
Hongze Cheng 已提交
499
  tsdbEndCommit(pRepo);
S
Shengliang Guan 已提交
500
  tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId);
H
TD-353  
Hongze Cheng 已提交
501 502 503

  return NULL;
}
H
TD-353  
Hongze Cheng 已提交
504

H
TD-353  
Hongze Cheng 已提交
505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525
static int tsdbCommitMeta(STsdbRepo *pRepo) {
  SMemTable *pMem = pRepo->imem;
  STsdbMeta *pMeta = pRepo->tsdbMeta;
  SActObj *  pAct = NULL;
  SActCont * pCont = NULL;

  if (listNEles(pMem->actList) > 0) {
    if (tdKVStoreStartCommit(pMeta->pStore) < 0) {
      tsdbError("vgId:%d failed to commit data while start commit meta since %s", REPO_ID(pRepo), tstrerror(terrno));
      goto _err;
    }

    SListNode *pNode = NULL;

    while ((pNode = tdListPopHead(pMem->actList)) != NULL) {
      pAct = (SActObj *)pNode->data;
      if (pAct->act == TSDB_UPDATE_META) {
        pCont = (SActCont *)POINTER_SHIFT(pAct, sizeof(SActObj));
        if (tdUpdateKVStoreRecord(pMeta->pStore, pAct->uid, (void *)(pCont->cont), pCont->len) < 0) {
          tsdbError("vgId:%d failed to update meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid,
                    tstrerror(terrno));
H
Hongze Cheng 已提交
526
          tdKVStoreEndCommit(pMeta->pStore);
H
TD-353  
Hongze Cheng 已提交
527 528 529 530 531 532
          goto _err;
        }
      } else if (pAct->act == TSDB_DROP_META) {
        if (tdDropKVStoreRecord(pMeta->pStore, pAct->uid) < 0) {
          tsdbError("vgId:%d failed to drop meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid,
                    tstrerror(terrno));
H
Hongze Cheng 已提交
533
          tdKVStoreEndCommit(pMeta->pStore);
H
TD-353  
Hongze Cheng 已提交
534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552
          goto _err;
        }
      } else {
        ASSERT(false);
      }
    }

    if (tdKVStoreEndCommit(pMeta->pStore) < 0) {
      tsdbError("vgId:%d failed to commit data while end commit meta since %s", REPO_ID(pRepo), tstrerror(terrno));
      goto _err;
    }
  }

  return 0;

_err:
  return -1;
}

H
TD-353  
Hongze Cheng 已提交
553 554 555 556 557 558 559
static void tsdbEndCommit(STsdbRepo *pRepo) {
  ASSERT(pRepo->commit == 1);
  if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER);
}

/*
 * Return 1 if any of the nIters commit iterators has a next key inside [minKey, maxKey],
 * otherwise 0. Keys <= 0 are ignored; presumably tsdbNextIterKey() returns a non-positive
 * sentinel for an exhausted or absent iterator (TODO confirm).
 */
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) {
  int idx = 0;
  while (idx < nIters) {
    TSKEY k = tsdbNextIterKey(iters[idx].pIter);
    if (k > 0 && k >= minKey && k <= maxKey) return 1;
    idx++;
  }
  return 0;
}
H
TD-353  
Hongze Cheng 已提交
565

566
void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey) {
H
TD-353  
Hongze Cheng 已提交
567 568
  *minKey = fileId * daysPerFile * tsMsPerDay[precision];
  *maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1;
H
TD-353  
Hongze Cheng 已提交
569 570
}

H
TD-353  
Hongze Cheng 已提交
571 572
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols) {
  char *      dataDir = NULL;
H
TD-353  
Hongze Cheng 已提交
573
  STsdbCfg *  pCfg = &pRepo->config;
H
TD-353  
Hongze Cheng 已提交
574
  STsdbFileH *pFileH = pRepo->tsdbFileH;
H
TD-353  
Hongze Cheng 已提交
575
  SFileGroup *pGroup = NULL;
H
TD-987  
Hongze Cheng 已提交
576
  SMemTable * pMem = pRepo->imem;
H
TD-353  
Hongze Cheng 已提交
577 578

  TSKEY minKey = 0, maxKey = 0;
H
TD-353  
Hongze Cheng 已提交
579
  tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey);
H
TD-353  
Hongze Cheng 已提交
580 581

  // Check if there are data to commit to this file
H
TD-987  
Hongze Cheng 已提交
582
  int hasDataToCommit = tsdbHasDataToCommit(iters, pMem->maxTables, minKey, maxKey);
H
TD-353  
Hongze Cheng 已提交
583
  if (!hasDataToCommit) {
S
Shengliang Guan 已提交
584
    tsdbDebug("vgId:%d no data to commit to file %d", REPO_ID(pRepo), fid);
H
TD-353  
Hongze Cheng 已提交
585 586
    return 0;
  }
H
TD-353  
Hongze Cheng 已提交
587 588

  // Create and open files for commit
H
TD-353  
Hongze Cheng 已提交
589 590 591 592 593 594
  dataDir = tsdbGetDataDirName(pRepo->rootDir);
  if (dataDir == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return -1;
  }

H
TD-987  
Hongze Cheng 已提交
595
  if ((pGroup = tsdbCreateFGroupIfNeed(pRepo, dataDir, fid)) == NULL) {
H
TD-353  
Hongze Cheng 已提交
596
    tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
H
TD-353  
Hongze Cheng 已提交
597 598 599 600 601
    goto _err;
  }

  // Open files for write/read
  if (tsdbSetAndOpenHelperFile(pHelper, pGroup) < 0) {
H
TD-353  
Hongze Cheng 已提交
602
    tsdbError("vgId:%d failed to set helper file since %s", REPO_ID(pRepo), tstrerror(terrno));
H
TD-353  
Hongze Cheng 已提交
603 604 605 606
    goto _err;
  }

  // Loop to commit data in each table
H
TD-987  
Hongze Cheng 已提交
607
  for (int tid = 1; tid < pMem->maxTables; tid++) {
H
TD-353  
Hongze Cheng 已提交
608 609
    SCommitIter *pIter = iters + tid;
    if (pIter->pTable == NULL) continue;
H
TD-353  
Hongze Cheng 已提交
610

H
Hongze Cheng 已提交
611 612
    taosRLockLatch(&(pIter->pTable->latch));

H
TD-353  
Hongze Cheng 已提交
613
    tsdbSetHelperTable(pHelper, pIter->pTable, pRepo);
H
TD-353  
Hongze Cheng 已提交
614

H
TD-353  
Hongze Cheng 已提交
615
    if (pIter->pIter != NULL) {
H
Hongze Cheng 已提交
616
      tdInitDataCols(pDataCols, tsdbGetTableSchemaImpl(pIter->pTable, false, false, -1));
H
TD-353  
Hongze Cheng 已提交
617

H
Hongze Cheng 已提交
618 619 620 621 622 623
      if (tsdbCommitTableData(pHelper, pIter, pDataCols, maxKey) < 0) {
        taosRUnLockLatch(&(pIter->pTable->latch));
        tsdbError("vgId:%d failed to write data of table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo),
                  TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable),
                  tstrerror(terrno));
        goto _err;
H
TD-353  
Hongze Cheng 已提交
624 625
      }
    }
H
TD-353  
Hongze Cheng 已提交
626

H
Hongze Cheng 已提交
627 628
    taosRUnLockLatch(&(pIter->pTable->latch));

H
TD-353  
Hongze Cheng 已提交
629 630
    // Move the last block to the new .l file if neccessary
    if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) {
H
TD-353  
Hongze Cheng 已提交
631
      tsdbError("vgId:%d, failed to move last block, since %s", REPO_ID(pRepo), tstrerror(terrno));
H
TD-353  
Hongze Cheng 已提交
632 633 634 635 636
      goto _err;
    }

    // Write the SCompBlock part
    if (tsdbWriteCompInfo(pHelper) < 0) {
H
TD-353  
Hongze Cheng 已提交
637
      tsdbError("vgId:%d, failed to write compInfo part since %s", REPO_ID(pRepo), tstrerror(terrno));
H
TD-353  
Hongze Cheng 已提交
638 639 640 641 642
      goto _err;
    }
  }

  if (tsdbWriteCompIdx(pHelper) < 0) {
H
TD-353  
Hongze Cheng 已提交
643
    tsdbError("vgId:%d failed to write compIdx part to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
H
TD-353  
Hongze Cheng 已提交
644 645 646
    goto _err;
  }

H
Hongze Cheng 已提交
647
  tfree(dataDir);
H
TD-353  
Hongze Cheng 已提交
648
  tsdbCloseHelperFile(pHelper, 0);
H
TD-353  
Hongze Cheng 已提交
649 650

  pthread_rwlock_wrlock(&(pFileH->fhlock));
H
Hongze Cheng 已提交
651
#ifdef TSDB_IDX
H
Hongze Cheng 已提交
652
  pGroup->files[TSDB_FILE_TYPE_IDX] = *(helperIdxF(pHelper));
H
Hongze Cheng 已提交
653
#endif
H
Hongze Cheng 已提交
654 655 656
  pGroup->files[TSDB_FILE_TYPE_HEAD] = *(helperHeadF(pHelper));
  pGroup->files[TSDB_FILE_TYPE_DATA] = *(helperDataF(pHelper));
  pGroup->files[TSDB_FILE_TYPE_LAST] = *(helperLastF(pHelper));
H
TD-353  
Hongze Cheng 已提交
657
  pthread_rwlock_unlock(&(pFileH->fhlock));
H
TD-353  
Hongze Cheng 已提交
658 659 660 661

  return 0;

_err:
H
Hongze Cheng 已提交
662
  tfree(dataDir);
H
TD-353  
Hongze Cheng 已提交
663 664 665 666
  tsdbCloseHelperFile(pHelper, 1);
  return -1;
}

H
Hongze Cheng 已提交
667
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) {
H
TD-353  
Hongze Cheng 已提交
668 669
  SMemTable *pMem = pRepo->imem;
  STsdbMeta *pMeta = pRepo->tsdbMeta;
H
TD-353  
Hongze Cheng 已提交
670

H
TD-987  
Hongze Cheng 已提交
671
  SCommitIter *iters = (SCommitIter *)calloc(pMem->maxTables, sizeof(SCommitIter));
H
TD-353  
Hongze Cheng 已提交
672 673 674 675 676
  if (iters == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return NULL;
  }

H
TD-353  
Hongze Cheng 已提交
677 678 679
  if (tsdbRLockRepoMeta(pRepo) < 0) goto _err;

  // reference all tables
H
TD-987  
Hongze Cheng 已提交
680
  for (int i = 0; i < pMem->maxTables; i++) {
H
TD-353  
Hongze Cheng 已提交
681 682 683 684 685
    if (pMeta->tables[i] != NULL) {
      tsdbRefTable(pMeta->tables[i]);
      iters[i].pTable = pMeta->tables[i];
    }
  }
H
TD-353  
Hongze Cheng 已提交
686

H
TD-353  
Hongze Cheng 已提交
687
  if (tsdbUnlockRepoMeta(pRepo) < 0) goto _err;
H
TD-353  
Hongze Cheng 已提交
688

H
TD-987  
Hongze Cheng 已提交
689
  for (int i = 0; i < pMem->maxTables; i++) {
H
TD-353  
Hongze Cheng 已提交
690
    if ((iters[i].pTable != NULL) && (pMem->tData[i] != NULL) && (TABLE_UID(iters[i].pTable) == pMem->tData[i]->uid)) {
H
TD-353  
Hongze Cheng 已提交
691 692 693 694 695
      if ((iters[i].pIter = tSkipListCreateIter(pMem->tData[i]->pData)) == NULL) {
        terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
        goto _err;
      }

H
Hongze Cheng 已提交
696
      tSkipListIterNext(iters[i].pIter);
H
TD-353  
Hongze Cheng 已提交
697
    }
H
TD-353  
Hongze Cheng 已提交
698 699 700 701 702
  }

  return iters;

_err:
H
TD-987  
Hongze Cheng 已提交
703
  tsdbDestroyCommitIters(iters, pMem->maxTables);
H
TD-353  
Hongze Cheng 已提交
704 705 706
  return NULL;
}

H
Hongze Cheng 已提交
707
static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables) {
H
TD-353  
Hongze Cheng 已提交
708 709
  if (iters == NULL) return;

H
TD-353  
Hongze Cheng 已提交
710 711 712 713 714
  for (int i = 1; i < maxTables; i++) {
    if (iters[i].pTable != NULL) {
      tsdbUnRefTable(iters[i].pTable);
      tSkipListDestroyIter(iters[i].pIter);
    }
H
TD-353  
Hongze Cheng 已提交
715 716 717
  }

  free(iters);
H
TD-987  
Hongze Cheng 已提交
718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738
}

/*
 * Grow pMemTable's per-table slot array to maxTables entries.
 *
 * The STableData pointers already stored are carried over into the new array. tData and
 * maxTables are swapped together under the memtable's write latch, so readers holding the read
 * latch (see tsdbTakeMemSnapshot) never see a half-updated pair.
 *
 * Returns 0 on success. On allocation failure returns -1 with terrno set, and the memtable is
 * left unchanged.
 */
static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables) {
  ASSERT(pMemTable->maxTables < maxTables);

  STableData **pTableData = (STableData **)calloc(maxTables, sizeof(STableData *));
  if (pTableData == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return -1;
  }

  STableData **tData = pMemTable->tData;

  // Carry over the per-table data already cached. Without this copy, every row inserted so far
  // vanished from the memtable and the old STableData objects were leaked.
  for (int i = 0; i < pMemTable->maxTables; i++) {
    pTableData[i] = tData[i];
  }

  taosWLockLatch(&(pMemTable->latch));
  pMemTable->maxTables = maxTables;
  pMemTable->tData = pTableData;
  taosWUnLockLatch(&(pMemTable->latch));

  tfree(tData);

  return 0;
}