tsdbMemTable.c 25.1 KB
Newer Older
H
TD-353  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"
#include "tsdbMain.h"

#define TSDB_DATA_SKIPLIST_LEVEL 5

H
TD-353  
Hongze Cheng 已提交
21
static void        tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes);
H
TD-987  
Hongze Cheng 已提交
22
static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo);
H
TD-353  
Hongze Cheng 已提交
23 24 25 26
static void        tsdbFreeMemTable(SMemTable *pMemTable);
static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable);
static void        tsdbFreeTableData(STableData *pTableData);
static char *      tsdbGetTsTupleKey(const void *data);
H
TD-353  
Hongze Cheng 已提交
27
static void *      tsdbCommitData(void *arg);
H
TD-353  
Hongze Cheng 已提交
28
static int         tsdbCommitMeta(STsdbRepo *pRepo);
H
TD-353  
Hongze Cheng 已提交
29 30
static void        tsdbEndCommit(STsdbRepo *pRepo);
static int         tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey);
H
TD-987  
Hongze Cheng 已提交
31
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols);
H
Hongze Cheng 已提交
32 33
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo);
static void         tsdbDestroyCommitIters(SCommitIter *iters, int maxTables);
H
TD-987  
Hongze Cheng 已提交
34
static int          tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables);
H
TD-353  
Hongze Cheng 已提交
35

H
TD-353  
Hongze Cheng 已提交
36 37
// ---------------- INTERNAL FUNCTIONS ----------------
/*
 * Insert one row into the active memory table (pRepo->mem) on behalf of pTable.
 *
 * A skip-list node is malloc'ed, the row is copied into storage obtained from
 * tsdbAllocBytes(), and the node is put into the table's skip list. The
 * per-table data object is created on demand and is replaced when the slot
 * currently holds data recorded under a different uid.
 *
 * Returns 0 on success (including the case where tSkipListPut() rejects the
 * node, in which case the node and the row storage are released again) and -1
 * when an allocation or the tData array growth fails.
 */
int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
  STsdbCfg *  pCfg = &pRepo->config;
  STsdbMeta * pMeta = pRepo->tsdbMeta;
  int32_t     level = 0;
  int32_t     headSize = 0;
  TSKEY       key = dataRowKey(row);
  SMemTable * pMemTable = pRepo->mem;
  STableData *pTableData = NULL;
  SSkipList * pSList = NULL;

  // Look up the table's current skip list, if it already has one under the same uid
  if (pMemTable != NULL && TABLE_TID(pTable) < pMemTable->maxTables && pMemTable->tData[TABLE_TID(pTable)] != NULL &&
      pMemTable->tData[TABLE_TID(pTable)]->uid == TABLE_UID(pTable)) {
    pTableData = pMemTable->tData[TABLE_TID(pTable)];
    pSList = pTableData->pData;
  }

  // Fills in the level and header size for the new node (pSList may be NULL here)
  tSkipListNewNodeInfo(pSList, &level, &headSize);

  SSkipListNode *pNode = (SSkipListNode *)malloc(headSize + sizeof(SDataRow *));
  if (pNode == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return -1;
  }

  void *pRow = tsdbAllocBytes(pRepo, dataRowLen(row));
  if (pRow == NULL) {
    tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while allocate %d bytes since %s",
              REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), dataRowLen(row), tstrerror(terrno));
    free(pNode);
    return -1;
  }

  // The node payload is a pointer to the copied row, not the row itself
  pNode->level = level;
  dataRowCpy(pRow, row);
  *(SDataRow *)SL_GET_NODE_DATA(pNode) = pRow;

  // Operations above may change pRepo->mem, retake those values
  ASSERT(pRepo->mem != NULL);
  pMemTable = pRepo->mem;

  // Grow the per-table slot array when this table id does not fit yet
  if (TABLE_TID(pTable) >= pMemTable->maxTables) {
    if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) {
      tsdbFreeBytes(pRepo, pRow, dataRowLen(row));
      free(pNode);
      return -1;
    }
  }
  pTableData = pMemTable->tData[TABLE_TID(pTable)];

  if (pTableData == NULL || pTableData->uid != TABLE_UID(pTable)) {
    if (pTableData != NULL) {
      // The slot holds data of another uid: drop it under the write latch
      taosWLockLatch(&(pMemTable->latch));
      pMemTable->tData[TABLE_TID(pTable)] = NULL;
      tsdbFreeTableData(pTableData);
      taosWUnLockLatch(&(pMemTable->latch));
    }

    pTableData = tsdbNewTableData(pCfg, pTable);
    if (pTableData == NULL) {
      tsdbError("vgId:%d failed to insert row with key %" PRId64
                " to table %s while create new table data object since %s",
                REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), tstrerror(terrno));
      tsdbFreeBytes(pRepo, (void *)pRow, dataRowLen(row));
      free(pNode);
      return -1;
    }

    pRepo->mem->tData[TABLE_TID(pTable)] = pTableData;
  }

  ASSERT((pTableData != NULL) && pTableData->uid == TABLE_UID(pTable));

  if (tSkipListPut(pTableData->pData, pNode) == NULL) {
    // The skip list did not take the node: give back the row storage and the node
    tsdbFreeBytes(pRepo, (void *)pRow, dataRowLen(row));
    free(pNode);
  } else {
    // Maintain the key ranges and row counters of the table, memtable and table data
    if (TABLE_LASTKEY(pTable) < key) TABLE_LASTKEY(pTable) = key;
    if (pMemTable->keyFirst > key) pMemTable->keyFirst = key;
    if (pMemTable->keyLast < key) pMemTable->keyLast = key;
    pMemTable->numOfRows++;

    if (pTableData->keyFirst > key) pTableData->keyFirst = key;
    if (pTableData->keyLast < key) pTableData->keyLast = key;
    pTableData->numOfRows++;

    ASSERT(pTableData->numOfRows == tSkipListGetSize(pTableData->pData));
  }

  // The key is printed as a signed value, matching the error logs above
  tsdbTrace("vgId:%d a row is inserted to table %s tid %d uid %" PRIu64 " key %" PRId64, REPO_ID(pRepo),
            TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), key);

  return 0;
}

