tsdbMemTable.c 14.2 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18 19
static STbData *tsdbNewTbData(tb_uid_t uid);
static void     tsdbFreeTbData(STbData *pTbData);
H
refact  
Hongze Cheng 已提交
20
static char    *tsdbGetTsTupleKey(const void *data);
H
Hongze Cheng 已提交
21
static int      tsdbTbDataComp(const void *arg1, const void *arg2);
H
refact  
Hongze Cheng 已提交
22
static char    *tsdbTbDataGetUid(const void *arg);
23 24
static int tsdbAppendTableRowToCols(STsdb *pTsdb, STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row,
                                    bool merge);
H
Hongze Cheng 已提交
25

H
Hongze Cheng 已提交
26 27 28 29 30 31 32 33 34
/**
 * Allocate and initialise an empty memtable for `pTsdb`.
 *
 * The new handle borrows the vnode's in-use buffer pool, starts with a
 * reference count of 1 and owns two indexes over per-table data: a skiplist
 * (ordered with tsdbTbDataComp, keyed through tsdbTbDataGetUid) and a hash
 * table.
 *
 * @param pTsdb       tsdb instance; its pVnode supplies the pool and the skiplist level.
 * @param ppMemTable  out: set to the new memtable on success, NULL on failure.
 * @return 0 on success; -1 if any allocation fails (everything acquired so far is released).
 */
int tsdbMemTableCreate(STsdb *pTsdb, STsdbMemTable **ppMemTable) {
  STsdbMemTable *pMemTable;
  SVnode        *pVnode;

  *ppMemTable = NULL;
  pVnode = pTsdb->pVnode;

  // alloc handle
  pMemTable = (STsdbMemTable *)taosMemoryCalloc(1, sizeof(*pMemTable));
  if (pMemTable == NULL) {
    return -1;
  }

  pMemTable->pPool = pVnode->inUse;
  T_REF_INIT_VAL(pMemTable, 1);
  taosInitRWLatch(&pMemTable->latch);

  // Start with an inverted key range so the first inserted key replaces both bounds.
  pMemTable->keyMin = TSKEY_MAX;
  pMemTable->keyMax = TSKEY_MIN;
  pMemTable->nRow = 0;

  pMemTable->pSlIdx = tSkipListCreate(pVnode->config.tsdbCfg.slLevel, TSDB_DATA_TYPE_BIGINT, sizeof(tb_uid_t),
                                      tsdbTbDataComp, SL_DISCARD_DUP_KEY, tsdbTbDataGetUid);
  if (pMemTable->pSlIdx == NULL) {
    goto _err_free_handle;
  }

  pMemTable->pHashIdx = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pMemTable->pHashIdx == NULL) {
    goto _err_destroy_slidx;
  }

  *ppMemTable = pMemTable;
  return 0;

  // Unwind in reverse order of acquisition.
_err_destroy_slidx:
  tSkipListDestroy(pMemTable->pSlIdx);
_err_free_handle:
  taosMemoryFree(pMemTable);
  return -1;
}

H
Hongze Cheng 已提交
63
/**
 * Destroy a memtable created by tsdbMemTableCreate.
 *
 * Releases the hash index, frees every STbData stored in the skiplist index,
 * then the skiplist itself and finally the handle. A NULL `pMemTable` is a no-op.
 *
 * @param pTsdb      not used by this function.
 * @param pMemTable  memtable to destroy; may be NULL.
 */
void tsdbMemTableDestroy(STsdb *pTsdb, STsdbMemTable *pMemTable) {
  if (pMemTable == NULL) {
    return;
  }

  taosHashCleanup(pMemTable->pHashIdx);

  // The skiplist nodes only reference the STbData objects, so free each one
  // explicitly before the list goes away.
  SSkipListIterator *pIter = tSkipListCreateIter(pMemTable->pSlIdx);
  // NOTE(review): assumes tSkipListCreateIter returns NULL on failure; in that
  // case the per-table data cannot be walked and is leaked rather than
  // dereferencing a NULL iterator.
  if (pIter != NULL) {
    while (tSkipListIterNext(pIter)) {
      SSkipListNode *pNode = tSkipListIterGet(pIter);
      tsdbFreeTbData((STbData *)pNode->pData);
    }
    tSkipListDestroyIter(pIter);
  }

  tSkipListDestroy(pMemTable->pSlIdx);
  taosMemoryFree(pMemTable);
}
H
Hongze Cheng 已提交
80

