tsdbMemTable.c 27.4 KB
Newer Older
H
TD-353  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"
#include "tsdbMain.h"

#define TSDB_DATA_SKIPLIST_LEVEL 5

H
TD-353  
Hongze Cheng 已提交
21
static void        tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes);
H
TD-987  
Hongze Cheng 已提交
22
static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo);
H
TD-353  
Hongze Cheng 已提交
23 24 25 26
static void        tsdbFreeMemTable(SMemTable *pMemTable);
static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable);
static void        tsdbFreeTableData(STableData *pTableData);
static char *      tsdbGetTsTupleKey(const void *data);
H
TD-353  
Hongze Cheng 已提交
27
static int         tsdbCommitMeta(STsdbRepo *pRepo);
H
TD-353  
Hongze Cheng 已提交
28 29
static void        tsdbEndCommit(STsdbRepo *pRepo);
static int         tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey);
H
TD-987  
Hongze Cheng 已提交
30
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols);
H
Hongze Cheng 已提交
31 32
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo);
static void         tsdbDestroyCommitIters(SCommitIter *iters, int maxTables);
H
TD-987  
Hongze Cheng 已提交
33
static int          tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables);
H
TD-1548  
Hongze Cheng 已提交
34
static int          tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SDataRow row);
H
TD-353  
Hongze Cheng 已提交
35

H
TD-353  
Hongze Cheng 已提交
36
// ---------------- INTERNAL FUNCTIONS ----------------
H
TD-1548  
Hongze Cheng 已提交
37
int tsdbUpdateRowInMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
H
TD-353  
Hongze Cheng 已提交
38
  STsdbCfg *  pCfg = &pRepo->config;
H
TD-987  
Hongze Cheng 已提交
39
  STsdbMeta * pMeta = pRepo->tsdbMeta;
H
TD-1548  
Hongze Cheng 已提交
40
  TKEY        tkey = dataRowTKey(row);
H
TD-353  
Hongze Cheng 已提交
41 42 43
  TSKEY       key = dataRowKey(row);
  SMemTable * pMemTable = pRepo->mem;
  STableData *pTableData = NULL;
H
TD-1548  
Hongze Cheng 已提交
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
  bool        isRowDelete = TKEY_IS_DELETED(tkey);

  if (isRowDelete) {
    if (!pCfg->update) {
      tsdbWarn("vgId:%d vnode is not allowed to update but try to delete a data row", REPO_ID(pRepo));
      terrno = TSDB_CODE_TDB_INVALID_ACTION;
      return -1;
    }

    if (key > TABLE_LASTKEY(pTable)) {
      tsdbTrace("vgId:%d skip to delete row key %" PRId64 " which is larger than table lastKey %" PRId64,
                REPO_ID(pRepo), key, TABLE_LASTKEY(pTable));
      return 0;
    }
  }
H
Hongze Cheng 已提交
59 60 61

  void *pRow = tsdbAllocBytes(pRepo, dataRowLen(row));
  if (pRow == NULL) {
H
TD-353  
Hongze Cheng 已提交
62
    tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while allocate %d bytes since %s",
H
Hongze Cheng 已提交
63
              REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), dataRowLen(row), tstrerror(terrno));
H
TD-353  
Hongze Cheng 已提交
64 65
    return -1;
  }
H
Hongze Cheng 已提交
66 67

  dataRowCpy(pRow, row);
H
TD-353  
Hongze Cheng 已提交
68

H
TD-353  
Hongze Cheng 已提交
69 70 71
  // Operations above may change pRepo->mem, retake those values
  ASSERT(pRepo->mem != NULL);
  pMemTable = pRepo->mem;
H
TD-987  
Hongze Cheng 已提交
72 73

  if (TABLE_TID(pTable) >= pMemTable->maxTables) {
H
Hongze Cheng 已提交
74 75 76 77
    if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) {
      tsdbFreeBytes(pRepo, pRow, dataRowLen(row));
      return -1;
    }
H
TD-987  
Hongze Cheng 已提交
78
  }
H
TD-353  
Hongze Cheng 已提交
79 80
  pTableData = pMemTable->tData[TABLE_TID(pTable)];

H
TD-353  
Hongze Cheng 已提交
81
  if (pTableData == NULL || pTableData->uid != TABLE_UID(pTable)) {
H
Hongze Cheng 已提交
82
    if (pTableData != NULL) {
H
TD-987  
Hongze Cheng 已提交
83
      taosWLockLatch(&(pMemTable->latch));
H
TD-353  
Hongze Cheng 已提交
84 85
      pMemTable->tData[TABLE_TID(pTable)] = NULL;
      tsdbFreeTableData(pTableData);
H
TD-987  
Hongze Cheng 已提交
86
      taosWUnLockLatch(&(pMemTable->latch));
H
TD-353  
Hongze Cheng 已提交
87
    }
H
TD-987  
Hongze Cheng 已提交
88

H
TD-353  
Hongze Cheng 已提交
89
    pTableData = tsdbNewTableData(pCfg, pTable);
H
TD-353  
Hongze Cheng 已提交
90
    if (pTableData == NULL) {
H
TD-353  
Hongze Cheng 已提交
91 92 93
      tsdbError("vgId:%d failed to insert row with key %" PRId64
                " to table %s while create new table data object since %s",
                REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), tstrerror(terrno));
H
Hongze Cheng 已提交
94
      tsdbFreeBytes(pRepo, (void *)pRow, dataRowLen(row));
H
TD-353  
Hongze Cheng 已提交
95 96 97 98 99 100
      return -1;
    }

    pRepo->mem->tData[TABLE_TID(pTable)] = pTableData;
  }

H
TD-353  
Hongze Cheng 已提交
101
  ASSERT((pTableData != NULL) && pTableData->uid == TABLE_UID(pTable));
