tsdbMemTable.c 14.0 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
H
Hongze Cheng 已提交
14 15
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18 19
static STbData *tsdbNewTbData(tb_uid_t uid);
static void     tsdbFreeTbData(STbData *pTbData);
H
refact  
Hongze Cheng 已提交
20
static char    *tsdbGetTsTupleKey(const void *data);
H
Hongze Cheng 已提交
21
static int      tsdbTbDataComp(const void *arg1, const void *arg2);
H
refact  
Hongze Cheng 已提交
22
static char    *tsdbTbDataGetUid(const void *arg);
23
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row, bool merge);
H
Hongze Cheng 已提交
24

H
Hongze Cheng 已提交
25 26 27 28 29 30 31 32 33
int tsdbMemTableCreate(STsdb *pTsdb, STsdbMemTable **ppMemTable) {
  STsdbMemTable *pMemTable;
  SVnode        *pVnode;

  *ppMemTable = NULL;
  pVnode = pTsdb->pVnode;

  // alloc handle
  pMemTable = (STsdbMemTable *)taosMemoryCalloc(1, sizeof(*pMemTable));
H
Hongze Cheng 已提交
34
  if (pMemTable == NULL) {
H
Hongze Cheng 已提交
35
    return -1;
H
Hongze Cheng 已提交
36 37
  }

H
Hongze Cheng 已提交
38
  pMemTable->pPool = pTsdb->pVnode->inUse;
H
Hongze Cheng 已提交
39
  T_REF_INIT_VAL(pMemTable, 1);
H
Hongze Cheng 已提交
40
  taosInitRWLatch(&pMemTable->latch);
H
Hongze Cheng 已提交
41
  pMemTable->keyMin = TSKEY_MAX;
H
Hongze Cheng 已提交
42
  pMemTable->keyMax = TSKEY_MIN;
H
Hongze Cheng 已提交
43
  pMemTable->nRow = 0;
H
Hongze Cheng 已提交
44 45
  pMemTable->pSlIdx = tSkipListCreate(pVnode->config.tsdbCfg.slLevel, TSDB_DATA_TYPE_BIGINT, sizeof(tb_uid_t),
                                      tsdbTbDataComp, SL_DISCARD_DUP_KEY, tsdbTbDataGetUid);
H
Hongze Cheng 已提交
46
  if (pMemTable->pSlIdx == NULL) {
wafwerar's avatar
wafwerar 已提交
47
    taosMemoryFree(pMemTable);
H
Hongze Cheng 已提交
48
    return -1;
H
Hongze Cheng 已提交
49 50 51 52
  }

  pMemTable->pHashIdx = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pMemTable->pHashIdx == NULL) {
H
Hongze Cheng 已提交
53
    tSkipListDestroy(pMemTable->pSlIdx);
wafwerar's avatar
wafwerar 已提交
54
    taosMemoryFree(pMemTable);
H
Hongze Cheng 已提交
55
    return -1;
H
Hongze Cheng 已提交
56
  }
H
Hongze Cheng 已提交
57

H
Hongze Cheng 已提交
58
  *ppMemTable = pMemTable;
H
Hongze Cheng 已提交
59
  return 0;
H
Hongze Cheng 已提交
60 61
}

H
Hongze Cheng 已提交
62
void tsdbMemTableDestroy(STsdb *pTsdb, STsdbMemTable *pMemTable) {
H
Hongze Cheng 已提交
63 64
  if (pMemTable) {
    taosHashCleanup(pMemTable->pHashIdx);
H
Hongze Cheng 已提交
65 66 67 68 69 70 71 72 73 74
    SSkipListIterator *pIter = tSkipListCreateIter(pMemTable->pSlIdx);
    SSkipListNode     *pNode = NULL;
    STbData           *pTbData = NULL;
    for (;;) {
      if (!tSkipListIterNext(pIter)) break;
      pNode = tSkipListIterGet(pIter);
      pTbData = (STbData *)pNode->pData;
      tsdbFreeTbData(pTbData);
    }
    tSkipListDestroyIter(pIter);
H
Hongze Cheng 已提交
75
    tSkipListDestroy(pMemTable->pSlIdx);
wafwerar's avatar
wafwerar 已提交
76
    taosMemoryFree(pMemTable);
H
Hongze Cheng 已提交
77 78
  }
}
H
Hongze Cheng 已提交
79