H
Hongze Cheng 已提交
81 82 83 84 85 86 87 88 89 90 91
/**
 * This is an important function to load data or try to load data from memory skiplist iterator.
 *
 * This function load memory data until:
 * 1. iterator ends
 * 2. data key exceeds maxKey
 * 3. rowsIncreased = rowsInserted - rowsDeleteSucceed >= maxRowsToRead
 * 4. operations in pCols not exceeds its max capacity if pCols is given
 *
 * The function tries to procceed AS MUCH AS POSSIBLE.
 */
92 93
int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead,
                          SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo) {
H
Hongze Cheng 已提交
94 95
  ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0);
  if (pIter == NULL) return 0;
96 97 98 99
  STSchema *pSchema = NULL;
  TSKEY     rowKey = 0;
  TSKEY     fKey = 0;
  // only fetch lastKey from mem data as file data not used in this function actually
C
Cary Xu 已提交
100
  TSKEY      lastKey = TSKEY_INITIAL_VAL;
H
Hongze Cheng 已提交
101 102
  bool       isRowDel = false;
  int        filterIter = 0;
H
refact  
Hongze Cheng 已提交
103
  STSRow    *row = NULL;
H
Hongze Cheng 已提交
104 105
  SMergeInfo mInfo;

106 107 108
  // TODO: support Multi-Version(the rows with the same TS keys in memory can't be merged if its version refered by
  // query handle)

H
Hongze Cheng 已提交
109 110 111 112 113 114 115 116
  if (pMergeInfo == NULL) pMergeInfo = &mInfo;

  memset(pMergeInfo, 0, sizeof(*pMergeInfo));
  pMergeInfo->keyFirst = INT64_MAX;
  pMergeInfo->keyLast = INT64_MIN;
  if (pCols) tdResetDataCols(pCols);

  row = tsdbNextIterRow(pIter);
C
Cary Xu 已提交
117
  if (row == NULL || TD_ROW_KEY(row) > maxKey) {
H
Hongze Cheng 已提交
118 119 120
    rowKey = INT64_MAX;
    isRowDel = false;
  } else {
C
Cary Xu 已提交
121 122
    rowKey = TD_ROW_KEY(row);
    isRowDel = TD_ROW_IS_DELETED(row);
H
Hongze Cheng 已提交
123 124 125 126 127 128 129
  }

  if (filterIter >= nFilterKeys) {
    fKey = INT64_MAX;
  } else {
    fKey = tdGetKey(filterKeys[filterIter]);
  }
C
Cary Xu 已提交
130 131
  // 1. fkey - no dup since merged up to maxVersion of each query handle by tsdbLoadBlockDataCols
  // 2. rowKey - would dup since Multi-Version supported
H
Hongze Cheng 已提交
132 133 134 135
  while (true) {
    if (fKey == INT64_MAX && rowKey == INT64_MAX) break;

    if (fKey < rowKey) {
dengyihao's avatar
dengyihao 已提交
136 137
      pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, fKey);
      pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, fKey);
H
Hongze Cheng 已提交
138 139 140 141 142 143 144

      filterIter++;
      if (filterIter >= nFilterKeys) {
        fKey = INT64_MAX;
      } else {
        fKey = tdGetKey(filterKeys[filterIter]);
      }
C
Cary Xu 已提交
145
#if 0
H
Hongze Cheng 已提交
146 147 148 149 150 151
    } else if (fKey > rowKey) {
      if (isRowDel) {
        pMergeInfo->rowsDeleteFailed++;
      } else {
        if (pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed >= maxRowsToRead) break;
        if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
C
Cary Xu 已提交
152

H
Hongze Cheng 已提交
153 154
        pMergeInfo->rowsInserted++;
        pMergeInfo->nOperations++;
dengyihao's avatar
dengyihao 已提交
155 156
        pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey);
        pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey);
H
Hongze Cheng 已提交
157 158 159 160 161
        tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row);
      }

      tSkipListIterNext(pIter);
      row = tsdbNextIterRow(pIter);
C
Cary Xu 已提交
162
      if (row == NULL || TD_ROW_KEY(row) > maxKey) {
H
Hongze Cheng 已提交
163 164 165
        rowKey = INT64_MAX;
        isRowDel = false;
      } else {
C
Cary Xu 已提交
166 167
        rowKey = TD_ROW_KEY(row);
        isRowDel = TD_ROW_IS_DELETED(row);
H
Hongze Cheng 已提交
168 169 170 171 172 173 174 175 176 177 178 179 180
      }
    } else {
      if (isRowDel) {
        ASSERT(!keepDup);
        if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
        pMergeInfo->rowsDeleteSucceed++;
        pMergeInfo->nOperations++;
        tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row);
      } else {
        if (keepDup) {
          if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
          pMergeInfo->rowsUpdated++;
          pMergeInfo->nOperations++;
dengyihao's avatar
dengyihao 已提交
181 182
          pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey);
          pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey);