H
TD-353  
Hongze Cheng 已提交
102

H
TD-1194  
Hongze Cheng 已提交
103 104
  int64_t oldSize = SL_SIZE(pTableData->pData);
  if (tSkipListPut(pTableData->pData, pRow) == NULL) {
H
Hongze Cheng 已提交
105
    tsdbFreeBytes(pRepo, (void *)pRow, dataRowLen(row));
H
TD-353  
Hongze Cheng 已提交
106
  } else {
H
TD-1194  
Hongze Cheng 已提交
107
    int64_t deltaSize = SL_SIZE(pTableData->pData) - oldSize;
H
Hongze Cheng 已提交
108 109 110 111 112 113 114
    if (isRowDelete) {
      if (TABLE_LASTKEY(pTable) == key) {
        // TODO: need to update table last key here (may from file)
      }
    } else {
      if (TABLE_LASTKEY(pTable) < key) TABLE_LASTKEY(pTable) = key;
    }
H
TD-1548  
Hongze Cheng 已提交
115

H
TD-353  
Hongze Cheng 已提交
116 117
    if (pMemTable->keyFirst > key) pMemTable->keyFirst = key;
    if (pMemTable->keyLast < key) pMemTable->keyLast = key;
H
TD-1437  
Hongze Cheng 已提交
118
    pMemTable->numOfRows += deltaSize;
H
TD-353  
Hongze Cheng 已提交
119

H
TD-353  
Hongze Cheng 已提交
120 121
    if (pTableData->keyFirst > key) pTableData->keyFirst = key;
    if (pTableData->keyLast < key) pTableData->keyLast = key;
H
TD-1437  
Hongze Cheng 已提交
122
    pTableData->numOfRows += deltaSize;
H
TD-353  
Hongze Cheng 已提交
123
  }
H
TD-353  
Hongze Cheng 已提交
124

H
TD-1548  
Hongze Cheng 已提交
125 126 127
  tsdbTrace("vgId:%d a row is %s table %s tid %d uid %" PRIu64 " key %" PRIu64, REPO_ID(pRepo),
            isRowDelete ? "deleted from" : "updated in", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable),
            key);
H
TD-353  
Hongze Cheng 已提交
128 129 130 131 132

  return 0;
}

int tsdbRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
H
TD-353  
Hongze Cheng 已提交
133
  if (pMemTable == NULL) return 0;
H
Hui Li 已提交
134 135
  int ref = T_REF_INC(pMemTable);
	tsdbDebug("vgId:%d ref memtable %p ref %d", REPO_ID(pRepo), pMemTable, ref);
H
TD-353  
Hongze Cheng 已提交
136
  return 0;
H
TD-353  
Hongze Cheng 已提交
137 138 139 140
}

// Need to lock the repository
int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
H
TD-353  
Hongze Cheng 已提交
141
  if (pMemTable == NULL) return 0;
H
TD-353  
Hongze Cheng 已提交
142

H
Hui Li 已提交
143 144 145
	int ref = T_REF_DEC(pMemTable);
	tsdbDebug("vgId:%d unref memtable %p ref %d", REPO_ID(pRepo), pMemTable, ref);
  if (ref == 0) {
H
TD-353  
Hongze Cheng 已提交
146 147 148
    STsdbBufPool *pBufPool = pRepo->pPool;

    SListNode *pNode = NULL;
H
TD-353  
Hongze Cheng 已提交
149
    if (tsdbLockRepo(pRepo) < 0) return -1;
H
TD-353  
Hongze Cheng 已提交
150 151
    while ((pNode = tdListPopHead(pMemTable->bufBlockList)) != NULL) {
      tdListAppendNode(pBufPool->bufBlockList, pNode);
H
TD-353  
Hongze Cheng 已提交
152 153 154 155 156 157 158
    }
    int code = pthread_cond_signal(&pBufPool->poolNotEmpty);
    if (code != 0) {
      tsdbUnlockRepo(pRepo);
      tsdbError("vgId:%d failed to signal pool not empty since %s", REPO_ID(pRepo), strerror(code));
      terrno = TAOS_SYSTEM_ERROR(code);
      return -1;
H
TD-353  
Hongze Cheng 已提交
159
    }
H
TD-353  
Hongze Cheng 已提交
160
    if (tsdbUnlockRepo(pRepo) < 0) return -1;
H
TD-353  
Hongze Cheng 已提交
161

H
TD-987  
Hongze Cheng 已提交
162
    for (int i = 0; i < pMemTable->maxTables; i++) {
H
TD-353  
Hongze Cheng 已提交
163 164 165 166 167 168 169 170 171 172 173 174
      if (pMemTable->tData[i] != NULL) {
        tsdbFreeTableData(pMemTable->tData[i]);
      }
    }

    tdListDiscard(pMemTable->actList);
    tdListDiscard(pMemTable->bufBlockList);
    tsdbFreeMemTable(pMemTable);
  }
  return 0;
}