H
Hongze Cheng 已提交
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
/**
 * This is an important function to load data or try to load data from memory skiplist iterator.
 *
 * This function load memory data until:
 * 1. iterator ends
 * 2. data key exceeds maxKey
 * 3. rowsIncreased = rowsInserted - rowsDeleteSucceed >= maxRowsToRead
 * 4. operations in pCols not exceeds its max capacity if pCols is given
 *
 * The function tries to procceed AS MUCH AS POSSIBLE.
 */
int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols,
                          TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo) {
  ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0);
  if (pIter == NULL) return 0;
95 96 97 98
  STSchema *pSchema = NULL;
  TSKEY     rowKey = 0;
  TSKEY     fKey = 0;
  // only fetch lastKey from mem data as file data not used in this function actually
C
Cary Xu 已提交
99
  TSKEY      lastKey = TSKEY_INITIAL_VAL;
H
Hongze Cheng 已提交
100 101
  bool       isRowDel = false;
  int        filterIter = 0;
H
refact  
Hongze Cheng 已提交
102
  STSRow    *row = NULL;
H
Hongze Cheng 已提交
103 104
  SMergeInfo mInfo;

105 106 107
  // TODO: support Multi-Version(the rows with the same TS keys in memory can't be merged if its version refered by
  // query handle)

H
Hongze Cheng 已提交
108 109 110 111 112 113 114 115
  if (pMergeInfo == NULL) pMergeInfo = &mInfo;

  memset(pMergeInfo, 0, sizeof(*pMergeInfo));
  pMergeInfo->keyFirst = INT64_MAX;
  pMergeInfo->keyLast = INT64_MIN;
  if (pCols) tdResetDataCols(pCols);

  row = tsdbNextIterRow(pIter);
C
Cary Xu 已提交
116
  if (row == NULL || TD_ROW_KEY(row) > maxKey) {
H
Hongze Cheng 已提交
117 118 119
    rowKey = INT64_MAX;
    isRowDel = false;
  } else {
C
Cary Xu 已提交
120 121
    rowKey = TD_ROW_KEY(row);
    isRowDel = TD_ROW_IS_DELETED(row);
H
Hongze Cheng 已提交
122 123 124 125 126 127 128
  }

  if (filterIter >= nFilterKeys) {
    fKey = INT64_MAX;
  } else {
    fKey = tdGetKey(filterKeys[filterIter]);
  }
C
Cary Xu 已提交
129 130
  // 1. fkey - no dup since merged up to maxVersion of each query handle by tsdbLoadBlockDataCols
  // 2. rowKey - would dup since Multi-Version supported
H
Hongze Cheng 已提交
131 132 133 134
  while (true) {
    if (fKey == INT64_MAX && rowKey == INT64_MAX) break;

    if (fKey < rowKey) {
dengyihao's avatar
dengyihao 已提交
135 136
      pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, fKey);
      pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, fKey);
H
Hongze Cheng 已提交
137 138 139 140 141 142 143

      filterIter++;
      if (filterIter >= nFilterKeys) {
        fKey = INT64_MAX;
      } else {
        fKey = tdGetKey(filterKeys[filterIter]);
      }
C
Cary Xu 已提交
144
#if 0
H
Hongze Cheng 已提交
145 146 147 148 149 150
    } else if (fKey > rowKey) {
      if (isRowDel) {
        pMergeInfo->rowsDeleteFailed++;
      } else {
        if (pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed >= maxRowsToRead) break;
        if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
C
Cary Xu 已提交
151

H
Hongze Cheng 已提交
152 153
        pMergeInfo->rowsInserted++;
        pMergeInfo->nOperations++;
dengyihao's avatar
dengyihao 已提交
154 155
        pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey);
        pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey);
H
Hongze Cheng 已提交
156 157 158 159 160
        tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row);
      }

      tSkipListIterNext(pIter);
      row = tsdbNextIterRow(pIter);
C
Cary Xu 已提交
161
      if (row == NULL || TD_ROW_KEY(row) > maxKey) {
H
Hongze Cheng 已提交
162 163 164
        rowKey = INT64_MAX;
        isRowDel = false;
      } else {
C
Cary Xu 已提交
165 166
        rowKey = TD_ROW_KEY(row);
        isRowDel = TD_ROW_IS_DELETED(row);
H
Hongze Cheng 已提交
167 168 169 170 171 172 173 174 175 176 177 178 179
      }
    } else {
      if (isRowDel) {
        ASSERT(!keepDup);
        if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
        pMergeInfo->rowsDeleteSucceed++;
        pMergeInfo->nOperations++;
        tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row);
      } else {
        if (keepDup) {
          if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
          pMergeInfo->rowsUpdated++;
          pMergeInfo->nOperations++;
dengyihao's avatar
dengyihao 已提交
180 181
          pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey);
          pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey);