H
Hongze Cheng 已提交
183 184
          tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row);
        } else {
dengyihao's avatar
dengyihao 已提交
185 186
          pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, fKey);
          pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, fKey);
H
Hongze Cheng 已提交
187 188 189 190 191
        }
      }

      tSkipListIterNext(pIter);
      row = tsdbNextIterRow(pIter);
C
Cary Xu 已提交
192
      if (row == NULL || TD_ROW_KEY(row) > maxKey) {
H
Hongze Cheng 已提交
193 194 195
        rowKey = INT64_MAX;
        isRowDel = false;
      } else {
C
Cary Xu 已提交
196 197
        rowKey = TD_ROW_KEY(row);
        isRowDel = TD_ROW_IS_DELETED(row);
H
Hongze Cheng 已提交
198 199
      }

C
Cary Xu 已提交
200 201 202 203 204 205 206 207
      filterIter++;
      if (filterIter >= nFilterKeys) {
        fKey = INT64_MAX;
      } else {
        fKey = tdGetKey(filterKeys[filterIter]);
      }
    }
#endif
208 209
#if 1
    } else if (fKey > rowKey) {
C
Cary Xu 已提交
210
      if (isRowDel) {
211 212 213 214 215 216 217 218 219 220 221 222 223 224 225
        // TODO: support delete function
        pMergeInfo->rowsDeleteFailed++;
      } else {
        if (pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed >= maxRowsToRead) break;
        if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;

        if (lastKey != rowKey) {
          pMergeInfo->rowsInserted++;
          pMergeInfo->nOperations++;
          pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey);
          pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey);
          if (pCols) {
            if (lastKey != TSKEY_INITIAL_VAL) {
              ++pCols->numOfRows;
            }
226
            tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, false);
227 228 229 230
          }
          lastKey = rowKey;
        } else {
          if (keepDup) {
231
            tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, true);
232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248
          } else {
            // discard
          }
        }
      }

      tSkipListIterNext(pIter);
      row = tsdbNextIterRow(pIter);
      if (row == NULL || TD_ROW_KEY(row) > maxKey) {
        rowKey = INT64_MAX;
        isRowDel = false;
      } else {
        rowKey = TD_ROW_KEY(row);
        isRowDel = TD_ROW_IS_DELETED(row);
      }
    } else {           // fkey == rowKey
      if (isRowDel) {  // TODO: support delete function(How to stands for delete in file? rowVersion = -1?)
C
Cary Xu 已提交
249 250 251 252
        ASSERT(!keepDup);
        if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
        pMergeInfo->rowsDeleteSucceed++;
        pMergeInfo->nOperations++;
253
        tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, false);
C
Cary Xu 已提交
254 255 256
      } else {
        if (keepDup) {
          if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
257 258 259 260 261
          if (lastKey != rowKey) {
            pMergeInfo->rowsUpdated++;
            pMergeInfo->nOperations++;
            pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey);
            pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey);
C
Cary Xu 已提交
262
            if (pCols) {
C
Cary Xu 已提交
263 264 265
              if (lastKey != TSKEY_INITIAL_VAL) {
                ++pCols->numOfRows;
              }
266
              tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, false);
C
Cary Xu 已提交
267
            }
C
Cary Xu 已提交
268
            lastKey = rowKey;
269
          } else {
270
            tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, true);
271
          }
C
Cary Xu 已提交
272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287
        } else {
          pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, fKey);
          pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, fKey);
        }
      }

      tSkipListIterNext(pIter);
      row = tsdbNextIterRow(pIter);
      if (row == NULL || TD_ROW_KEY(row) > maxKey) {
        rowKey = INT64_MAX;
        isRowDel = false;
      } else {
        rowKey = TD_ROW_KEY(row);
        isRowDel = TD_ROW_IS_DELETED(row);
      }

H
Hongze Cheng 已提交
288 289 290 291 292 293 294
      filterIter++;
      if (filterIter >= nFilterKeys) {
        fKey = INT64_MAX;
      } else {
        fKey = tdGetKey(filterKeys[filterIter]);
      }
    }
295 296
#endif
  }
C
Cary Xu 已提交
297
  if (pCols && (lastKey != TSKEY_INITIAL_VAL)) {
298
    ++pCols->numOfRows;
H
Hongze Cheng 已提交
299 300 301 302 303
  }

  return 0;
}