H
TD-353  
Hongze Cheng 已提交
175 176 177 178
int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemTable **pMem, SMemTable **pIMem) {
  if (tsdbLockRepo(pRepo) < 0) return -1;

  *pMem = pRepo->mem;
H
TD-353  
Hongze Cheng 已提交
179
  *pIMem = pRepo->imem;
H
TD-353  
Hongze Cheng 已提交
180 181 182 183
  tsdbRefMemTable(pRepo, *pMem);
  tsdbRefMemTable(pRepo, *pIMem);

  if (tsdbUnlockRepo(pRepo) < 0) return -1;
H
TD-353  
Hongze Cheng 已提交
184

H
TD-987  
Hongze Cheng 已提交
185 186 187
  if (*pMem != NULL) taosRLockLatch(&((*pMem)->latch));

  tsdbDebug("vgId:%d take memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), *pMem, *pIMem);
H
TD-353  
Hongze Cheng 已提交
188
  return 0;
H
TD-353  
Hongze Cheng 已提交
189 190
}

H
TD-987  
Hongze Cheng 已提交
191 192 193 194 195 196 197 198 199
void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemTable *pMem, SMemTable *pIMem) {
  if (pMem != NULL) {
    taosRUnLockLatch(&(pMem->latch));
    tsdbUnRefMemTable(pRepo, pMem);
  }

  if (pIMem != NULL) {
    tsdbUnRefMemTable(pRepo, pIMem);
  }
H
Haojun Liao 已提交
200 201

  tsdbDebug("vgId:%d utake memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), pMem, pIMem);
H
TD-987  
Hongze Cheng 已提交
202 203
}

H
TD-353  
Hongze Cheng 已提交
204
void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
H
TD-353  
Hongze Cheng 已提交
205
  STsdbCfg *     pCfg = &pRepo->config;
H
Hongze Cheng 已提交
206 207
  STsdbBufBlock *pBufBlock = NULL;
  void *         ptr = NULL;
H
TD-353  
Hongze Cheng 已提交
208

H
Hongze Cheng 已提交
209
  // Either allocate from buffer blocks or from SYSTEM memory pool
H
TD-353  
Hongze Cheng 已提交
210
  if (pRepo->mem == NULL) {
H
TD-987  
Hongze Cheng 已提交
211
    SMemTable *pMemTable = tsdbNewMemTable(pRepo);
H
TD-353  
Hongze Cheng 已提交
212
    if (pMemTable == NULL) return NULL;
H
Hongze Cheng 已提交
213 214
    pRepo->mem = pMemTable;
  }
H
TD-353  
Hongze Cheng 已提交
215

H
Hongze Cheng 已提交
216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233
  ASSERT(pRepo->mem != NULL);

  pBufBlock = tsdbGetCurrBufBlock(pRepo);
  if ((pRepo->mem->extraBuffList != NULL) ||
      ((listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) && (pBufBlock->remain < bytes))) {
    // allocate from SYSTEM buffer pool
    if (pRepo->mem->extraBuffList == NULL) {
      pRepo->mem->extraBuffList = tdListNew(0);
      if (pRepo->mem->extraBuffList == NULL) {
        terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
        return NULL;
      }
    }

    ASSERT(pRepo->mem->extraBuffList != NULL);
    SListNode *pNode = (SListNode *)malloc(sizeof(SListNode) + bytes);
    if (pNode == NULL) {
      terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
H
TD-353  
Hongze Cheng 已提交
234 235 236
      return NULL;
    }

H
Hongze Cheng 已提交
237
    pNode->next = pNode->prev = NULL;
H
Hongze Cheng 已提交
238
    tdListAppendNode(pRepo->mem->extraBuffList, pNode);
H
Hongze Cheng 已提交
239 240 241 242 243 244 245 246 247 248 249
    ptr = (void *)(pNode->data);
    tsdbTrace("vgId:%d allocate %d bytes from SYSTEM buffer block", REPO_ID(pRepo), bytes);
  } else {  // allocate from TSDB buffer pool
    if (pBufBlock == NULL || pBufBlock->remain < bytes) {
      ASSERT(listNEles(pRepo->mem->bufBlockList) < pCfg->totalBlocks / 3);
      if (tsdbLockRepo(pRepo) < 0) return NULL;
      SListNode *pNode = tsdbAllocBufBlockFromPool(pRepo);
      tdListAppendNode(pRepo->mem->bufBlockList, pNode);
      if (tsdbUnlockRepo(pRepo) < 0) return NULL;
      pBufBlock = tsdbGetCurrBufBlock(pRepo);
    }
H
TD-353  
Hongze Cheng 已提交
250

H
Hongze Cheng 已提交
251 252 253 254 255 256
    ASSERT(pBufBlock->remain >= bytes);
    ptr = POINTER_SHIFT(pBufBlock->data, pBufBlock->offset);
    pBufBlock->offset += bytes;
    pBufBlock->remain -= bytes;
    tsdbTrace("vgId:%d allocate %d bytes from TSDB buffer block, nBlocks %d offset %d remain %d", REPO_ID(pRepo), bytes,
              listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain);
H
TD-353  
Hongze Cheng 已提交
257 258 259 260 261
  }

  return ptr;
}

H
TD-353  
Hongze Cheng 已提交
262 263 264 265
int tsdbAsyncCommit(STsdbRepo *pRepo) {
  SMemTable *pIMem = pRepo->imem;

  if (pRepo->mem != NULL) {
H
Hongze Cheng 已提交
266 267
    sem_wait(&(pRepo->readyToCommit));

H
Hongze Cheng 已提交
268
    if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START);
H
TD-353  
Hongze Cheng 已提交
269 270 271
    if (tsdbLockRepo(pRepo) < 0) return -1;
    pRepo->imem = pRepo->mem;
    pRepo->mem = NULL;
H
Hongze Cheng 已提交
272
    tsdbScheduleCommit(pRepo);
H
TD-353  
Hongze Cheng 已提交
273 274 275
    if (tsdbUnlockRepo(pRepo) < 0) return -1;
  }

H
Hongze Cheng 已提交
276
  if (tsdbUnRefMemTable(pRepo, pIMem) < 0) return -1;
H
TD-353  
Hongze Cheng 已提交
277 278 279 280

  return 0;
}

H
Hongze Cheng 已提交
281 282 283 284 285 286 287 288
int tsdbSyncCommit(TSDB_REPO_T *repo) {
  STsdbRepo *pRepo = (STsdbRepo *)repo;
  tsdbAsyncCommit(pRepo);
  sem_wait(&(pRepo->readyToCommit));
  sem_post(&(pRepo->readyToCommit));
  return 0;
}