H
Hongze Cheng 已提交
182 183
          tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row);
        } else {
dengyihao's avatar
dengyihao 已提交
184 185
          pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, fKey);
          pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, fKey);
H
Hongze Cheng 已提交
186 187 188 189 190
        }
      }

      tSkipListIterNext(pIter);
      row = tsdbNextIterRow(pIter);
C
Cary Xu 已提交
191
      if (row == NULL || TD_ROW_KEY(row) > maxKey) {
H
Hongze Cheng 已提交
192 193 194
        rowKey = INT64_MAX;
        isRowDel = false;
      } else {
C
Cary Xu 已提交
195 196
        rowKey = TD_ROW_KEY(row);
        isRowDel = TD_ROW_IS_DELETED(row);
H
Hongze Cheng 已提交
197 198
      }

C
Cary Xu 已提交
199 200 201 202 203 204 205 206
      filterIter++;
      if (filterIter >= nFilterKeys) {
        fKey = INT64_MAX;
      } else {
        fKey = tdGetKey(filterKeys[filterIter]);
      }
    }
#endif
207 208
#if 1
    } else if (fKey > rowKey) {
C
Cary Xu 已提交
209
      if (isRowDel) {
210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247
        // TODO: support delete function
        pMergeInfo->rowsDeleteFailed++;
      } else {
        if (pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed >= maxRowsToRead) break;
        if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;

        if (lastKey != rowKey) {
          pMergeInfo->rowsInserted++;
          pMergeInfo->nOperations++;
          pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey);
          pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey);
          if (pCols) {
            if (lastKey != TSKEY_INITIAL_VAL) {
              ++pCols->numOfRows;
            }
            tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, false);
          }
          lastKey = rowKey;
        } else {
          if (keepDup) {
            tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, true);
          } else {
            // discard
          }
        }
      }

      tSkipListIterNext(pIter);
      row = tsdbNextIterRow(pIter);
      if (row == NULL || TD_ROW_KEY(row) > maxKey) {
        rowKey = INT64_MAX;
        isRowDel = false;
      } else {
        rowKey = TD_ROW_KEY(row);
        isRowDel = TD_ROW_IS_DELETED(row);
      }
    } else {           // fkey == rowKey
      if (isRowDel) {  // TODO: support delete function(How to stands for delete in file? rowVersion = -1?)
C
Cary Xu 已提交
248 249 250 251
        ASSERT(!keepDup);
        if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
        pMergeInfo->rowsDeleteSucceed++;
        pMergeInfo->nOperations++;
252
        tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, false);
C
Cary Xu 已提交
253 254 255
      } else {
        if (keepDup) {
          if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
256 257 258 259 260
          if (lastKey != rowKey) {
            pMergeInfo->rowsUpdated++;
            pMergeInfo->nOperations++;
            pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey);
            pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey);
C
Cary Xu 已提交
261
            if (pCols) {
C
Cary Xu 已提交
262 263 264
              if (lastKey != TSKEY_INITIAL_VAL) {
                ++pCols->numOfRows;
              }
C
Cary Xu 已提交
265 266
              tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, false);
            }
C
Cary Xu 已提交
267
            lastKey = rowKey;
268 269 270
          } else {
            tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, true);
          }
C
Cary Xu 已提交
271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286
        } else {
          pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, fKey);
          pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, fKey);
        }
      }

      tSkipListIterNext(pIter);
      row = tsdbNextIterRow(pIter);
      if (row == NULL || TD_ROW_KEY(row) > maxKey) {
        rowKey = INT64_MAX;
        isRowDel = false;
      } else {
        rowKey = TD_ROW_KEY(row);
        isRowDel = TD_ROW_IS_DELETED(row);
      }

H
Hongze Cheng 已提交
287 288 289 290 291 292 293
      filterIter++;
      if (filterIter >= nFilterKeys) {
        fKey = INT64_MAX;
      } else {
        fKey = tdGetKey(filterKeys[filterIter]);
      }
    }