H
Hongze Cheng 已提交
304
/**
 * Insert the rows of one submit block into the current memtable (`pTsdb->mem`).
 *
 * Steps: look the table up in the meta store, find or create its per-table
 * container in the memtable, copy the block into the memtable's buffer pool,
 * batch-insert the copied rows into the table's skiplist, then update the
 * row-count/key-range statistics and fill in the response.
 *
 * @param pTsdb     tsdb instance owning the memtable and the vnode meta store.
 * @param pMsgIter  describes the block: table uid, dataLen, schemaLen, numOfRows.
 * @param pBlock    the submit block to insert; it is copied, not referenced.
 * @param pRsp      response to fill: numOfRows, affectedRows, sver; the table name is
 *                  appended to tblFName when that pointer is non-NULL.
 * @return 0 on success (including a block whose iterator has no first row);
 *         -1 if the table does not exist (terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST)
 *         or an allocation fails.
 */
int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, SSubmitBlkRsp *pRsp) {
  SSubmitBlkIter blkIter = {0};
  STsdbMemTable *pMemTable = pTsdb->mem;
  void          *tptr;
  STbData       *pTbData;
  TSKEY          keyMin;
  TSKEY          keyMax;
  SSubmitBlk    *pBlkCopy;
  int64_t        sverNew;

  // check if table exists
  SMetaReader mr = {0};
  metaReaderInit(&mr, pTsdb->pVnode->pMeta, 0);
  if (metaGetTableEntryByUid(&mr, pMsgIter->uid) < 0) {
    metaReaderClear(&mr);
    terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
    return -1;
  }
  // NOTE(review): unbounded append - relies on the caller having sized tblFName
  // for the existing prefix plus the table name; confirm against callers.
  if (pRsp->tblFName) strcat(pRsp->tblFName, mr.me.name);

  if (mr.me.type == TSDB_NORMAL_TABLE) {
    sverNew = mr.me.ntbEntry.schema.sver;
  } else {
    // Not a normal table: re-point the reader at the entry named by ctbEntry.suid
    // and take the schema version from there.
    // NOTE(review): the result of this second lookup is not checked.
    metaGetTableEntryByUid(&mr, mr.me.ctbEntry.suid);
    sverNew = mr.me.stbEntry.schema.sver;
  }
  metaReaderClear(&mr);

  // create container if needed
  tptr = taosHashGet(pMemTable->pHashIdx, &(pMsgIter->uid), sizeof(pMsgIter->uid));
  if (tptr == NULL) {
    pTbData = tsdbNewTbData(pMsgIter->uid);
    if (pTbData == NULL) {
      return -1;
    }

    // Put into hash
    taosHashPut(pMemTable->pHashIdx, &(pMsgIter->uid), sizeof(pMsgIter->uid), &(pTbData), sizeof(pTbData));

    // Put into skiplist
    tSkipListPut(pMemTable->pSlIdx, pTbData);
  } else {
    // The hash stores the STbData pointer by value.
    pTbData = *(STbData **)tptr;
  }

  // copy data to buffer pool
  int32_t tlen = pMsgIter->dataLen + pMsgIter->schemaLen + sizeof(*pBlock);
  pBlkCopy = (SSubmitBlk *)vnodeBufPoolMalloc(pMemTable->pPool, tlen);
  if (pBlkCopy == NULL) {
    // Pool allocation failed: bail out instead of handing NULL to memcpy.
    return -1;
  }
  memcpy(pBlkCopy, pBlock, tlen);

  // Iterate over the pooled copy so the skiplist references memtable-owned rows.
  tInitSubmitBlkIter(pMsgIter, pBlkCopy, &blkIter);
  if (blkIter.row == NULL) return 0;
  keyMin = TD_ROW_KEY(blkIter.row);

  tSkipListPutBatchByIter(pTbData->pData, &blkIter, (iter_next_fn_t)tGetSubmitBlkNext);

#ifdef TD_DEBUG_PRINT_ROW
  printf("!!! %s:%d table %" PRIi64 " has %d rows in skiplist\n\n", __func__, __LINE__, pTbData->uid,
         SL_SIZE(pTbData->pData));
#endif

  // Set statistics
  // NOTE(review): presumably blkIter.row rests on the last row once the batch
  // insert has drained the iterator, and rows are in ascending key order - confirm.
  keyMax = TD_ROW_KEY(blkIter.row);

  pTbData->nrows += pMsgIter->numOfRows;
  if (pTbData->keyMin > keyMin) pTbData->keyMin = keyMin;
  if (pTbData->keyMax < keyMax) pTbData->keyMax = keyMax;

  pMemTable->nRow += pMsgIter->numOfRows;
  if (pMemTable->keyMin > keyMin) pMemTable->keyMin = keyMin;
  if (pMemTable->keyMax < keyMax) pMemTable->keyMax = keyMax;

  pRsp->numOfRows = pMsgIter->numOfRows;
  pRsp->affectedRows = pMsgIter->numOfRows;
  pRsp->sver = sverNew;

  return 0;
}