/*
 * Take one reference on pMemTable and log the resulting count.
 * A NULL memtable is accepted and ignored. Always returns 0.
 */
int tsdbRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
  if (pMemTable != NULL) {
    int nRef = T_REF_INC(pMemTable);
    tsdbDebug("vgId:%d ref memtable %p ref %d", REPO_ID(pRepo), pMemTable, nRef);
  }

  return 0;
}

// Need to lock the repository
/*
 * Drop one reference on pMemTable (NULL is accepted and ignored).
 *
 * When the count reaches zero the memtable is torn down: its buffer blocks are
 * moved back to the repository buffer pool under the repo lock, the pool's
 * poolNotEmpty condition is signalled, every per-table data object is freed and
 * finally the memtable itself is released.
 *
 * Returns 0 on success, -1 when locking/unlocking the repo or signalling fails.
 */
int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
  if (pMemTable == NULL) return 0;

  int nRef = T_REF_DEC(pMemTable);
  tsdbDebug("vgId:%d unref memtable %p ref %d", REPO_ID(pRepo), pMemTable, nRef);
  if (nRef != 0) return 0;

  // Last reference gone: hand the buffer blocks back to the pool
  STsdbBufPool *pPool = pRepo->pPool;
  SListNode *   pBlock = NULL;

  if (tsdbLockRepo(pRepo) < 0) return -1;

  for (pBlock = tdListPopHead(pMemTable->bufBlockList); pBlock != NULL;
       pBlock = tdListPopHead(pMemTable->bufBlockList)) {
    tdListAppendNode(pPool->bufBlockList, pBlock);
  }

  int rc = pthread_cond_signal(&pPool->poolNotEmpty);
  if (rc != 0) {
    tsdbUnlockRepo(pRepo);
    tsdbError("vgId:%d failed to signal pool not empty since %s", REPO_ID(pRepo), strerror(rc));
    terrno = TAOS_SYSTEM_ERROR(rc);
    return -1;
  }

  if (tsdbUnlockRepo(pRepo) < 0) return -1;

  // Release the per-table data objects that are still attached
  for (int tid = 0; tid < pMemTable->maxTables; tid++) {
    if (pMemTable->tData[tid] == NULL) continue;
    tsdbFreeTableData(pMemTable->tData[tid]);
  }

  tdListDiscard(pMemTable->actList);
  tdListDiscard(pMemTable->bufBlockList);
  tsdbFreeMemTable(pMemTable);

  return 0;
}

H
TD-353  
Hongze Cheng 已提交
174 175 176 177
/*
 * Capture the repository's current mem and imem tables into *pMem and *pIMem.
 *
 * Both pointers are read and referenced while the repo lock is held; after the
 * lock is released the read latch of the mem table (if any) is taken.
 *
 * Returns 0 on success, -1 when locking or unlocking the repo fails.
 */
int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemTable **pMem, SMemTable **pIMem) {
  if (tsdbLockRepo(pRepo) < 0) {
    return -1;
  }

  *pMem = pRepo->mem;
  *pIMem = pRepo->imem;
  tsdbRefMemTable(pRepo, *pMem);
  tsdbRefMemTable(pRepo, *pIMem);

  if (tsdbUnlockRepo(pRepo) < 0) {
    return -1;
  }

  if (*pMem != NULL) {
    taosRLockLatch(&((*pMem)->latch));
  }

  tsdbDebug("vgId:%d take memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), *pMem, *pIMem);
  return 0;
}

H
TD-987  
Hongze Cheng 已提交
190 191 192 193 194 195 196 197 198 199 200
/*
 * Undo tsdbTakeMemSnapshot(): release the read latch held on pMem and drop the
 * references taken on pMem and pIMem. Either pointer may be NULL.
 */
void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemTable *pMem, SMemTable *pIMem) {
  if (pMem != NULL) {
    taosRUnLockLatch(&(pMem->latch));
    tsdbUnRefMemTable(pRepo, pMem);
  }

  // tsdbUnRefMemTable() ignores a NULL memtable, so no guard is needed here
  tsdbUnRefMemTable(pRepo, pIMem);
}