H
TD-1548  
Hongze Cheng 已提交
289 290 291 292 293 294 295 296 297
/**
 * This is an important function to load data or try to load data from memory skiplist iterator.
 * 
 * This function load memory data until:
 * 1. iterator ends
 * 2. data key exceeds maxKey
 * 3. rowsIncreased = rowsInserted - rowsDeleteSucceed >= maxRowsToRead
 * 4. operations in pCols not exceeds its max capacity if pCols is given
 * 
H
Hongze Cheng 已提交
298
 * The function tries to procceed AS MUSH AS POSSIBLE.
H
TD-1548  
Hongze Cheng 已提交
299
 */
H
Hongze Cheng 已提交
300
int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols,
H
TD-1548  
Hongze Cheng 已提交
301 302
                          TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo) {
  ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0 && pMergeInfo != NULL);
H
Hongze Cheng 已提交
303 304
  if (pIter == NULL) return 0;
  STSchema *pSchema = NULL;
H
TD-1548  
Hongze Cheng 已提交
305 306 307
  TSKEY     rowKey = 0;
  TSKEY     fKey = 0;
  bool      isRowDel = false;
H
Hongze Cheng 已提交
308
  int       filterIter = 0;
H
TD-1548  
Hongze Cheng 已提交
309 310 311 312 313
  SDataRow  row = NULL;

  memset(pMergeInfo, 0, sizeof(*pMergeInfo));
  pMergeInfo->keyFirst = INT64_MAX;
  pMergeInfo->keyLast = INT64_MIN;
H
Hongze Cheng 已提交
314
  if (pCols) tdResetDataCols(pCols);
H
TD-1548  
Hongze Cheng 已提交
315 316 317 318

  row = tsdbNextIterRow(pIter);
  if (row == NULL || dataRowKey(row) > maxKey) {
    rowKey = INT64_MAX;
H
Hongze Cheng 已提交
319
    isRowDel = false;
H
TD-1548  
Hongze Cheng 已提交
320 321 322 323 324
  } else {
    rowKey = dataRowKey(row);
    isRowDel = dataRowDeleted(row);
  }

H
Hongze Cheng 已提交
325
  if (filterIter >= nFilterKeys) {
H
TD-1548  
Hongze Cheng 已提交
326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355
    fKey = INT64_MAX;
  } else {
    fKey = tdGetKey(filterKeys[filterIter]);
  }

  while (true) {
    if (fKey == INT64_MAX && rowKey == INT64_MAX) break;

    if (fKey < rowKey) {
      pMergeInfo->keyFirst = MIN(pMergeInfo->keyFirst, fKey);
      pMergeInfo->keyLast = MAX(pMergeInfo->keyLast, fKey);

      filterIter++;
      if (filterIter >= nFilterKeys) {
        fKey = INT64_MAX;
      } else {
        fKey = tdGetKey(filterKeys[filterIter]);
      }
    } else if (fKey > rowKey) {
      if (isRowDel) {
        pMergeInfo->rowsDeleteFailed++;
      } else {
        if (pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed >= maxRowsToRead) break;
        if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
        pMergeInfo->rowsInserted++;
        pMergeInfo->nOperations++;
        pMergeInfo->keyFirst = MIN(pMergeInfo->keyFirst, rowKey);
        pMergeInfo->keyLast = MAX(pMergeInfo->keyLast, rowKey);
        tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row);
      }
H
Hongze Cheng 已提交
356

H
TD-1548  
Hongze Cheng 已提交
357 358 359 360
      tSkipListIterNext(pIter);
      row = tsdbNextIterRow(pIter);
      if (row == NULL || dataRowKey(row) > maxKey) {
        rowKey = INT64_MAX;
H
Hongze Cheng 已提交
361
        isRowDel = false;
H
TD-1548  
Hongze Cheng 已提交
362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380
      } else {
        rowKey = dataRowKey(row);
        isRowDel = dataRowDeleted(row);
      }
    } else {
      if (isRowDel) {
        ASSERT(!keepDup);
        if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
        pMergeInfo->rowsDeleteSucceed++;
        pMergeInfo->nOperations++;
        tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row);
      } else {
        if (keepDup) {
          if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
          pMergeInfo->rowsUpdated++;
          pMergeInfo->nOperations++;
          pMergeInfo->keyFirst = MIN(pMergeInfo->keyFirst, rowKey);
          pMergeInfo->keyLast = MAX(pMergeInfo->keyLast, rowKey);
          tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row);
H
Hongze Cheng 已提交
381
        } else {
H
TD-1548  
Hongze Cheng 已提交
382 383
          pMergeInfo->keyFirst = MIN(pMergeInfo->keyFirst, fKey);
          pMergeInfo->keyLast = MAX(pMergeInfo->keyLast, fKey);
H
Hongze Cheng 已提交
384 385
        }
      }
H
TD-1438  
Hongze Cheng 已提交
386

H
TD-1548  
Hongze Cheng 已提交
387 388 389 390
      tSkipListIterNext(pIter);
      row = tsdbNextIterRow(pIter);
      if (row == NULL || dataRowKey(row) > maxKey) {
        rowKey = INT64_MAX;
H
Hongze Cheng 已提交
391
        isRowDel = false;
H
TD-1548  
Hongze Cheng 已提交
392 393 394 395
      } else {
        rowKey = dataRowKey(row);
        isRowDel = dataRowDeleted(row);
      }
H
Hongze Cheng 已提交
396