294 295
#endif
  }
C
Cary Xu 已提交
296
  if (pCols && (lastKey != TSKEY_INITIAL_VAL)) {
297
    ++pCols->numOfRows;
H
Hongze Cheng 已提交
298 299 300 301 302
  }

  return 0;
}

H
Hongze Cheng 已提交
303
int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, SSubmitBlkRsp *pRsp) {
H
Hongze Cheng 已提交
304 305
  SSubmitBlkIter blkIter = {0};
  STsdbMemTable *pMemTable = pTsdb->mem;
H
refact  
Hongze Cheng 已提交
306 307 308
  void          *tptr;
  STbData       *pTbData;
  STSRow        *row;
H
Hongze Cheng 已提交
309 310
  TSKEY          keyMin;
  TSKEY          keyMax;
H
Hongze Cheng 已提交
311
  SSubmitBlk    *pBlkCopy;
H
Hongze Cheng 已提交
312
  int64_t        sverNew;
H
Hongze Cheng 已提交
313

H
Hongze Cheng 已提交
314 315 316 317 318 319 320 321 322
  // check if table exists
  SMetaReader mr = {0};
  SMetaEntry  me = {0};
  metaReaderInit(&mr, pTsdb->pVnode->pMeta, 0);
  if (metaGetTableEntryByUid(&mr, pMsgIter->uid) < 0) {
    metaReaderClear(&mr);
    terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
    return -1;
  }
H
Hongze Cheng 已提交
323 324 325 326 327 328
  if (mr.me.type == TSDB_NORMAL_TABLE) {
    sverNew = mr.me.ntbEntry.schema.sver;
  } else {
    metaGetTableEntryByUid(&mr, mr.me.ctbEntry.suid);
    sverNew = mr.me.stbEntry.schema.sver;
  }
H
Hongze Cheng 已提交
329 330
  metaReaderClear(&mr);

H
Hongze Cheng 已提交
331
  // create container is nedd
C
Cary Xu 已提交
332
  tptr = taosHashGet(pMemTable->pHashIdx, &(pMsgIter->uid), sizeof(pMsgIter->uid));
H
Hongze Cheng 已提交
333
  if (tptr == NULL) {
C
Cary Xu 已提交
334
    pTbData = tsdbNewTbData(pMsgIter->uid);
H
Hongze Cheng 已提交
335 336 337 338 339
    if (pTbData == NULL) {
      return -1;
    }

    // Put into hash
C
Cary Xu 已提交
340
    taosHashPut(pMemTable->pHashIdx, &(pMsgIter->uid), sizeof(pMsgIter->uid), &(pTbData), sizeof(pTbData));
H
Hongze Cheng 已提交
341 342 343 344 345 346 347

    // Put into skiplist
    tSkipListPut(pMemTable->pSlIdx, pTbData);
  } else {
    pTbData = *(STbData **)tptr;
  }

H
Hongze Cheng 已提交
348
  // copy data to buffer pool
C
Cary Xu 已提交
349 350 351
  int32_t tlen = pMsgIter->dataLen + pMsgIter->schemaLen + sizeof(*pBlock);
  pBlkCopy = (SSubmitBlk *)vnodeBufPoolMalloc(pTsdb->mem->pPool, tlen);
  memcpy(pBlkCopy, pBlock, tlen);
H
Hongze Cheng 已提交
352

C
Cary Xu 已提交
353
  tInitSubmitBlkIter(pMsgIter, pBlkCopy, &blkIter);
H
Hongze Cheng 已提交
354
  if (blkIter.row == NULL) return 0;
C
Cary Xu 已提交
355
  keyMin = TD_ROW_KEY(blkIter.row);
H
Hongze Cheng 已提交
356

H
Hongze Cheng 已提交
357
  tSkipListPutBatchByIter(pTbData->pData, &blkIter, (iter_next_fn_t)tGetSubmitBlkNext);
H
Hongze Cheng 已提交
358

C
Cary Xu 已提交
359 360 361 362 363
#ifdef TD_DEBUG_PRINT_ROW
  printf("!!! %s:%d table %" PRIi64 " has %d rows in skiplist\n\n", __func__, __LINE__, pTbData->uid,
         SL_SIZE(pTbData->pData));
#endif

H
Hongze Cheng 已提交
364
  // Set statistics
C
Cary Xu 已提交
365
  keyMax = TD_ROW_KEY(blkIter.row);
H
Hongze Cheng 已提交
366

C
Cary Xu 已提交
367
  pTbData->nrows += pMsgIter->numOfRows;
H
Hongze Cheng 已提交
368 369
  if (pTbData->keyMin > keyMin) pTbData->keyMin = keyMin;
  if (pTbData->keyMax < keyMax) pTbData->keyMax = keyMax;
H
Hongze Cheng 已提交
370

C
Cary Xu 已提交
371
  pMemTable->nRow += pMsgIter->numOfRows;
H
Hongze Cheng 已提交
372 373
  if (pMemTable->keyMin > keyMin) pMemTable->keyMin = keyMin;
  if (pMemTable->keyMax < keyMax) pMemTable->keyMax = keyMax;
H
Hongze Cheng 已提交
374

D
dapan 已提交
375 376
  pRsp->numOfRows = pMsgIter->numOfRows;
  pRsp->affectedRows = pMsgIter->numOfRows;
H
Hongze Cheng 已提交
377
  pRsp->sver = sverNew;
C
Cary Xu 已提交
378

H
Hongze Cheng 已提交
379 380 381 382
  return 0;
}