H
TD-353  
Hongze Cheng 已提交
201
/*
 * Allocate `bytes` bytes of row storage that belongs to the active memtable,
 * creating the memtable first when pRepo->mem is NULL.
 *
 * Storage comes from the current TSDB buffer block when possible. Once the
 * memtable already holds at least a third of the configured blocks and the
 * current block cannot satisfy the request (or once the extra list exists),
 * storage is malloc'ed and tracked in mem->extraBuffList instead.
 *
 * Returns the storage pointer, or NULL on failure.
 */
void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
  STsdbCfg *     pCfg = &pRepo->config;
  STsdbBufBlock *pBufBlock = NULL;
  void *         ptr = NULL;

  // Either allocate from buffer blocks or from SYSTEM memory pool
  if (pRepo->mem == NULL) {
    SMemTable *pMemTable = tsdbNewMemTable(pRepo);
    if (pMemTable == NULL) return NULL;
    pRepo->mem = pMemTable;
  }

  ASSERT(pRepo->mem != NULL);

  pBufBlock = tsdbGetCurrBufBlock(pRepo);
  // pBufBlock may be NULL (the else branch below already treats it that way), so
  // it must not be dereferenced unguarded; a missing block cannot serve the request.
  if ((pRepo->mem->extraBuffList != NULL) ||
      ((listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) &&
       (pBufBlock == NULL || pBufBlock->remain < bytes))) {
    // allocate from SYSTEM buffer pool
    if (pRepo->mem->extraBuffList == NULL) {
      pRepo->mem->extraBuffList = tdListNew(0);
      if (pRepo->mem->extraBuffList == NULL) {
        terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
        return NULL;
      }
    }

    ASSERT(pRepo->mem->extraBuffList != NULL);
    // The payload lives directly behind the list node header
    SListNode *pNode = (SListNode *)malloc(sizeof(SListNode) + bytes);
    if (pNode == NULL) {
      terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
      return NULL;
    }

    pNode->next = pNode->prev = NULL;
    tdListAppendNode(pRepo->mem->extraBuffList, pNode);
    ptr = (void *)(pNode->data);
    tsdbTrace("vgId:%d allocate %d bytes from SYSTEM buffer block", REPO_ID(pRepo), bytes);
  } else {  // allocate from TSDB buffer pool
    if (pBufBlock == NULL || pBufBlock->remain < bytes) {
      ASSERT(listNEles(pRepo->mem->bufBlockList) < pCfg->totalBlocks / 3);
      if (tsdbLockRepo(pRepo) < 0) return NULL;
      SListNode *pNode = tsdbAllocBufBlockFromPool(pRepo);
      if (pNode == NULL) {
        // NOTE(review): defensive; whether the pool allocator can return NULL is not visible here
        tsdbUnlockRepo(pRepo);
        return NULL;
      }
      tdListAppendNode(pRepo->mem->bufBlockList, pNode);
      if (tsdbUnlockRepo(pRepo) < 0) return NULL;
      pBufBlock = tsdbGetCurrBufBlock(pRepo);
    }

    // Carve the request off the front of the block's free space
    ASSERT(pBufBlock->remain >= bytes);
    ptr = POINTER_SHIFT(pBufBlock->data, pBufBlock->offset);
    pBufBlock->offset += bytes;
    pBufBlock->remain -= bytes;
    tsdbTrace("vgId:%d allocate %d bytes from TSDB buffer block, nBlocks %d offset %d remain %d", REPO_ID(pRepo), bytes,
              listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain);
  }

  return ptr;
}

H
TD-353  
Hongze Cheng 已提交
259 260 261 262 263 264 265 266 267 268 269 270
/*
 * Start an asynchronous commit of the active memtable.
 *
 * If a previous commit is still outstanding (pRepo->imem != NULL) its thread is
 * joined first. Then, when there is an active memtable, it becomes pRepo->imem
 * under the repo lock and a commit thread running tsdbCommitData() is created.
 * Finally the reference on the previous imem is dropped.
 *
 * Returns 0 on success, -1 on failure with terrno set from the failing call.
 */