H
TD-1548  
Hongze Cheng 已提交
397 398 399 400 401
      filterIter++;
      if (filterIter >= nFilterKeys) {
        fKey = INT64_MAX;
      } else {
        fKey = tdGetKey(filterKeys[filterIter]);
H
Hongze Cheng 已提交
402
      }
H
TD-1548  
Hongze Cheng 已提交
403 404
    }
  }
H
Hongze Cheng 已提交
405

H
TD-1548  
Hongze Cheng 已提交
406
  return 0;
H
Hongze Cheng 已提交
407 408
}

H
Hongze Cheng 已提交
409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465
void *tsdbCommitData(STsdbRepo *pRepo) {
  SMemTable *  pMem = pRepo->imem;
  STsdbCfg *   pCfg = &pRepo->config;
  SDataCols *  pDataCols = NULL;
  STsdbMeta *  pMeta = pRepo->tsdbMeta;
  SCommitIter *iters = NULL;
  SRWHelper    whelper = {0};
  ASSERT(pMem != NULL);

  tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64, REPO_ID(pRepo),
            pMem->keyFirst, pMem->keyLast, pMem->numOfRows);

  // Create the iterator to read from cache
  if (pMem->numOfRows > 0) {
    iters = tsdbCreateCommitIters(pRepo);
    if (iters == NULL) {
      tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno));
      goto _exit;
    }

    if (tsdbInitWriteHelper(&whelper, pRepo) < 0) {
      tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno));
      goto _exit;
    }

    if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) {
      terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
      tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s",
                REPO_ID(pRepo), pMeta->maxCols, pMeta->maxRowBytes, pCfg->maxRowsPerFileBlock, tstrerror(terrno));
      goto _exit;
    }

    int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision));
    int efid = (int)(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision));

    // Loop to commit to each file
    for (int fid = sfid; fid <= efid; fid++) {
      if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) {
        tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
        goto _exit;
      }
    }
  }

  // Commit to update meta file
  if (tsdbCommitMeta(pRepo) < 0) {
    tsdbError("vgId:%d failed to commit data while committing meta data since %s", REPO_ID(pRepo), tstrerror(terrno));
    goto _exit;
  }

  tsdbFitRetention(pRepo);

_exit:
  tdFreeDataCols(pDataCols);
  tsdbDestroyCommitIters(iters, pMem->maxTables);
  tsdbDestroyHelper(&whelper);
  tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId);
466
  tsdbEndCommit(pRepo);
H
Hongze Cheng 已提交
467 468 469 470

  return NULL;
}

H
TD-353  
Hongze Cheng 已提交
471
// ---------------- LOCAL FUNCTIONS ----------------
H
TD-353  
Hongze Cheng 已提交
472
static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes) {
H
Hongze Cheng 已提交
473 474 475 476 477 478 479 480 481 482
  ASSERT(pRepo->mem != NULL);
  if (pRepo->mem->extraBuffList == NULL) {
    STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo);
    ASSERT(pBufBlock != NULL);
    pBufBlock->offset -= bytes;
    pBufBlock->remain += bytes;
    ASSERT(ptr == POINTER_SHIFT(pBufBlock->data, pBufBlock->offset));
    tsdbTrace("vgId:%d free %d bytes to TSDB buffer pool, nBlocks %d offset %d remain %d", REPO_ID(pRepo), bytes,
              listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain);
  } else {
S
TD-1057  
Shengliang Guan 已提交
483
    SListNode *pNode = (SListNode *)POINTER_SHIFT(ptr, -(int)(sizeof(SListNode)));
H
Hongze Cheng 已提交
484 485 486 487 488
    ASSERT(listTail(pRepo->mem->extraBuffList) == pNode);
    tdListPopNode(pRepo->mem->extraBuffList, pNode);
    free(pNode);
    tsdbTrace("vgId:%d free %d bytes to SYSTEM buffer pool", REPO_ID(pRepo), bytes);
  }
H
TD-353  
Hongze Cheng 已提交
489 490
}

H
TD-987  
Hongze Cheng 已提交
491 492 493
static SMemTable* tsdbNewMemTable(STsdbRepo *pRepo) {
  STsdbMeta *pMeta = pRepo->tsdbMeta;

H
TD-353  
Hongze Cheng 已提交
494 495 496 497 498 499 500 501 502 503
  SMemTable *pMemTable = (SMemTable *)calloc(1, sizeof(*pMemTable));
  if (pMemTable == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _err;
  }

  pMemTable->keyFirst = INT64_MAX;
  pMemTable->keyLast = 0;
  pMemTable->numOfRows = 0;

H
TD-987  
Hongze Cheng 已提交
504 505
  pMemTable->maxTables = pMeta->maxTables;
  pMemTable->tData = (STableData **)calloc(pMemTable->maxTables, sizeof(STableData *));
H
TD-353  
Hongze Cheng 已提交
506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536
  if (pMemTable->tData == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _err;
  }

  pMemTable->actList = tdListNew(0);
  if (pMemTable->actList == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _err;
  }

  pMemTable->bufBlockList = tdListNew(sizeof(STsdbBufBlock*));
  if (pMemTable->bufBlockList == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _err;
  }

  T_REF_INC(pMemTable);

  return pMemTable;

_err:
  tsdbFreeMemTable(pMemTable);
  return NULL;
}

static void tsdbFreeMemTable(SMemTable* pMemTable) {
  if (pMemTable) {
    ASSERT((pMemTable->bufBlockList == NULL) ? true : (listNEles(pMemTable->bufBlockList) == 0));
    ASSERT((pMemTable->actList == NULL) ? true : (listNEles(pMemTable->actList) == 0));

H
Hongze Cheng 已提交
537
    tdListFree(pMemTable->extraBuffList);
H
TD-353  
Hongze Cheng 已提交
538 539
    tdListFree(pMemTable->bufBlockList);
    tdListFree(pMemTable->actList);
S
TD-1848  
Shengliang Guan 已提交
540
    tfree(pMemTable->tData);
H
TD-353  
Hongze Cheng 已提交
541 542 543 544 545 546 547 548 549 550 551
    free(pMemTable);
  }
}