/**
 * Allocate the per-table container that holds one table's rows in the memtable.
 *
 * The container starts with zero rows, an inverted key range (so the first
 * insert replaces both bounds) and an empty row skiplist keyed through
 * tsdbGetTsTupleKey. Duplicate keys are allowed in that skiplist.
 *
 * @param uid  uid of the table the container belongs to.
 * @return the new container, or NULL if either allocation fails.
 */
static STbData *tsdbNewTbData(tb_uid_t uid) {
  STbData *pTbData = (STbData *)taosMemoryCalloc(1, sizeof(*pTbData));
  if (pTbData == NULL) {
    return NULL;
  }

  pTbData->uid = uid;
  pTbData->keyMin = TSKEY_MAX;
  pTbData->keyMax = TSKEY_MIN;
  pTbData->nrows = 0;

  // NOTE(review): the literal 5 is presumably the skiplist level (the memtable
  // index passes the configured slLevel in the same position) - confirm.
  pTbData->pData =
      tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), tkeyComparFn, SL_ALLOW_DUP_KEY, tsdbGetTsTupleKey);
  if (pTbData->pData == NULL) {
    taosMemoryFree(pTbData);
    return NULL;
  }

  return pTbData;
}

/**
 * Free a per-table container created by tsdbNewTbData: destroys its row
 * skiplist, then releases the container itself. NULL is a no-op.
 */
static void tsdbFreeTbData(STbData *pTbData) {
  if (pTbData == NULL) {
    return;
  }

  tSkipListDestroy(pTbData->pData);
  taosMemoryFree(pTbData);
}

C
Cary Xu 已提交
416
static char *tsdbGetTsTupleKey(const void *data) { return (char *)TD_ROW_KEY_ADDR((STSRow *)data); }
H
Hongze Cheng 已提交
417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432

/**
 * Three-way comparison of two STbData objects by table uid.
 *
 * @return -1, 0 or 1 when arg1's uid is less than, equal to or greater than arg2's.
 */
static int tsdbTbDataComp(const void *arg1, const void *arg2) {
  const STbData *lhs = (const STbData *)arg1;
  const STbData *rhs = (const STbData *)arg2;

  if (lhs->uid < rhs->uid) {
    return -1;
  }
  return (lhs->uid > rhs->uid) ? 1 : 0;
}

static char *tsdbTbDataGetUid(const void *arg) {
  STbData *pTbData = (STbData *)arg;
H
Hongze Cheng 已提交
433
  return (char *)(&(pTbData->uid));
H
Hongze Cheng 已提交
434
}
435 436 437

/**
 * Append one row to `pCols`, resolving the schema that matches the row's
 * schema version first.
 *
 * `*ppSchema` acts as a one-entry cache: it is looked up again through
 * tsdbGetTableSchemaImpl only when it is NULL or its version differs from the
 * row's version.
 *
 * @param pTsdb     passed to tsdbGetTableSchemaImpl.
 * @param pTable    passed to tsdbGetTableSchemaImpl.
 * @param pCols     destination columns; when NULL the call does nothing and returns 0.
 * @param ppSchema  in/out cached schema pointer, updated when a new lookup happens.
 * @param row       row to append.
 * @param merge     forwarded unchanged to tdAppendSTSRowToDataCol.
 * @return 0 on success or when pCols is NULL; -1 if no schema can be found
 *         for the row's version (after ASSERT(false)).
 */
static int tsdbAppendTableRowToCols(STsdb *pTsdb, STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row,
                                    bool merge) {
  if (pCols == NULL) {
    return 0;
  }

  if (*ppSchema == NULL || schemaVersion(*ppSchema) != TD_ROW_SVER(row)) {
    *ppSchema = tsdbGetTableSchemaImpl(pTsdb, pTable, false, false, TD_ROW_SVER(row));
    if (*ppSchema == NULL) {
      ASSERT(false);
      return -1;
    }
  }

  tdAppendSTSRowToDataCol(row, *ppSchema, pCols, merge);

  return 0;
}