int tsdbAsyncCommit(STsdbRepo *pRepo) {
  SMemTable *pIMem = pRepo->imem;
  int        code = 0;

  if (pIMem != NULL) {
    ASSERT(pRepo->commit);
    code = pthread_join(pRepo->commitThread, NULL);
    if (code != 0) {
      // pthread functions report failure through their return value, not errno
      tsdbError("vgId:%d failed to thread join since %s", REPO_ID(pRepo), strerror(code));
      terrno = TAOS_SYSTEM_ERROR(code);
      return -1;
    }
    pRepo->commit = 0;
  }

  ASSERT(pRepo->commit == 0);
  if (pRepo->mem != NULL) {
    if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START);
    if (tsdbLockRepo(pRepo) < 0) return -1;
    // Freeze the active memtable; writers get a fresh one on the next allocation
    pRepo->imem = pRepo->mem;
    pRepo->mem = NULL;
    pRepo->commit = 1;
    code = pthread_create(&pRepo->commitThread, NULL, tsdbCommitData, (void *)pRepo);
    if (code != 0) {
      tsdbError("vgId:%d failed to create commit thread since %s", REPO_ID(pRepo), strerror(code));
      terrno = TAOS_SYSTEM_ERROR(code);
      tsdbUnlockRepo(pRepo);
      return -1;
    }
    if (tsdbUnlockRepo(pRepo) < 0) return -1;
  }

  if (pIMem && tsdbUnRefMemTable(pRepo, pIMem) < 0) return -1;

  return 0;
}

H
Hongze Cheng 已提交
296 297 298 299 300 301 302 303 304 305 306 307 308 309
/*
 * Read rows from a skip-list iterator, starting at its current position.
 *
 * Rows are consumed while their key is <= maxKey. A row whose key appears in
 * the sorted filterKeys array is skipped and not counted. Reading stops once
 * maxRowsToRead unfiltered rows have been counted. When pCols is non-NULL each
 * counted row is appended to it using the schema matching the row's version.
 *
 * Returns the number of unfiltered rows consumed (0 when pIter is NULL).
 */
int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols,
                          TSKEY *filterKeys, int nFilterKeys) {
  ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0);
  if (pIter == NULL) return 0;

  STSchema *pSchema = NULL;
  int       numOfRows = 0;
  TSKEY     keyNext = 0;
  int       filterIter = 0;

  if (nFilterKeys != 0) {  // for filter purpose
    ASSERT(filterKeys != NULL);
    keyNext = tsdbNextIterKey(pIter);
    if (keyNext < 0 || keyNext > maxKey) return numOfRows;

    // Position the filter cursor at the first filter key >= the next row key
    void *ptr = taosbsearch((void *)(&keyNext), (void *)filterKeys, nFilterKeys, sizeof(TSKEY), compTSKEY, TD_GE);
    if (ptr == NULL) {
      filterIter = nFilterKeys;
    } else {
      filterIter = (int)((POINTER_DISTANCE(ptr, filterKeys) / sizeof(TSKEY)));
    }
  }

  for (;;) {
    SDataRow row = tsdbNextIterRow(pIter);
    if (row == NULL) break;

    keyNext = dataRowKey(row);
    if (keyNext > maxKey) break;

    // Skip filter keys smaller than this row's key, then test for an exact match
    while (filterIter < nFilterKeys && filterKeys[filterIter] < keyNext) {
      filterIter++;
    }

    if (filterIter < nFilterKeys && filterKeys[filterIter] == keyNext) {
      // Filtered row: consume the matching filter key and do not count the row
      filterIter++;
    } else {
      if (numOfRows >= maxRowsToRead) break;

      if (pCols) {
        // Re-resolve the schema only when the row version changes
        if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) {
          pSchema = tsdbGetTableSchemaImpl(pTable, false, false, dataRowVersion(row));
          if (pSchema == NULL) {
            ASSERT(0);
          }
        }

        tdAppendDataRowToDataCol(row, pSchema, pCols);
      }
      numOfRows++;
    }

    if (!tSkipListIterNext(pIter)) break;
  }

  return numOfRows;
}

H
TD-353  
Hongze Cheng 已提交
355
// ---------------- LOCAL FUNCTIONS ----------------
H
TD-353  
Hongze Cheng 已提交
356
/*
 * Give back the most recent allocation made by tsdbAllocBytes().
 *
 * When the memtable has an extra (malloc-backed) list, ptr must be the payload
 * of that list's tail node, which is popped and freed. Otherwise the current
 * buffer block's offset/remain are rolled back by `bytes`, and ptr must be the
 * resulting free position.
 */
static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes) {
  ASSERT(pRepo->mem != NULL);

  if (pRepo->mem->extraBuffList != NULL) {
    // The list node header sits immediately in front of the payload
    SListNode *pNode = (SListNode *)POINTER_SHIFT(ptr, -(int)(sizeof(SListNode)));
    ASSERT(listTail(pRepo->mem->extraBuffList) == pNode);
    tdListPopNode(pRepo->mem->extraBuffList, pNode);
    free(pNode);
    tsdbTrace("vgId:%d free %d bytes to SYSTEM buffer pool", REPO_ID(pRepo), bytes);
    return;
  }

  STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo);
  ASSERT(pBufBlock != NULL);
  pBufBlock->offset -= bytes;
  pBufBlock->remain += bytes;
  ASSERT(ptr == POINTER_SHIFT(pBufBlock->data, pBufBlock->offset));
  tsdbTrace("vgId:%d free %d bytes to TSDB buffer pool, nBlocks %d offset %d remain %d", REPO_ID(pRepo), bytes,
            listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain);
}