static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) {
  STableData *pTableData = (STableData *)calloc(1, sizeof(*pTableData));
  if (pTableData == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _err;
  }

H
TD-353  
Hongze Cheng 已提交
552
  pTableData->uid = TABLE_UID(pTable);
H
TD-353  
Hongze Cheng 已提交
553 554 555 556
  pTableData->keyFirst = INT64_MAX;
  pTableData->keyLast = 0;
  pTableData->numOfRows = 0;

H
TD-1438  
Hongze Cheng 已提交
557 558
  pTableData->pData =
      tSkipListCreate(TSDB_DATA_SKIPLIST_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP],
H
TD-1548  
Hongze Cheng 已提交
559
                      tkeyComparFn, pCfg->update ? SL_UPDATE_DUP_KEY : SL_DISCARD_DUP_KEY, tsdbGetTsTupleKey);
H
TD-353  
Hongze Cheng 已提交
560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576
  if (pTableData->pData == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _err;
  }

  return pTableData;

_err:
  tsdbFreeTableData(pTableData);
  return NULL;
}

static void tsdbFreeTableData(STableData *pTableData) {
  if (pTableData) {
    tSkipListDestroy(pTableData->pData);
    free(pTableData);
  }
H
TD-353  
Hongze Cheng 已提交
577 578
}

H
TD-1194  
Hongze Cheng 已提交
579
static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple((SDataRow)data); }
H
TD-353  
Hongze Cheng 已提交
580 581


H
TD-353  
Hongze Cheng 已提交
582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602
static int tsdbCommitMeta(STsdbRepo *pRepo) {
  SMemTable *pMem = pRepo->imem;
  STsdbMeta *pMeta = pRepo->tsdbMeta;
  SActObj *  pAct = NULL;
  SActCont * pCont = NULL;

  if (listNEles(pMem->actList) > 0) {
    if (tdKVStoreStartCommit(pMeta->pStore) < 0) {
      tsdbError("vgId:%d failed to commit data while start commit meta since %s", REPO_ID(pRepo), tstrerror(terrno));
      goto _err;
    }

    SListNode *pNode = NULL;

    while ((pNode = tdListPopHead(pMem->actList)) != NULL) {
      pAct = (SActObj *)pNode->data;
      if (pAct->act == TSDB_UPDATE_META) {
        pCont = (SActCont *)POINTER_SHIFT(pAct, sizeof(SActObj));
        if (tdUpdateKVStoreRecord(pMeta->pStore, pAct->uid, (void *)(pCont->cont), pCont->len) < 0) {
          tsdbError("vgId:%d failed to update meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid,
                    tstrerror(terrno));
H
Hongze Cheng 已提交
603
          tdKVStoreEndCommit(pMeta->pStore);
H
TD-353  
Hongze Cheng 已提交
604 605 606 607 608 609
          goto _err;
        }
      } else if (pAct->act == TSDB_DROP_META) {
        if (tdDropKVStoreRecord(pMeta->pStore, pAct->uid) < 0) {
          tsdbError("vgId:%d failed to drop meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid,
                    tstrerror(terrno));
H
Hongze Cheng 已提交
610
          tdKVStoreEndCommit(pMeta->pStore);
H
TD-353  
Hongze Cheng 已提交
611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629
          goto _err;
        }
      } else {
        ASSERT(false);
      }
    }

    if (tdKVStoreEndCommit(pMeta->pStore) < 0) {
      tsdbError("vgId:%d failed to commit data while end commit meta since %s", REPO_ID(pRepo), tstrerror(terrno));
      goto _err;
    }
  }

  return 0;

_err:
  return -1;
}

H
TD-353  
Hongze Cheng 已提交
630 631
static void tsdbEndCommit(STsdbRepo *pRepo) {
  if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER);
H
Hongze Cheng 已提交
632
  sem_post(&(pRepo->readyToCommit));
H
TD-353  
Hongze Cheng 已提交
633 634 635 636
}

static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) {
  for (int i = 0; i < nIters; i++) {
H
Hongze Cheng 已提交
637
    TSKEY nextKey = tsdbNextIterKey((iters + i)->pIter);
H
TD-1548  
Hongze Cheng 已提交
638
    if (nextKey != TSDB_DATA_TIMESTAMP_NULL && (nextKey >= minKey && nextKey <= maxKey)) return 1;
H
TD-353  
Hongze Cheng 已提交
639
  }
H
TD-353  
Hongze Cheng 已提交
640 641
  return 0;
}
H
TD-353  
Hongze Cheng 已提交
642

643
void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey) {
H
TD-353  
Hongze Cheng 已提交
644 645
  *minKey = fileId * daysPerFile * tsMsPerDay[precision];
  *maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1;
H
TD-353  
Hongze Cheng 已提交
646 647
}

H
TD-353  
Hongze Cheng 已提交
648 649
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols) {
  char *      dataDir = NULL;
H
TD-353  
Hongze Cheng 已提交
650
  STsdbCfg *  pCfg = &pRepo->config;
H
TD-353  
Hongze Cheng 已提交
651
  STsdbFileH *pFileH = pRepo->tsdbFileH;
H
TD-353  
Hongze Cheng 已提交
652
  SFileGroup *pGroup = NULL;
H
TD-987  
Hongze Cheng 已提交
653
  SMemTable * pMem = pRepo->imem;
H
Hongze Cheng 已提交
654
  bool        newLast = false;
H
TD-353  
Hongze Cheng 已提交
655 656

  TSKEY minKey = 0, maxKey = 0;
H
TD-353  
Hongze Cheng 已提交
657
  tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey);