static STbData *tsdbNewTbData(tb_uid_t uid) {
wafwerar's avatar
wafwerar 已提交
383
  STbData *pTbData = (STbData *)taosMemoryCalloc(1, sizeof(*pTbData));
H
Hongze Cheng 已提交
384 385 386 387 388 389 390 391
  if (pTbData == NULL) {
    return NULL;
  }

  pTbData->uid = uid;
  pTbData->keyMin = TSKEY_MAX;
  pTbData->keyMax = TSKEY_MIN;
  pTbData->nrows = 0;
C
Cary Xu 已提交
392
#if 0
H
Hongze Cheng 已提交
393 394
  pTbData->pData = tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), tkeyComparFn, SL_DISCARD_DUP_KEY,
                                   tsdbGetTsTupleKey);
C
Cary Xu 已提交
395
#endif
396 397
  pTbData->pData =
      tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), tkeyComparFn, SL_ALLOW_DUP_KEY, tsdbGetTsTupleKey);
H
Hongze Cheng 已提交
398
  if (pTbData->pData == NULL) {
wafwerar's avatar
wafwerar 已提交
399
    taosMemoryFree(pTbData);
H
Hongze Cheng 已提交
400 401 402 403 404 405 406 407 408
    return NULL;
  }

  return pTbData;
}

static void tsdbFreeTbData(STbData *pTbData) {
  if (pTbData) {
    tSkipListDestroy(pTbData->pData);
wafwerar's avatar
wafwerar 已提交
409
    taosMemoryFree(pTbData);
H
Hongze Cheng 已提交
410 411 412
  }
}

C
Cary Xu 已提交
413
static char *tsdbGetTsTupleKey(const void *data) { return (char *)TD_ROW_KEY_ADDR((STSRow *)data); }
H
Hongze Cheng 已提交
414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429

static int tsdbTbDataComp(const void *arg1, const void *arg2) {
  STbData *pTbData1 = (STbData *)arg1;
  STbData *pTbData2 = (STbData *)arg2;

  if (pTbData1->uid > pTbData2->uid) {
    return 1;
  } else if (pTbData1->uid == pTbData2->uid) {
    return 0;
  } else {
    return -1;
  }
}

static char *tsdbTbDataGetUid(const void *arg) {
  STbData *pTbData = (STbData *)arg;
H
Hongze Cheng 已提交
430
  return (char *)(&(pTbData->uid));
H
Hongze Cheng 已提交
431
}
432
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row, bool merge) {
H
Hongze Cheng 已提交
433
  if (pCols) {
C
Cary Xu 已提交
434 435
    if (*ppSchema == NULL || schemaVersion(*ppSchema) != TD_ROW_SVER(row)) {
      *ppSchema = tsdbGetTableSchemaImpl(pTable, false, false, TD_ROW_SVER(row));
H
Hongze Cheng 已提交
436 437 438 439 440 441
      if (*ppSchema == NULL) {
        ASSERT(false);
        return -1;
      }
    }

442
    tdAppendSTSRowToDataCol(row, *ppSchema, pCols, merge);
H
Hongze Cheng 已提交
443 444 445
  }

  return 0;
H
Hongze Cheng 已提交
446
}