H
TD-987  
Hongze Cheng 已提交
375 376 377
/*
 * Create an empty memtable sized for the repository's current maxTables and
 * holding one reference.
 *
 * Returns the new memtable, or NULL with terrno = TSDB_CODE_TDB_OUT_OF_MEMORY
 * when any allocation fails (partially built state is released).
 */
static SMemTable *tsdbNewMemTable(STsdbRepo *pRepo) {
  STsdbMeta *pMeta = pRepo->tsdbMeta;

  SMemTable *pMemTable = (SMemTable *)calloc(1, sizeof(*pMemTable));
  if (pMemTable == NULL) {
    // Nothing to release yet
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return NULL;
  }

  // Empty key range: any inserted key lowers keyFirst / raises keyLast
  pMemTable->keyFirst = INT64_MAX;
  pMemTable->keyLast = 0;
  pMemTable->numOfRows = 0;
  pMemTable->maxTables = pMeta->maxTables;

  // Allocate the members in order; the first failure stops the chain
  if ((pMemTable->tData = (STableData **)calloc(pMemTable->maxTables, sizeof(STableData *))) == NULL ||
      (pMemTable->actList = tdListNew(0)) == NULL ||
      (pMemTable->bufBlockList = tdListNew(sizeof(STsdbBufBlock *))) == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    tsdbFreeMemTable(pMemTable);
    return NULL;
  }

  T_REF_INC(pMemTable);

  return pMemTable;
}

/*
 * Release a memtable and the lists and slot array it owns. NULL is ignored.
 * The buffer-block list and the action list are expected to be empty already.
 */
static void tsdbFreeMemTable(SMemTable *pMemTable) {
  if (pMemTable == NULL) {
    return;
  }

  ASSERT((pMemTable->bufBlockList == NULL) ? true : (listNEles(pMemTable->bufBlockList) == 0));
  ASSERT((pMemTable->actList == NULL) ? true : (listNEles(pMemTable->actList) == 0));

  tdListFree(pMemTable->extraBuffList);
  tdListFree(pMemTable->bufBlockList);
  tdListFree(pMemTable->actList);
  taosTFree(pMemTable->tData);
  free(pMemTable);
}

/*
 * Create the per-table data object for pTable: an empty key range, a zero row
 * count and a fresh timestamp-keyed skip list.
 *
 * Returns the object, or NULL with terrno = TSDB_CODE_TDB_OUT_OF_MEMORY.
 */
static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) {
  STableData *pTableData = (STableData *)calloc(1, sizeof(*pTableData));
  if (pTableData == NULL) {
    // Nothing to release yet
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return NULL;
  }

  pTableData->uid = TABLE_UID(pTable);
  pTableData->keyFirst = INT64_MAX;
  pTableData->keyLast = 0;
  pTableData->numOfRows = 0;
  pTableData->pData = tSkipListCreate(TSDB_DATA_SKIPLIST_LEVEL, TSDB_DATA_TYPE_TIMESTAMP,
                                      TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, 1, tsdbGetTsTupleKey);

  if (pTableData->pData != NULL) {
    return pTableData;
  }

  // Skip list creation failed: undo the partial construction
  terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
  tsdbFreeTableData(pTableData);
  return NULL;
}

/* Destroy a per-table data object together with its skip list. NULL is ignored. */
static void tsdbFreeTableData(STableData *pTableData) {
  if (pTableData == NULL) {
    return;
  }

  tSkipListDestroy(pTableData->pData);
  free(pTableData);
}

H
Hongze Cheng 已提交
462
static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple(*(SDataRow *)data); }
H
TD-353  
Hongze Cheng 已提交
463 464

static void *tsdbCommitData(void *arg) {
H
TD-353  
Hongze Cheng 已提交
465 466 467 468
  STsdbRepo *  pRepo = (STsdbRepo *)arg;
  SMemTable *  pMem = pRepo->imem;
  STsdbCfg *   pCfg = &pRepo->config;
  SDataCols *  pDataCols = NULL;
H
TD-353  
Hongze Cheng 已提交
469
  STsdbMeta *  pMeta = pRepo->tsdbMeta;
H
TD-353  
Hongze Cheng 已提交
470
  SCommitIter *iters = NULL;
H
TD-353  
Hongze Cheng 已提交
471
  SRWHelper    whelper = {0};
H
TD-353  
Hongze Cheng 已提交
472
  ASSERT(pRepo->commit == 1);
H
TD-353  
Hongze Cheng 已提交
473
  ASSERT(pMem != NULL);
H
TD-353  
Hongze Cheng 已提交
474

S
Shengliang Guan 已提交
475
  tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64, REPO_ID(pRepo),
H
TD-353  
Hongze Cheng 已提交
476
            pMem->keyFirst, pMem->keyLast, pMem->numOfRows);
H
TD-353  
Hongze Cheng 已提交
477 478

  // Create the iterator to read from cache