H
TD-353  
Hongze Cheng 已提交
658 659

  // Check if there are data to commit to this file
H
TD-987  
Hongze Cheng 已提交
660
  int hasDataToCommit = tsdbHasDataToCommit(iters, pMem->maxTables, minKey, maxKey);
H
TD-353  
Hongze Cheng 已提交
661
  if (!hasDataToCommit) {
S
Shengliang Guan 已提交
662
    tsdbDebug("vgId:%d no data to commit to file %d", REPO_ID(pRepo), fid);
H
TD-353  
Hongze Cheng 已提交
663 664
    return 0;
  }
H
TD-353  
Hongze Cheng 已提交
665 666

  // Create and open files for commit
H
TD-353  
Hongze Cheng 已提交
667 668 669 670 671 672
  dataDir = tsdbGetDataDirName(pRepo->rootDir);
  if (dataDir == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return -1;
  }

H
TD-987  
Hongze Cheng 已提交
673
  if ((pGroup = tsdbCreateFGroupIfNeed(pRepo, dataDir, fid)) == NULL) {
H
TD-353  
Hongze Cheng 已提交
674
    tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
H
TD-353  
Hongze Cheng 已提交
675 676 677 678 679
    goto _err;
  }

  // Open files for write/read
  if (tsdbSetAndOpenHelperFile(pHelper, pGroup) < 0) {
H
TD-353  
Hongze Cheng 已提交
680
    tsdbError("vgId:%d failed to set helper file since %s", REPO_ID(pRepo), tstrerror(terrno));
H
TD-353  
Hongze Cheng 已提交
681 682 683
    goto _err;
  }

H
Hongze Cheng 已提交
684 685
  newLast = TSDB_NLAST_FILE_OPENED(pHelper);

H
Hongze Cheng 已提交
686 687
  if (tsdbLoadCompIdx(pHelper, NULL) < 0) {
    tsdbError("vgId:%d failed to load SCompIdx part since %s", REPO_ID(pRepo), tstrerror(terrno));
H
TD-353  
Hongze Cheng 已提交
688 689 690 691
    goto _err;
  }

  // Loop to commit data in each table
H
TD-987  
Hongze Cheng 已提交
692
  for (int tid = 1; tid < pMem->maxTables; tid++) {
H
TD-353  
Hongze Cheng 已提交
693 694
    SCommitIter *pIter = iters + tid;
    if (pIter->pTable == NULL) continue;
H
TD-353  
Hongze Cheng 已提交
695

H
Hongze Cheng 已提交
696 697
    taosRLockLatch(&(pIter->pTable->latch));

H
Hongze Cheng 已提交
698
    if (tsdbSetHelperTable(pHelper, pIter->pTable, pRepo) < 0) goto _err;
H
TD-353  
Hongze Cheng 已提交
699

H
TD-353  
Hongze Cheng 已提交
700
    if (pIter->pIter != NULL) {
H
Hongze Cheng 已提交
701 702 703 704
      if (tdInitDataCols(pDataCols, tsdbGetTableSchemaImpl(pIter->pTable, false, false, -1)) < 0) {
        terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
        goto _err;
      }
H
TD-353  
Hongze Cheng 已提交
705

H
Hongze Cheng 已提交
706 707 708 709 710 711
      if (tsdbCommitTableData(pHelper, pIter, pDataCols, maxKey) < 0) {
        taosRUnLockLatch(&(pIter->pTable->latch));
        tsdbError("vgId:%d failed to write data of table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo),
                  TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable),
                  tstrerror(terrno));
        goto _err;
H
TD-353  
Hongze Cheng 已提交
712 713
      }
    }
H
TD-353  
Hongze Cheng 已提交
714

H
Hongze Cheng 已提交
715 716
    taosRUnLockLatch(&(pIter->pTable->latch));

H
TD-353  
Hongze Cheng 已提交
717 718
    // Move the last block to the new .l file if neccessary
    if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) {
H
TD-353  
Hongze Cheng 已提交
719
      tsdbError("vgId:%d, failed to move last block, since %s", REPO_ID(pRepo), tstrerror(terrno));
H
TD-353  
Hongze Cheng 已提交
720 721 722 723 724
      goto _err;
    }

    // Write the SCompBlock part
    if (tsdbWriteCompInfo(pHelper) < 0) {
H
TD-353  
Hongze Cheng 已提交
725
      tsdbError("vgId:%d, failed to write compInfo part since %s", REPO_ID(pRepo), tstrerror(terrno));
H
TD-353  
Hongze Cheng 已提交
726 727 728 729 730
      goto _err;
    }
  }

  if (tsdbWriteCompIdx(pHelper) < 0) {
H
TD-353  
Hongze Cheng 已提交
731
    tsdbError("vgId:%d failed to write compIdx part to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
H
TD-353  
Hongze Cheng 已提交
732 733 734
    goto _err;
  }

S
TD-1848  
Shengliang Guan 已提交
735
  tfree(dataDir);
H
Hongze Cheng 已提交
736
  tsdbCloseHelperFile(pHelper, 0, pGroup);
H
TD-353  
Hongze Cheng 已提交
737 738

  pthread_rwlock_wrlock(&(pFileH->fhlock));
H
Hongze Cheng 已提交
739

H
Hongze Cheng 已提交
740
  (void)rename(helperNewHeadF(pHelper)->fname, helperHeadF(pHelper)->fname);
H
Hongze Cheng 已提交
741 742
  pGroup->files[TSDB_FILE_TYPE_HEAD].info = helperNewHeadF(pHelper)->info;

H
Hongze Cheng 已提交
743
  if (newLast) {
H
Hongze Cheng 已提交
744
    (void)rename(helperNewLastF(pHelper)->fname, helperLastF(pHelper)->fname);
H
Hongze Cheng 已提交
745 746 747 748
    pGroup->files[TSDB_FILE_TYPE_LAST].info = helperNewLastF(pHelper)->info;
  } else {
    pGroup->files[TSDB_FILE_TYPE_LAST].info = helperLastF(pHelper)->info;
  }
H
Hongze Cheng 已提交
749 750 751

  pGroup->files[TSDB_FILE_TYPE_DATA].info = helperDataF(pHelper)->info;

H
TD-353  
Hongze Cheng 已提交
752
  pthread_rwlock_unlock(&(pFileH->fhlock));
H
TD-353  
Hongze Cheng 已提交
753 754 755 756

  return 0;

_err:
S
TD-1848  
Shengliang Guan 已提交
757
  tfree(dataDir);
H
Hongze Cheng 已提交
758
  tsdbCloseHelperFile(pHelper, 1, NULL);
H
TD-353  
Hongze Cheng 已提交
759 760 761
  return -1;
}