H
TD-353  
Hongze Cheng 已提交
479
  if (pMem->numOfRows > 0) {
H
Hongze Cheng 已提交
480
    iters = tsdbCreateCommitIters(pRepo);
H
TD-353  
Hongze Cheng 已提交
481 482 483 484
    if (iters == NULL) {
      tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno));
      goto _exit;
    }
H
TD-353  
Hongze Cheng 已提交
485

H
TD-353  
Hongze Cheng 已提交
486 487 488 489
    if (tsdbInitWriteHelper(&whelper, pRepo) < 0) {
      tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno));
      goto _exit;
    }
H
TD-353  
Hongze Cheng 已提交
490

H
TD-353  
Hongze Cheng 已提交
491 492 493 494 495 496
    if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) {
      terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
      tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s",
                REPO_ID(pRepo), pMeta->maxCols, pMeta->maxRowBytes, pCfg->maxRowsPerFileBlock, tstrerror(terrno));
      goto _exit;
    }
H
TD-353  
Hongze Cheng 已提交
497

S
TD-1057  
Shengliang Guan 已提交
498 499
    int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision));
    int efid = (int)(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision));
H
TD-353  
Hongze Cheng 已提交
500

H
TD-353  
Hongze Cheng 已提交
501 502 503 504 505 506
    // Loop to commit to each file
    for (int fid = sfid; fid <= efid; fid++) {
      if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) {
        tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
        goto _exit;
      }
H
TD-353  
Hongze Cheng 已提交
507 508 509
    }
  }

H
TD-353  
Hongze Cheng 已提交
510 511 512 513 514
  // Commit to update meta file
  if (tsdbCommitMeta(pRepo) < 0) {
    tsdbError("vgId:%d failed to commit data while committing meta data since %s", REPO_ID(pRepo), tstrerror(terrno));
    goto _exit;
  }
H
TD-353  
Hongze Cheng 已提交
515

H
TD-353  
Hongze Cheng 已提交
516
  tsdbFitRetention(pRepo);
H
TD-353  
Hongze Cheng 已提交
517 518 519

_exit:
  tdFreeDataCols(pDataCols);
H
TD-987  
Hongze Cheng 已提交
520
  tsdbDestroyCommitIters(iters, pMem->maxTables);
H
TD-353  
Hongze Cheng 已提交
521
  tsdbDestroyHelper(&whelper);
H
TD-353  
Hongze Cheng 已提交
522
  tsdbEndCommit(pRepo);
S
Shengliang Guan 已提交
523
  tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId);
H
TD-353  
Hongze Cheng 已提交
524 525 526

  return NULL;
}
H
TD-353  
Hongze Cheng 已提交
527

H
TD-353  
Hongze Cheng 已提交
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548
/*
 * Apply the meta actions queued on pRepo->imem's action list to the meta KV
 * store inside one start/end commit pair. Nothing is done when the list is
 * empty.
 *
 * Returns 0 on success and -1 when a store operation fails; after a failed
 * update or drop the store commit is still ended before returning.
 */
static int tsdbCommitMeta(STsdbRepo *pRepo) {
  SMemTable *pMem = pRepo->imem;
  STsdbMeta *pMeta = pRepo->tsdbMeta;

  if (listNEles(pMem->actList) <= 0) {
    return 0;
  }

  if (tdKVStoreStartCommit(pMeta->pStore) < 0) {
    tsdbError("vgId:%d failed to commit data while start commit meta since %s", REPO_ID(pRepo), tstrerror(terrno));
    return -1;
  }

  for (SListNode *pNode = tdListPopHead(pMem->actList); pNode != NULL; pNode = tdListPopHead(pMem->actList)) {
    SActObj *pAct = (SActObj *)pNode->data;

    if (pAct->act == TSDB_UPDATE_META) {
      // The record content is stored right behind the action header
      SActCont *pCont = (SActCont *)POINTER_SHIFT(pAct, sizeof(SActObj));
      if (tdUpdateKVStoreRecord(pMeta->pStore, pAct->uid, (void *)(pCont->cont), pCont->len) < 0) {
        tsdbError("vgId:%d failed to update meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid,
                  tstrerror(terrno));
        tdKVStoreEndCommit(pMeta->pStore);
        return -1;
      }
    } else if (pAct->act == TSDB_DROP_META) {
      if (tdDropKVStoreRecord(pMeta->pStore, pAct->uid) < 0) {
        tsdbError("vgId:%d failed to drop meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid,
                  tstrerror(terrno));
        tdKVStoreEndCommit(pMeta->pStore);
        return -1;
      }
    } else {
      ASSERT(false);
    }
  }

  if (tdKVStoreEndCommit(pMeta->pStore) < 0) {
    tsdbError("vgId:%d failed to commit data while end commit meta since %s", REPO_ID(pRepo), tstrerror(terrno));
    return -1;
  }

  return 0;
}

H
TD-353  
Hongze Cheng 已提交
576 577 578 579 580 581 582
/* Report TSDB_STATUS_COMMIT_OVER through the application's notifyStatus hook, if one is set. */
static void tsdbEndCommit(STsdbRepo *pRepo) {
  ASSERT(pRepo->commit == 1);

  if (pRepo->appH.notifyStatus == NULL) {
    return;
  }

  pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER);
}

/*
 * Tell whether any of the nIters commit iterators is positioned on a key that
 * is positive and lies within [minKey, maxKey].
 *
 * Returns 1 when such an iterator exists, 0 otherwise.
 */
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) {
  SCommitIter *pEnd = iters + nIters;

  for (SCommitIter *pCur = iters; pCur < pEnd; pCur++) {
    TSKEY nextKey = tsdbNextIterKey(pCur->pIter);
    if (nextKey <= 0) continue;
    if (nextKey < minKey || nextKey > maxKey) continue;
    return 1;
  }

  return 0;
}
H
TD-353  
Hongze Cheng 已提交
588

589
/*
 * Compute the inclusive key range [*minKey, *maxKey] covered by file `fileId`,
 * given the number of days per file and the timestamp precision index.
 */
void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey) {
  TSKEY firstKey = fileId * daysPerFile * tsMsPerDay[precision];

  *minKey = firstKey;
  // The range ends one tick before the next file starts
  *maxKey = firstKey + daysPerFile * tsMsPerDay[precision] - 1;
}

H
TD-353  
Hongze Cheng 已提交
594 595
/*
 * Commit the rows of pRepo->imem that fall into file `fid`.
 *
 * Returns 0 immediately when no iterator has a key in the file's range.
 * Otherwise the file group is created if needed, the helper files are opened,
 * every table's data up to the file's max key is written, the index parts are
 * written, and the new head (and, when one was opened, new last) file is
 * renamed into place while the file-handle write lock is held.
 *
 * Returns 0 on success, -1 on failure (helper files are closed with the error
 * flag set).
 */
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols) {
  char *      dataDir = NULL;
  STsdbCfg *  pCfg = &pRepo->config;
  STsdbFileH *pFileH = pRepo->tsdbFileH;
  SFileGroup *pGroup = NULL;
  SMemTable * pMem = pRepo->imem;
  bool        newLast = false;

  TSKEY minKey = 0, maxKey = 0;
  tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey);

  // Check if there are data to commit to this file
  int hasDataToCommit = tsdbHasDataToCommit(iters, pMem->maxTables, minKey, maxKey);
  if (!hasDataToCommit) {
    tsdbDebug("vgId:%d no data to commit to file %d", REPO_ID(pRepo), fid);
    return 0;
  }

  // Create and open files for commit
  dataDir = tsdbGetDataDirName(pRepo->rootDir);
  if (dataDir == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return -1;
  }

  if ((pGroup = tsdbCreateFGroupIfNeed(pRepo, dataDir, fid)) == NULL) {
    tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
    goto _err;
  }

  // Open files for write/read
  if (tsdbSetAndOpenHelperFile(pHelper, pGroup) < 0) {
    tsdbError("vgId:%d failed to set helper file since %s", REPO_ID(pRepo), tstrerror(terrno));
    goto _err;
  }

  // Remember whether a new last file was opened; it decides the rename below
  newLast = TSDB_NLAST_FILE_OPENED(pHelper);

  if (tsdbLoadCompIdx(pHelper, NULL) < 0) {
    tsdbError("vgId:%d failed to load SCompIdx part since %s", REPO_ID(pRepo), tstrerror(terrno));
    goto _err;
  }

  // Loop to commit data in each table
  for (int tid = 1; tid < pMem->maxTables; tid++) {
    SCommitIter *pIter = iters + tid;
    if (pIter->pTable == NULL) continue;

    taosRLockLatch(&(pIter->pTable->latch));

    if (tsdbSetHelperTable(pHelper, pIter->pTable, pRepo) < 0) {
      // Do not leave the table latch held on the error path
      taosRUnLockLatch(&(pIter->pTable->latch));
      goto _err;
    }

    if (pIter->pIter != NULL) {
      if (tdInitDataCols(pDataCols, tsdbGetTableSchemaImpl(pIter->pTable, false, false, -1)) < 0) {
        // Do not leave the table latch held on the error path
        taosRUnLockLatch(&(pIter->pTable->latch));
        terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
        goto _err;
      }

      if (tsdbCommitTableData(pHelper, pIter, pDataCols, maxKey) < 0) {
        taosRUnLockLatch(&(pIter->pTable->latch));
        tsdbError("vgId:%d failed to write data of table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo),
                  TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable),
                  tstrerror(terrno));
        goto _err;
      }
    }

    taosRUnLockLatch(&(pIter->pTable->latch));

    // Move the last block to the new .l file if neccessary
    if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) {
      tsdbError("vgId:%d, failed to move last block, since %s", REPO_ID(pRepo), tstrerror(terrno));
      goto _err;
    }

    // Write the SCompBlock part
    if (tsdbWriteCompInfo(pHelper) < 0) {
      tsdbError("vgId:%d, failed to write compInfo part since %s", REPO_ID(pRepo), tstrerror(terrno));
      goto _err;
    }
  }

  if (tsdbWriteCompIdx(pHelper) < 0) {
    tsdbError("vgId:%d failed to write compIdx part to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
    goto _err;
  }

  taosTFree(dataDir);
  tsdbCloseHelperFile(pHelper, 0, pGroup);

  // Swap the freshly written files in and publish their info under the write lock
  pthread_rwlock_wrlock(&(pFileH->fhlock));

  (void)rename(helperNewHeadF(pHelper)->fname, helperHeadF(pHelper)->fname);
  pGroup->files[TSDB_FILE_TYPE_HEAD].info = helperNewHeadF(pHelper)->info;

  if (newLast) {
    (void)rename(helperNewLastF(pHelper)->fname, helperLastF(pHelper)->fname);
    pGroup->files[TSDB_FILE_TYPE_LAST].info = helperNewLastF(pHelper)->info;
  } else {
    pGroup->files[TSDB_FILE_TYPE_LAST].info = helperLastF(pHelper)->info;
  }

  pGroup->files[TSDB_FILE_TYPE_DATA].info = helperDataF(pHelper)->info;

  pthread_rwlock_unlock(&(pFileH->fhlock));

  return 0;

_err:
  taosTFree(dataDir);
  tsdbCloseHelperFile(pHelper, 1, NULL);
  return -1;
}