H
Hongze Cheng 已提交
762
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) {
H
TD-353  
Hongze Cheng 已提交
763 764
  SMemTable *pMem = pRepo->imem;
  STsdbMeta *pMeta = pRepo->tsdbMeta;
H
TD-353  
Hongze Cheng 已提交
765

H
TD-987  
Hongze Cheng 已提交
766
  SCommitIter *iters = (SCommitIter *)calloc(pMem->maxTables, sizeof(SCommitIter));
H
TD-353  
Hongze Cheng 已提交
767 768 769 770 771
  if (iters == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return NULL;
  }

H
TD-353  
Hongze Cheng 已提交
772 773 774
  if (tsdbRLockRepoMeta(pRepo) < 0) goto _err;

  // reference all tables
H
TD-987  
Hongze Cheng 已提交
775
  for (int i = 0; i < pMem->maxTables; i++) {
H
TD-353  
Hongze Cheng 已提交
776 777 778 779 780
    if (pMeta->tables[i] != NULL) {
      tsdbRefTable(pMeta->tables[i]);
      iters[i].pTable = pMeta->tables[i];
    }
  }
H
TD-353  
Hongze Cheng 已提交
781

H
TD-353  
Hongze Cheng 已提交
782
  if (tsdbUnlockRepoMeta(pRepo) < 0) goto _err;
H
TD-353  
Hongze Cheng 已提交
783

H
TD-987  
Hongze Cheng 已提交
784
  for (int i = 0; i < pMem->maxTables; i++) {
H
TD-353  
Hongze Cheng 已提交
785
    if ((iters[i].pTable != NULL) && (pMem->tData[i] != NULL) && (TABLE_UID(iters[i].pTable) == pMem->tData[i]->uid)) {
H
TD-353  
Hongze Cheng 已提交
786 787 788 789 790
      if ((iters[i].pIter = tSkipListCreateIter(pMem->tData[i]->pData)) == NULL) {
        terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
        goto _err;
      }

H
Hongze Cheng 已提交
791
      tSkipListIterNext(iters[i].pIter);
H
TD-353  
Hongze Cheng 已提交
792
    }
H
TD-353  
Hongze Cheng 已提交
793 794 795 796 797
  }

  return iters;

_err:
H
TD-987  
Hongze Cheng 已提交
798
  tsdbDestroyCommitIters(iters, pMem->maxTables);
H
TD-353  
Hongze Cheng 已提交
799 800 801
  return NULL;
}

H
Hongze Cheng 已提交
802
static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables) {
H
TD-353  
Hongze Cheng 已提交
803 804
  if (iters == NULL) return;

H
TD-353  
Hongze Cheng 已提交
805 806 807 808 809
  for (int i = 1; i < maxTables; i++) {
    if (iters[i].pTable != NULL) {
      tsdbUnRefTable(iters[i].pTable);
      tSkipListDestroyIter(iters[i].pIter);
    }
H
TD-353  
Hongze Cheng 已提交
810 811 812
  }

  free(iters);
H
TD-987  
Hongze Cheng 已提交
813 814 815 816 817 818 819 820 821 822
}

static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables) {
  ASSERT(pMemTable->maxTables < maxTables);

  STableData **pTableData = (STableData **)calloc(maxTables, sizeof(STableData *));
  if (pTableData == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return -1;
  }
H
fix bug  
Hongze Cheng 已提交
823
  memcpy((void *)pTableData, (void *)pMemTable->tData, sizeof(STableData *) * pMemTable->maxTables);
H
TD-987  
Hongze Cheng 已提交
824 825 826 827 828 829 830 831

  STableData **tData = pMemTable->tData;

  taosWLockLatch(&(pMemTable->latch));
  pMemTable->maxTables = maxTables;
  pMemTable->tData = pTableData;
  taosWUnLockLatch(&(pMemTable->latch));

S
TD-1848  
Shengliang Guan 已提交
832
  tfree(tData);
H
TD-987  
Hongze Cheng 已提交
833

H
TD-1548  
Hongze Cheng 已提交
834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849
  return 0;
}

static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SDataRow row) {
  if (pCols) {
    if (*ppSchema == NULL || schemaVersion(*ppSchema) != dataRowVersion(row)) {
      *ppSchema = tsdbGetTableSchemaImpl(pTable, false, false, dataRowVersion(row));
      if (*ppSchema == NULL) {
        ASSERT(false);
        return -1;
      }
    }

    tdAppendDataRowToDataCol(row, *ppSchema, pCols);
  }

H
TD-987  
Hongze Cheng 已提交
850
  return 0;
H
TD-353  
Hongze Cheng 已提交
851
}