H
Hongze Cheng 已提交
708
/*
 * Build the per-table commit iterator array for pRepo->imem.
 *
 * The array has one slot per table id. Under the meta read lock every existing
 * table is referenced and recorded in its slot. Afterwards a skip-list
 * iterator, advanced once, is attached to each slot whose table has data in the
 * memtable under the same uid.
 *
 * Returns the array, or NULL on failure (partial state is destroyed).
 */
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) {
  SMemTable *pMem = pRepo->imem;
  STsdbMeta *pMeta = pRepo->tsdbMeta;

  SCommitIter *iters = (SCommitIter *)calloc(pMem->maxTables, sizeof(SCommitIter));
  if (iters == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return NULL;
  }

  if (tsdbRLockRepoMeta(pRepo) < 0) goto _err;

  // reference all tables
  for (int tid = 0; tid < pMem->maxTables; tid++) {
    STable *pTable = pMeta->tables[tid];
    if (pTable == NULL) continue;

    tsdbRefTable(pTable);
    iters[tid].pTable = pTable;
  }

  if (tsdbUnlockRepoMeta(pRepo) < 0) goto _err;

  // attach an iterator wherever the memtable holds data for the referenced table
  for (int tid = 0; tid < pMem->maxTables; tid++) {
    SCommitIter *pSlot = iters + tid;

    if (pSlot->pTable == NULL) continue;
    if (pMem->tData[tid] == NULL) continue;
    if (TABLE_UID(pSlot->pTable) != pMem->tData[tid]->uid) continue;

    pSlot->pIter = tSkipListCreateIter(pMem->tData[tid]->pData);
    if (pSlot->pIter == NULL) {
      terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
      goto _err;
    }

    tSkipListIterNext(pSlot->pIter);
  }

  return iters;

_err:
  tsdbDestroyCommitIters(iters, pMem->maxTables);
  return NULL;
}

H
Hongze Cheng 已提交
748
/*
 * Release a commit iterator array: for slots 1..maxTables-1 that hold a table,
 * drop the table reference and destroy the slot's iterator, then free the
 * array. A NULL array is ignored.
 */
static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables) {
  if (iters == NULL) return;

  for (int tid = 1; tid < maxTables; tid++) {
    SCommitIter *pSlot = &iters[tid];
    if (pSlot->pTable == NULL) continue;

    tsdbUnRefTable(pSlot->pTable);
    tSkipListDestroyIter(pSlot->pIter);
  }

  free(iters);
}

/*
 * Grow the memtable's per-table slot array to maxTables entries.
 *
 * A zeroed array of the new size is filled with the existing slots, then the
 * new array and size are published under the memtable write latch and the old
 * array is freed.
 *
 * Returns 0 on success, -1 with terrno = TSDB_CODE_TDB_OUT_OF_MEMORY otherwise.
 */
static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables) {
  ASSERT(pMemTable->maxTables < maxTables);

  STableData **pGrown = (STableData **)calloc(maxTables, sizeof(STableData *));
  if (pGrown == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return -1;
  }

  // Carry the existing slots over; the tail stays zeroed
  STableData **pStale = pMemTable->tData;
  memcpy((void *)pGrown, (void *)pStale, sizeof(STableData *) * pMemTable->maxTables);

  taosWLockLatch(&(pMemTable->latch));
  pMemTable->maxTables = maxTables;
  pMemTable->tData = pGrown;
  taosWUnLockLatch(&(pMemTable->latch));

  taosTFree(pStale);

  return 0;
}