parInsertData.c 29.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
X
Xiaoyu Wang 已提交
15

X
Xiaoyu Wang 已提交
16
#include "parInsertData.h"
17 18

#include "catalog.h"
X
Xiaoyu Wang 已提交
19
#include "parInt.h"
X
Xiaoyu Wang 已提交
20
#include "parUtil.h"
X
Xiaoyu Wang 已提交
21
#include "querynodes.h"
C
Cary Xu 已提交
22
#include "tRealloc.h"
23 24 25 26

#define IS_RAW_PAYLOAD(t) \
  (((int)(t)) == PAYLOAD_TYPE_RAW)  // 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert

27
typedef struct SBlockKeyTuple {
X
Xiaoyu Wang 已提交
28 29
  TSKEY   skey;
  void*   payloadAddr;
wafwerar's avatar
wafwerar 已提交
30
  int16_t index;
31 32 33 34 35 36 37
} SBlockKeyTuple;

typedef struct SBlockKeyInfo {
  int32_t         maxBytesAlloc;
  SBlockKeyTuple* pKeyTuple;
} SBlockKeyInfo;

C
Cary Xu 已提交
38 39
typedef struct {
  int32_t   index;
C
Cary Xu 已提交
40
  SArray*   rowArray;  // array of merged rows(mem allocated by tRealloc/free by tFree)
C
Cary Xu 已提交
41
  STSchema* pSchema;
C
Cary Xu 已提交
42
  int64_t   tbUid;  // suid for child table, uid for normal table
C
Cary Xu 已提交
43 44
} SBlockRowMerger;

C
Cary Xu 已提交
45
static FORCE_INLINE void tdResetSBlockRowMerger(SBlockRowMerger* pMerger) {
C
Cary Xu 已提交
46 47 48 49 50 51 52 53 54 55 56 57 58 59
  if (pMerger) {
    pMerger->index = -1;
  }
}

static void tdFreeSBlockRowMerger(SBlockRowMerger* pMerger) {
  if (pMerger) {
    int32_t size = taosArrayGetSize(pMerger->rowArray);
    for (int32_t i = 0; i < size; ++i) {
      tFree(*(void**)taosArrayGet(pMerger->rowArray, i));
    }
    taosArrayDestroy(pMerger->rowArray);

    taosMemoryFreeClear(pMerger->pSchema);
C
Cary Xu 已提交
60
    taosMemoryFree(pMerger);
C
Cary Xu 已提交
61 62 63
  }
}

X
Xiaoyu Wang 已提交
64 65 66
static int32_t rowDataCompar(const void* lhs, const void* rhs) {
  TSKEY left = *(TSKEY*)lhs;
  TSKEY right = *(TSKEY*)rhs;
67 68 69 70 71 72 73
  if (left == right) {
    return 0;
  } else {
    return left > right ? 1 : -1;
  }
}

wafwerar's avatar
wafwerar 已提交
74 75 76 77 78 79 80 81 82 83
static int32_t rowDataComparStable(const void* lhs, const void* rhs) {
  TSKEY left = *(TSKEY*)lhs;
  TSKEY right = *(TSKEY*)rhs;
  if (left == right) {
    return ((SBlockKeyTuple*)lhs)->index - ((SBlockKeyTuple*)rhs)->index;
  } else {
    return left > right ? 1 : -1;
  }
}

84
void setBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, col_id_t numOfCols) {
85 86 87
  pColList->numOfCols = numOfCols;
  pColList->numOfBound = numOfCols;
  pColList->orderStatus = ORDER_STATUS_ORDERED;  // default is ORDERED for non-bound mode
88
  pColList->boundColumns = taosMemoryCalloc(pColList->numOfCols, sizeof(col_id_t));
wafwerar's avatar
wafwerar 已提交
89
  pColList->cols = taosMemoryCalloc(pColList->numOfCols, sizeof(SBoundColumn));
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
  pColList->colIdxInfo = NULL;
  pColList->flen = 0;
  pColList->allNullLen = 0;

  int32_t nVar = 0;
  for (int32_t i = 0; i < pColList->numOfCols; ++i) {
    uint8_t type = pSchema[i].type;
    if (i > 0) {
      pColList->cols[i].offset = pColList->cols[i - 1].offset + pSchema[i - 1].bytes;
      pColList->cols[i].toffset = pColList->flen;
    }
    pColList->flen += TYPE_BYTES[type];
    switch (type) {
      case TSDB_DATA_TYPE_BINARY:
        pColList->allNullLen += (VARSTR_HEADER_SIZE + CHAR_BYTES);
        ++nVar;
        break;
      case TSDB_DATA_TYPE_NCHAR:
        pColList->allNullLen += (VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
        ++nVar;
        break;
      default:
        break;
    }
X
Xiaoyu Wang 已提交
114
    pColList->boundColumns[i] = i;
115 116
  }
  pColList->allNullLen += pColList->flen;
C
Cary Xu 已提交
117
  pColList->boundNullLen = pColList->allNullLen;  // default set allNullLen
118 119 120
  pColList->extendedVarLen = (uint16_t)(nVar * sizeof(VarDataOffsetT));
}

X
Xiaoyu Wang 已提交
121 122 123
int32_t schemaIdxCompar(const void* lhs, const void* rhs) {
  uint16_t left = *(uint16_t*)lhs;
  uint16_t right = *(uint16_t*)rhs;
124 125 126 127 128 129 130 131

  if (left == right) {
    return 0;
  } else {
    return left > right ? 1 : -1;
  }
}

X
Xiaoyu Wang 已提交
132 133 134
int32_t boundIdxCompar(const void* lhs, const void* rhs) {
  uint16_t left = *(uint16_t*)POINTER_SHIFT(lhs, sizeof(uint16_t));
  uint16_t right = *(uint16_t*)POINTER_SHIFT(rhs, sizeof(uint16_t));
135 136 137 138 139 140 141 142

  if (left == right) {
    return 0;
  } else {
    return left > right ? 1 : -1;
  }
}

D
stmt  
dapan1121 已提交
143 144
void destroyBoundColumnInfo(void* pBoundInfo) {
  if (NULL == pBoundInfo) {
D
stmt  
dapan1121 已提交
145 146
    return;
  }
D
stmt  
dapan1121 已提交
147 148

  SParsedDataColInfo* pColList = (SParsedDataColInfo*)pBoundInfo;
X
Xiaoyu Wang 已提交
149

150
  taosMemoryFreeClear(pColList->boundColumns);
wafwerar's avatar
wafwerar 已提交
151 152
  taosMemoryFreeClear(pColList->cols);
  taosMemoryFreeClear(pColList->colIdxInfo);
153 154
}

wmmhello's avatar
wmmhello 已提交
155
static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOffset, STableMeta* pTableMeta,
X
Xiaoyu Wang 已提交
156
                               STableDataBlocks** dataBlocks) {
wafwerar's avatar
wafwerar 已提交
157
  STableDataBlocks* dataBuf = (STableDataBlocks*)taosMemoryCalloc(1, sizeof(STableDataBlocks));
158 159 160 161 162 163 164 165 166 167 168 169
  if (dataBuf == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  dataBuf->nAllocSize = (uint32_t)defaultSize;
  dataBuf->headerSize = startOffset;

  // the header size will always be the startOffset value, reserved for the subumit block header
  if (dataBuf->nAllocSize <= dataBuf->headerSize) {
    dataBuf->nAllocSize = dataBuf->headerSize * 2;
  }

wafwerar's avatar
wafwerar 已提交
170
  dataBuf->pData = taosMemoryMalloc(dataBuf->nAllocSize);
171
  if (dataBuf->pData == NULL) {
wafwerar's avatar
wafwerar 已提交
172
    taosMemoryFreeClear(dataBuf);
173 174 175 176
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  memset(dataBuf->pData, 0, sizeof(SSubmitBlk));

X
Xiaoyu Wang 已提交
177
  dataBuf->pTableMeta = tableMetaDup(pTableMeta);
178 179

  SParsedDataColInfo* pColInfo = &dataBuf->boundColumnInfo;
X
Xiaoyu Wang 已提交
180
  SSchema*            pSchema = getTableColumnSchema(dataBuf->pTableMeta);
181 182
  setBoundColumnInfo(pColInfo, pSchema, dataBuf->pTableMeta->tableInfo.numOfColumns);

X
Xiaoyu Wang 已提交
183 184 185 186 187
  dataBuf->ordered = true;
  dataBuf->prevTS = INT64_MIN;
  dataBuf->rowSize = rowSize;
  dataBuf->size = startOffset;
  dataBuf->vgId = dataBuf->pTableMeta->vgId;
188 189 190 191 192 193 194

  assert(defaultSize > 0 && pTableMeta != NULL && dataBuf->pTableMeta != NULL);

  *dataBlocks = dataBuf;
  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
195
int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq) {
H
Hongze Cheng 已提交
196
  SEncoder coder = {0};
X
Xiaoyu Wang 已提交
197 198
  char*    pBuf;
  int32_t  len;
H
Hongze Cheng 已提交
199

wafwerar's avatar
wafwerar 已提交
200 201
  int32_t ret = 0;
  tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, len, ret);
X
Xiaoyu Wang 已提交
202 203 204 205 206 207 208 209 210 211 212
  if (pBlocks->nAllocSize - pBlocks->size < len) {
    pBlocks->nAllocSize += len + pBlocks->rowSize;
    char* pTmp = taosMemoryRealloc(pBlocks->pData, pBlocks->nAllocSize);
    if (pTmp != NULL) {
      pBlocks->pData = pTmp;
      memset(pBlocks->pData + pBlocks->size, 0, pBlocks->nAllocSize - pBlocks->size);
    } else {
      pBlocks->nAllocSize -= len + pBlocks->rowSize;
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
  }
H
Hongze Cheng 已提交
213

X
Xiaoyu Wang 已提交
214
  pBuf = pBlocks->pData + pBlocks->size;
H
Hongze Cheng 已提交
215

H
Hongze Cheng 已提交
216
  tEncoderInit(&coder, pBuf, len);
X
Xiaoyu Wang 已提交
217
  int32_t code = tEncodeSVCreateTbReq(&coder, pCreateTbReq);
H
Hongze Cheng 已提交
218
  tEncoderClear(&coder);
X
Xiaoyu Wang 已提交
219 220
  pBlocks->size += len;
  pBlocks->createTbReqLen = len;
X
Xiaoyu Wang 已提交
221 222

  return code;
X
Xiaoyu Wang 已提交
223 224
}

X
Xiaoyu Wang 已提交
225 226
int32_t getDataBlockFromList(SHashObj* pHashList, void* id, int32_t idLen, int32_t size, int32_t startOffset,
                             int32_t rowSize, STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList,
X
Xiaoyu Wang 已提交
227
                             SVCreateTbReq* pCreateTbReq) {
228
  *dataBlocks = NULL;
D
dapan 已提交
229
  STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)id, idLen);
230 231 232 233 234
  if (t1 != NULL) {
    *dataBlocks = *t1;
  }

  if (*dataBlocks == NULL) {
235
    int32_t ret = createDataBlock((size_t)size, rowSize, startOffset, pTableMeta, dataBlocks);
236 237 238 239
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }

H
Hongze Cheng 已提交
240
    if (NULL != pCreateTbReq && NULL != pCreateTbReq->ctb.pTag) {
X
Xiaoyu Wang 已提交
241 242 243 244 245 246
      ret = buildCreateTbMsg(*dataBlocks, pCreateTbReq);
      if (ret != TSDB_CODE_SUCCESS) {
        return ret;
      }
    }

X
Xiaoyu Wang 已提交
247
    taosHashPut(pHashList, id, idLen, dataBlocks, POINTER_BYTES);
248 249 250 251 252 253 254 255 256
    if (pBlockList) {
      taosArrayPush(pBlockList, dataBlocks);
    }
  }

  return TSDB_CODE_SUCCESS;
}

static int32_t getRowExpandSize(STableMeta* pTableMeta) {
C
Cary Xu 已提交
257
  int32_t  result = TD_ROW_HEAD_LEN - sizeof(TSKEY);
258 259
  int32_t  columns = getNumOfColumns(pTableMeta);
  SSchema* pSchema = getTableColumnSchema(pTableMeta);
C
Cary Xu 已提交
260
  for (int32_t i = 0; i < columns; ++i) {
261 262 263 264
    if (IS_VAR_DATA_TYPE((pSchema + i)->type)) {
      result += TYPE_BYTES[TSDB_DATA_TYPE_BINARY];
    }
  }
C
Cary Xu 已提交
265
  result += (int32_t)TD_BITMAP_BYTES(columns - 1);
266 267 268
  return result;
}

269
static void destroyDataBlock(STableDataBlocks* pDataBlock) {
270 271 272 273
  if (pDataBlock == NULL) {
    return;
  }

wafwerar's avatar
wafwerar 已提交
274
  taosMemoryFreeClear(pDataBlock->pData);
X
Xiaoyu Wang 已提交
275 276 277
  //  if (!pDataBlock->cloned) {
  // free the refcount for metermeta
  taosMemoryFreeClear(pDataBlock->pTableMeta);
278

X
Xiaoyu Wang 已提交
279 280
  destroyBoundColumnInfo(&pDataBlock->boundColumnInfo);
  //  }
wafwerar's avatar
wafwerar 已提交
281
  taosMemoryFreeClear(pDataBlock);
282 283
}

284
void destroyBlockArrayList(SArray* pDataBlockList) {
285
  if (pDataBlockList == NULL) {
286
    return;
287 288 289 290
  }

  size_t size = taosArrayGetSize(pDataBlockList);
  for (int32_t i = 0; i < size; i++) {
291 292
    void* p = taosArrayGetP(pDataBlockList, i);
    destroyDataBlock(p);
293 294 295 296 297
  }

  taosArrayDestroy(pDataBlockList);
}

298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313
void destroyBlockHashmap(SHashObj* pDataBlockHash) {
  if (pDataBlockHash == NULL) {
    return;
  }

  void** p1 = taosHashIterate(pDataBlockHash, NULL);
  while (p1) {
    STableDataBlocks* pBlocks = *p1;
    destroyDataBlock(pBlocks);

    p1 = taosHashIterate(pDataBlockHash, p1);
  }

  taosHashCleanup(pDataBlockHash);
}

314
// data block is disordered, sort it in ascending order
X
Xiaoyu Wang 已提交
315 316
void sortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf) {
  SSubmitBlk* pBlocks = (SSubmitBlk*)dataBuf->pData;
317 318 319 320 321

  // size is less than the total size, since duplicated rows may be removed yet.
  assert(pBlocks->numOfRows * dataBuf->rowSize + sizeof(SSubmitBlk) == dataBuf->size);

  if (!dataBuf->ordered) {
X
Xiaoyu Wang 已提交
322
    char* pBlockData = pBlocks->data;
323 324

    // todo. qsort is unstable, if timestamp is same, should get the last one
wafwerar's avatar
wafwerar 已提交
325
    taosSort(pBlockData, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar);
326 327 328 329

    int32_t i = 0;
    int32_t j = 1;

330
    // delete rows with timestamp conflicts
331
    while (j < pBlocks->numOfRows) {
X
Xiaoyu Wang 已提交
332 333
      TSKEY ti = *(TSKEY*)(pBlockData + dataBuf->rowSize * i);
      TSKEY tj = *(TSKEY*)(pBlockData + dataBuf->rowSize * j);
334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357

      if (ti == tj) {
        ++j;
        continue;
      }

      int32_t nextPos = (++i);
      if (nextPos != j) {
        memmove(pBlockData + dataBuf->rowSize * nextPos, pBlockData + dataBuf->rowSize * j, dataBuf->rowSize);
      }

      ++j;
    }

    dataBuf->ordered = true;

    pBlocks->numOfRows = i + 1;
    dataBuf->size = sizeof(SSubmitBlk) + dataBuf->rowSize * pBlocks->numOfRows;
  }

  dataBuf->prevTS = INT64_MIN;
}

// data block is disordered, sort it in ascending order
C
Cary Xu 已提交
358
static int sortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo) {
X
Xiaoyu Wang 已提交
359
  SSubmitBlk* pBlocks = (SSubmitBlk*)dataBuf->pData;
360 361 362 363 364 365 366
  int16_t     nRows = pBlocks->numOfRows;

  // size is less than the total size, since duplicated rows may be removed yet.

  // allocate memory
  size_t nAlloc = nRows * sizeof(SBlockKeyTuple);
  if (pBlkKeyInfo->pKeyTuple == NULL || pBlkKeyInfo->maxBytesAlloc < nAlloc) {
X
Xiaoyu Wang 已提交
367
    char* tmp = taosMemoryRealloc(pBlkKeyInfo->pKeyTuple, nAlloc);
368 369 370
    if (tmp == NULL) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
X
Xiaoyu Wang 已提交
371
    pBlkKeyInfo->pKeyTuple = (SBlockKeyTuple*)tmp;
372 373 374 375 376
    pBlkKeyInfo->maxBytesAlloc = (int32_t)nAlloc;
  }
  memset(pBlkKeyInfo->pKeyTuple, 0, nAlloc);

  int32_t         extendedRowSize = getExtendedRowSize(dataBuf);
X
Xiaoyu Wang 已提交
377 378
  SBlockKeyTuple* pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
  char*           pBlockData = pBlocks->data + pBlocks->schemaLen;
379 380
  int             n = 0;
  while (n < nRows) {
X
Xiaoyu Wang 已提交
381
    pBlkKeyTuple->skey = TD_ROW_KEY((STSRow*)pBlockData);
382
    pBlkKeyTuple->payloadAddr = pBlockData;
wafwerar's avatar
wafwerar 已提交
383
    pBlkKeyTuple->index = n;
384 385 386 387 388 389 390 391 392

    // next loop
    pBlockData += extendedRowSize;
    ++pBlkKeyTuple;
    ++n;
  }

  if (!dataBuf->ordered) {
    pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
393 394

    // todo. qsort is unstable, if timestamp is same, should get the last one
wafwerar's avatar
wafwerar 已提交
395
    taosSort(pBlkKeyTuple, nRows, sizeof(SBlockKeyTuple), rowDataComparStable);
396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425

    pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
    int32_t i = 0;
    int32_t j = 1;
    while (j < nRows) {
      TSKEY ti = (pBlkKeyTuple + i)->skey;
      TSKEY tj = (pBlkKeyTuple + j)->skey;

      if (ti == tj) {
        ++j;
        continue;
      }

      int32_t nextPos = (++i);
      if (nextPos != j) {
        memmove(pBlkKeyTuple + nextPos, pBlkKeyTuple + j, sizeof(SBlockKeyTuple));
      }
      ++j;
    }

    dataBuf->ordered = true;
    pBlocks->numOfRows = i + 1;
  }

  dataBuf->size = sizeof(SSubmitBlk) + pBlocks->numOfRows * extendedRowSize;
  dataBuf->prevTS = INT64_MIN;

  return 0;
}

C
Cary Xu 已提交
426 427 428 429 430 431 432 433
static void* tdGetCurRowFromBlockMerger(SBlockRowMerger* pBlkRowMerger) {
  if (pBlkRowMerger && (pBlkRowMerger->index >= 0)) {
    ASSERT(pBlkRowMerger->index < taosArrayGetSize(pBlkRowMerger->rowArray));
    return *(void**)taosArrayGet(pBlkRowMerger->rowArray, pBlkRowMerger->index);
  }
  return NULL;
}

C
Cary Xu 已提交
434
static int32_t tdBlockRowMerge(STableMeta* pTableMeta, SBlockKeyTuple* pEndKeyTp, int32_t nDupRows,
C
Cary Xu 已提交
435 436 437 438 439 440 441
                               SBlockRowMerger** pBlkRowMerger, int32_t rowSize) {
  ASSERT(nDupRows > 1);
  SBlockKeyTuple* pStartKeyTp = pEndKeyTp - (nDupRows - 1);
  ASSERT(pStartKeyTp->skey == pEndKeyTp->skey);

  // TODO: optimization if end row is all normal
#if 0
C
Cary Xu 已提交
442
  STSRow* pEndRow = (STSRow*)pEndKeyTp->payloadAddr;
C
Cary Xu 已提交
443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464
  if(isNormal(pEndRow)) { // set the end row if it is normal and return directly
    pStartKeyTp->payloadAddr = pEndKeyTp->payloadAddr;
    return TSDB_CODE_SUCCESS;
  }
#endif

  if (!(*pBlkRowMerger)) {
    (*pBlkRowMerger) = taosMemoryCalloc(1, sizeof(**pBlkRowMerger));
    if (!(*pBlkRowMerger)) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return TSDB_CODE_FAILED;
    }
    (*pBlkRowMerger)->index = -1;
    if (!(*pBlkRowMerger)->rowArray) {
      (*pBlkRowMerger)->rowArray = taosArrayInit(1, sizeof(void*));
      if (!(*pBlkRowMerger)->rowArray) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return TSDB_CODE_FAILED;
      }
    }
  }

C
Cary Xu 已提交
465 466 467 468 469 470 471 472 473 474
  if ((*pBlkRowMerger)->pSchema) {
    if ((*pBlkRowMerger)->pSchema->version != pTableMeta->sversion) {
      taosMemoryFreeClear((*pBlkRowMerger)->pSchema);
    } else {
      if ((*pBlkRowMerger)->tbUid != (pTableMeta->suid > 0 ? pTableMeta->suid : pTableMeta->uid)) {
        taosMemoryFreeClear((*pBlkRowMerger)->pSchema);
      }
    }
  }

C
Cary Xu 已提交
475
  if (!(*pBlkRowMerger)->pSchema) {
C
Cary Xu 已提交
476 477
    (*pBlkRowMerger)->pSchema =
        tdGetSTSChemaFromSSChema(pTableMeta->schema, pTableMeta->tableInfo.numOfColumns, pTableMeta->sversion);
C
Cary Xu 已提交
478 479 480 481 482

    if (!(*pBlkRowMerger)->pSchema) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return TSDB_CODE_FAILED;
    }
C
Cary Xu 已提交
483
    (*pBlkRowMerger)->tbUid = pTableMeta->suid > 0 ? pTableMeta->suid : pTableMeta->uid;
C
Cary Xu 已提交
484 485 486 487 488
  }

  void* pDestRow = NULL;
  ++((*pBlkRowMerger)->index);
  if ((*pBlkRowMerger)->index < taosArrayGetSize((*pBlkRowMerger)->rowArray)) {
489 490
    void** pAlloc = (void**)taosArrayGet((*pBlkRowMerger)->rowArray, (*pBlkRowMerger)->index);
    if (tRealloc((uint8_t**)pAlloc, rowSize) != 0) {
C
Cary Xu 已提交
491 492
      return TSDB_CODE_FAILED;
    }
493
    pDestRow = *pAlloc;
C
Cary Xu 已提交
494 495 496 497 498 499 500 501 502 503 504 505
  } else {
    if (tRealloc((uint8_t**)&pDestRow, rowSize) != 0) {
      return TSDB_CODE_FAILED;
    }
    taosArrayPush((*pBlkRowMerger)->rowArray, &pDestRow);
  }

  // merge rows to pDestRow
  STSchema* pSchema = (*pBlkRowMerger)->pSchema;
  SArray*   pArray = taosArrayInit(pSchema->numOfCols, sizeof(SColVal));
  for (int32_t i = 0; i < pSchema->numOfCols; ++i) {
    SColVal colVal = {0};
C
Cary Xu 已提交
506
    for (int32_t j = 0; j < nDupRows; ++j) {
C
Cary Xu 已提交
507
      tTSRowGetVal((pEndKeyTp - j)->payloadAddr, pSchema, i, &colVal);
H
Hongze Cheng 已提交
508
      if (!COL_VAL_IS_NONE(&colVal)) {
C
Cary Xu 已提交
509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526
        break;
      }
    }
    taosArrayPush(pArray, &colVal);
  }
  if (tdSTSRowNew(pArray, pSchema, (STSRow**)&pDestRow) < 0) {
    taosArrayDestroy(pArray);
    return TSDB_CODE_FAILED;
  }

  taosArrayDestroy(pArray);
  return TSDB_CODE_SUCCESS;
}

// data block is disordered, sort it in ascending order, and merge dup rows if exists
static int sortMergeDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo,
                                     SBlockRowMerger** ppBlkRowMerger) {
  SSubmitBlk* pBlocks = (SSubmitBlk*)dataBuf->pData;
C
Cary Xu 已提交
527
  STableMeta* pTableMeta = dataBuf->pTableMeta;
528
  int32_t     nRows = pBlocks->numOfRows;
C
Cary Xu 已提交
529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548

  // size is less than the total size, since duplicated rows may be removed.

  // allocate memory
  size_t nAlloc = nRows * sizeof(SBlockKeyTuple);
  if (pBlkKeyInfo->pKeyTuple == NULL || pBlkKeyInfo->maxBytesAlloc < nAlloc) {
    char* tmp = taosMemoryRealloc(pBlkKeyInfo->pKeyTuple, nAlloc);
    if (tmp == NULL) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    pBlkKeyInfo->pKeyTuple = (SBlockKeyTuple*)tmp;
    pBlkKeyInfo->maxBytesAlloc = (int32_t)nAlloc;
  }
  memset(pBlkKeyInfo->pKeyTuple, 0, nAlloc);

  tdResetSBlockRowMerger(*ppBlkRowMerger);

  int32_t         extendedRowSize = getExtendedRowSize(dataBuf);
  SBlockKeyTuple* pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
  char*           pBlockData = pBlocks->data + pBlocks->schemaLen;
549
  int32_t         n = 0;
C
Cary Xu 已提交
550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581
  while (n < nRows) {
    pBlkKeyTuple->skey = TD_ROW_KEY((STSRow*)pBlockData);
    pBlkKeyTuple->payloadAddr = pBlockData;
    pBlkKeyTuple->index = n;

    // next loop
    pBlockData += extendedRowSize;
    ++pBlkKeyTuple;
    ++n;
  }

  if (!dataBuf->ordered) {
    pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;

    taosSort(pBlkKeyTuple, nRows, sizeof(SBlockKeyTuple), rowDataComparStable);

    pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
    bool    hasDup = false;
    int32_t nextPos = 0;
    int32_t i = 0;
    int32_t j = 1;

    while (j < nRows) {
      TSKEY ti = (pBlkKeyTuple + i)->skey;
      TSKEY tj = (pBlkKeyTuple + j)->skey;

      if (ti == tj) {
        ++j;
        continue;
      }

      if ((j - i) > 1) {
C
Cary Xu 已提交
582
        if (tdBlockRowMerge(pTableMeta, (pBlkKeyTuple + j - 1), j - i, ppBlkRowMerger, extendedRowSize) < 0) {
C
Cary Xu 已提交
583 584 585
          return TSDB_CODE_FAILED;
        }
        (pBlkKeyTuple + nextPos)->payloadAddr = tdGetCurRowFromBlockMerger(*ppBlkRowMerger);
C
Cary Xu 已提交
586 587 588
        if (!hasDup) {
          hasDup = true;
        }
C
Cary Xu 已提交
589 590 591 592 593 594 595 596 597 598 599 600 601 602
        i = j;
      } else {
        if (hasDup) {
          memmove(pBlkKeyTuple + nextPos, pBlkKeyTuple + i, sizeof(SBlockKeyTuple));
        }
        ++i;
      }

      ++nextPos;
      ++j;
    }

    if ((j - i) > 1) {
      ASSERT((pBlkKeyTuple + i)->skey == (pBlkKeyTuple + j - 1)->skey);
C
Cary Xu 已提交
603
      if (tdBlockRowMerge(pTableMeta, (pBlkKeyTuple + j - 1), j - i, ppBlkRowMerger, extendedRowSize) < 0) {
C
Cary Xu 已提交
604 605 606 607 608 609 610 611
        return TSDB_CODE_FAILED;
      }
      (pBlkKeyTuple + nextPos)->payloadAddr = tdGetCurRowFromBlockMerger(*ppBlkRowMerger);
    } else if (hasDup) {
      memmove(pBlkKeyTuple + nextPos, pBlkKeyTuple + i, sizeof(SBlockKeyTuple));
    }

    dataBuf->ordered = true;
C
Cary Xu 已提交
612
    pBlocks->numOfRows = nextPos + 1;
C
Cary Xu 已提交
613 614 615 616 617 618 619 620
  }

  dataBuf->size = sizeof(SSubmitBlk) + pBlocks->numOfRows * extendedRowSize;
  dataBuf->prevTS = INT64_MIN;

  return TSDB_CODE_SUCCESS;
}

621
// Erase the empty space reserved for binary data
X
Xiaoyu Wang 已提交
622 623
static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SBlockKeyTuple* blkKeyTuple,
                         bool isRawPayload) {
624
  // TODO: optimize this function, handle the case while binary is not presented
X
Xiaoyu Wang 已提交
625 626 627
  STableMeta*   pTableMeta = pTableDataBlock->pTableMeta;
  STableComInfo tinfo = getTableInfo(pTableMeta);
  SSchema*      pSchema = getTableColumnSchema(pTableMeta);
628

X
Xiaoyu Wang 已提交
629
  int32_t     nonDataLen = sizeof(SSubmitBlk) + pTableDataBlock->createTbReqLen;
630
  SSubmitBlk* pBlock = pDataBlock;
X
Xiaoyu Wang 已提交
631 632
  memcpy(pDataBlock, pTableDataBlock->pData, nonDataLen);
  pDataBlock = (char*)pDataBlock + nonDataLen;
633 634

  int32_t flen = 0;  // original total length of row
X
Xiaoyu Wang 已提交
635 636
  if (isRawPayload) {
    for (int32_t j = 0; j < tinfo.numOfColumns; ++j) {
637 638 639
      flen += TYPE_BYTES[pSchema[j].type];
    }
  }
X
Xiaoyu Wang 已提交
640
  pBlock->schemaLen = pTableDataBlock->createTbReqLen;
641

X
Xiaoyu Wang 已提交
642
  char* p = pTableDataBlock->pData + nonDataLen;
643
  pBlock->dataLen = 0;
644
  int32_t numOfRows = pBlock->numOfRows;
645 646

  if (isRawPayload) {
C
Cary Xu 已提交
647
    SRowBuilder builder = {0};
X
Xiaoyu Wang 已提交
648

C
Cary Xu 已提交
649 650
    tdSRowInit(&builder, pTableMeta->sversion);
    tdSRowSetInfo(&builder, getNumOfColumns(pTableMeta), -1, flen);
651

C
Cary Xu 已提交
652 653
    for (int32_t i = 0; i < numOfRows; ++i) {
      tdSRowResetBuf(&builder, pDataBlock);
654
      int toffset = 0;
C
Cary Xu 已提交
655 656 657 658 659
      for (int32_t j = 0; j < tinfo.numOfColumns; ++j) {
        int8_t  colType = pSchema[j].type;
        uint8_t valType = isNull(p, colType) ? TD_VTYPE_NULL : TD_VTYPE_NORM;
        tdAppendColValToRow(&builder, pSchema[j].colId, colType, valType, p, true, toffset, j);
        toffset += TYPE_BYTES[colType];
660 661
        p += pSchema[j].bytes;
      }
662
      tdSRowEnd(&builder);
C
Cary Xu 已提交
663 664 665
      int32_t rowLen = TD_ROW_LEN((STSRow*)pDataBlock);
      pDataBlock = (char*)pDataBlock + rowLen;
      pBlock->dataLen += rowLen;
666 667 668
    }
  } else {
    for (int32_t i = 0; i < numOfRows; ++i) {
X
Xiaoyu Wang 已提交
669
      void*     payload = (blkKeyTuple + i)->payloadAddr;
X
Xiaoyu Wang 已提交
670
      TDRowLenT rowTLen = TD_ROW_LEN((STSRow*)payload);
C
Cary Xu 已提交
671 672 673
      memcpy(pDataBlock, payload, rowTLen);
      pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
      pBlock->dataLen += rowTLen;
674 675 676
    }
  }

677
  return pBlock->dataLen + pBlock->schemaLen;
678 679
}

X
Xiaoyu Wang 已提交
680
int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** pVgDataBlocks) {
S
Shengliang Guan 已提交
681
  const int INSERT_HEAD_SIZE = sizeof(SSubmitReq);
682 683
  int       code = 0;
  bool      isRawPayload = IS_RAW_PAYLOAD(payloadType);
D
dapan 已提交
684
  SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
685 686 687
  SArray*   pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);

  STableDataBlocks** p = taosHashIterate(pHashObj, NULL);
X
Xiaoyu Wang 已提交
688 689
  STableDataBlocks*  pOneTableBlock = *p;
  SBlockKeyInfo      blkKeyInfo = {0};  // share by pOneTableBlock
X
Xiaoyu Wang 已提交
690
  SBlockRowMerger*   pBlkRowMerger = NULL;
C
Cary Xu 已提交
691

692
  while (pOneTableBlock) {
X
Xiaoyu Wang 已提交
693
    SSubmitBlk* pBlocks = (SSubmitBlk*)pOneTableBlock->pData;
694 695
    if (pBlocks->numOfRows > 0) {
      STableDataBlocks* dataBuf = NULL;
X
Xiaoyu Wang 已提交
696 697 698 699
      pOneTableBlock->pTableMeta->vgId = pOneTableBlock->vgId;  // for schemaless, restore origin vgId
      int32_t ret = getDataBlockFromList(pVnodeDataBlockHashList, &pOneTableBlock->vgId, sizeof(pOneTableBlock->vgId),
                                         TSDB_PAYLOAD_SIZE, INSERT_HEAD_SIZE, 0, pOneTableBlock->pTableMeta, &dataBuf,
                                         pVnodeDataBlockList, NULL);
700
      if (ret != TSDB_CODE_SUCCESS) {
C
Cary Xu 已提交
701
        tdFreeSBlockRowMerger(pBlkRowMerger);
702 703
        taosHashCleanup(pVnodeDataBlockHashList);
        destroyBlockArrayList(pVnodeDataBlockList);
wafwerar's avatar
wafwerar 已提交
704
        taosMemoryFreeClear(blkKeyInfo.pKeyTuple);
705 706
        return ret;
      }
X
Xiaoyu Wang 已提交
707
      ASSERT(pOneTableBlock->pTableMeta->tableInfo.rowSize > 0);
708
      // the maximum expanded size in byte when a row-wise data is converted to SDataRow format
709
      int32_t expandSize = isRawPayload ? getRowExpandSize(pOneTableBlock->pTableMeta) : 0;
710
      int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize +
X
Xiaoyu Wang 已提交
711 712
                         sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta) +
                         pOneTableBlock->createTbReqLen;
713 714 715

      if (dataBuf->nAllocSize < destSize) {
        dataBuf->nAllocSize = (uint32_t)(destSize * 1.5);
wafwerar's avatar
wafwerar 已提交
716
        char* tmp = taosMemoryRealloc(dataBuf->pData, dataBuf->nAllocSize);
717 718 719
        if (tmp != NULL) {
          dataBuf->pData = tmp;
        } else {  // failed to allocate memory, free already allocated memory and return error code
C
Cary Xu 已提交
720
          tdFreeSBlockRowMerger(pBlkRowMerger);
721 722
          taosHashCleanup(pVnodeDataBlockHashList);
          destroyBlockArrayList(pVnodeDataBlockList);
wafwerar's avatar
wafwerar 已提交
723 724
          taosMemoryFreeClear(dataBuf->pData);
          taosMemoryFreeClear(blkKeyInfo.pKeyTuple);
725 726 727 728 729 730 731
          return TSDB_CODE_TSC_OUT_OF_MEMORY;
        }
      }

      if (isRawPayload) {
        sortRemoveDataBlockDupRowsRaw(pOneTableBlock);
      } else {
C
Cary Xu 已提交
732 733
        if ((code = sortMergeDataBlockDupRows(pOneTableBlock, &blkKeyInfo, &pBlkRowMerger)) != 0) {
          tdFreeSBlockRowMerger(pBlkRowMerger);
734 735
          taosHashCleanup(pVnodeDataBlockHashList);
          destroyBlockArrayList(pVnodeDataBlockList);
wafwerar's avatar
wafwerar 已提交
736 737
          taosMemoryFreeClear(dataBuf->pData);
          taosMemoryFreeClear(blkKeyInfo.pKeyTuple);
738 739 740 741 742 743
          return code;
        }
        ASSERT(blkKeyInfo.pKeyTuple != NULL && pBlocks->numOfRows > 0);
      }

      // erase the empty space reserved for binary data
X
Xiaoyu Wang 已提交
744 745
      int32_t finalLen =
          trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, blkKeyInfo.pKeyTuple, isRawPayload);
746 747 748 749 750 751 752 753 754 755 756 757 758 759 760

      dataBuf->size += (finalLen + sizeof(SSubmitBlk));
      assert(dataBuf->size <= dataBuf->nAllocSize);
      dataBuf->numOfTables += 1;
    }

    p = taosHashIterate(pHashObj, p);
    if (p == NULL) {
      break;
    }

    pOneTableBlock = *p;
  }

  // free the table data blocks;
C
Cary Xu 已提交
761
  tdFreeSBlockRowMerger(pBlkRowMerger);
762
  taosHashCleanup(pVnodeDataBlockHashList);
wafwerar's avatar
wafwerar 已提交
763
  taosMemoryFreeClear(blkKeyInfo.pKeyTuple);
764
  *pVgDataBlocks = pVnodeDataBlockList;
765 766 767
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
768 769
int32_t allocateMemForSize(STableDataBlocks* pDataBlock, int32_t allSize) {
  size_t   remain = pDataBlock->nAllocSize - pDataBlock->size;
D
stmt  
dapan1121 已提交
770
  uint32_t nAllocSizeOld = pDataBlock->nAllocSize;
X
Xiaoyu Wang 已提交
771

D
stmt  
dapan1121 已提交
772 773 774 775
  // expand the allocated size
  if (remain < allSize) {
    pDataBlock->nAllocSize = (pDataBlock->size + allSize) * 1.5;

X
Xiaoyu Wang 已提交
776
    char* tmp = taosMemoryRealloc(pDataBlock->pData, (size_t)pDataBlock->nAllocSize);
D
stmt  
dapan1121 已提交
777 778 779 780 781 782 783 784 785 786 787 788 789
    if (tmp != NULL) {
      pDataBlock->pData = tmp;
      memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size);
    } else {
      // do nothing, if allocate more memory failed
      pDataBlock->nAllocSize = nAllocSizeOld;
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
790
int32_t allocateMemIfNeed(STableDataBlocks* pDataBlock, int32_t rowSize, int32_t* numOfRows) {
791 792
  size_t    remain = pDataBlock->nAllocSize - pDataBlock->size;
  const int factor = 5;
X
Xiaoyu Wang 已提交
793 794
  uint32_t  nAllocSizeOld = pDataBlock->nAllocSize;

795 796 797 798 799 800 801
  // expand the allocated size
  if (remain < rowSize * factor) {
    while (remain < rowSize * factor) {
      pDataBlock->nAllocSize = (uint32_t)(pDataBlock->nAllocSize * 1.5);
      remain = pDataBlock->nAllocSize - pDataBlock->size;
    }

X
Xiaoyu Wang 已提交
802
    char* tmp = taosMemoryRealloc(pDataBlock->pData, (size_t)pDataBlock->nAllocSize);
803 804 805 806 807 808 809 810 811 812 813 814 815 816 817
    if (tmp != NULL) {
      pDataBlock->pData = tmp;
      memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size);
    } else {
      // do nothing, if allocate more memory failed
      pDataBlock->nAllocSize = nAllocSizeOld;
      *numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize;
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
  }

  *numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
818
int initRowBuilder(SRowBuilder* pBuilder, int16_t schemaVer, SParsedDataColInfo* pColInfo) {
C
Cary Xu 已提交
819 820 821 822
  ASSERT(pColInfo->numOfCols > 0 && (pColInfo->numOfBound <= pColInfo->numOfCols));
  tdSRowInit(pBuilder, schemaVer);
  tdSRowSetExtendedInfo(pBuilder, pColInfo->numOfCols, pColInfo->numOfBound, pColInfo->flen, pColInfo->allNullLen,
                        pColInfo->boundNullLen);
823 824
  return TSDB_CODE_SUCCESS;
}
D
stmt  
dapan1121 已提交
825

D
stmt  
dapan1121 已提交
826
int32_t qResetStmtDataBlock(void* block, bool keepBuf) {
D
stmt  
dapan1121 已提交
827
  STableDataBlocks* pBlock = (STableDataBlocks*)block;
D
stmt  
dapan1121 已提交
828

D
stmt  
dapan1121 已提交
829 830 831 832 833 834 835 836
  if (keepBuf) {
    taosMemoryFreeClear(pBlock->pData);
    pBlock->pData = taosMemoryMalloc(TSDB_PAYLOAD_SIZE);
    if (NULL == pBlock->pData) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    memset(pBlock->pData, 0, sizeof(SSubmitBlk));
  } else {
X
Xiaoyu Wang 已提交
837
    pBlock->pData = NULL;
D
stmt  
dapan1121 已提交
838
  }
X
Xiaoyu Wang 已提交
839 840 841 842

  pBlock->ordered = true;
  pBlock->prevTS = INT64_MIN;
  pBlock->size = sizeof(SSubmitBlk);
D
stmt  
dapan1121 已提交
843 844 845 846
  pBlock->tsSource = -1;
  pBlock->numOfTables = 1;
  pBlock->nAllocSize = TSDB_PAYLOAD_SIZE;
  pBlock->headerSize = pBlock->size;
D
dapan 已提交
847
  pBlock->createTbReqLen = 0;
X
Xiaoyu Wang 已提交
848

D
stmt  
dapan1121 已提交
849
  memset(&pBlock->rowBuilder, 0, sizeof(pBlock->rowBuilder));
D
stmt  
dapan1121 已提交
850 851

  return TSDB_CODE_SUCCESS;
D
stmt  
dapan1121 已提交
852 853
}

D
stmt  
dapan1121 已提交
854 855 856 857
int32_t qCloneStmtDataBlock(void** pDst, void* pSrc) {
  *pDst = taosMemoryMalloc(sizeof(STableDataBlocks));
  if (NULL == *pDst) {
    return TSDB_CODE_OUT_OF_MEMORY;
D
stmt  
dapan1121 已提交
858
  }
X
Xiaoyu Wang 已提交
859

D
stmt  
dapan1121 已提交
860 861
  memcpy(*pDst, pSrc, sizeof(STableDataBlocks));
  ((STableDataBlocks*)(*pDst))->cloned = true;
X
Xiaoyu Wang 已提交
862

D
dapan1121 已提交
863 864
  STableDataBlocks* pBlock = (STableDataBlocks*)(*pDst);
  if (pBlock->pTableMeta) {
X
Xiaoyu Wang 已提交
865
    void* pNewMeta = taosMemoryMalloc(TABLE_META_SIZE(pBlock->pTableMeta));
D
dapan1121 已提交
866 867 868 869 870 871 872 873
    if (NULL == pNewMeta) {
      taosMemoryFreeClear(*pDst);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    memcpy(pNewMeta, pBlock->pTableMeta, TABLE_META_SIZE(pBlock->pTableMeta));
    pBlock->pTableMeta = pNewMeta;
  }

D
stmt  
dapan1121 已提交
874
  return qResetStmtDataBlock(*pDst, false);
D
stmt  
dapan1121 已提交
875 876
}

D
dapan 已提交
877
int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc, uint64_t uid, int32_t vgId) {
D
stmt  
dapan1121 已提交
878 879 880
  int32_t code = qCloneStmtDataBlock(pDst, pSrc);
  if (code) {
    return code;
D
stmt  
dapan1121 已提交
881 882
  }

X
Xiaoyu Wang 已提交
883
  STableDataBlocks* pBlock = (STableDataBlocks*)*pDst;
D
stmt  
dapan1121 已提交
884 885 886 887
  pBlock->pData = taosMemoryMalloc(pBlock->nAllocSize);
  if (NULL == pBlock->pData) {
    qFreeStmtDataBlock(pBlock);
    return TSDB_CODE_OUT_OF_MEMORY;
D
stmt  
dapan1121 已提交
888 889
  }

D
dapan 已提交
890
  pBlock->vgId = vgId;
X
Xiaoyu Wang 已提交
891

D
dapan 已提交
892 893
  if (pBlock->pTableMeta) {
    pBlock->pTableMeta->uid = uid;
D
dapan 已提交
894
    pBlock->pTableMeta->vgId = vgId;
D
dapan 已提交
895
  }
X
Xiaoyu Wang 已提交
896

D
stmt  
dapan1121 已提交
897 898
  memset(pBlock->pData, 0, sizeof(SSubmitBlk));

D
stmt  
dapan1121 已提交
899 900 901
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
902
STableMeta* qGetTableMetaInDataBlock(void* pDataBlock) { return ((STableDataBlocks*)pDataBlock)->pTableMeta; }
D
dapan 已提交
903

D
stmt  
dapan1121 已提交
904 905 906
void qFreeStmtDataBlock(void* pDataBlock) {
  if (pDataBlock == NULL) {
    return;
D
stmt  
dapan1121 已提交
907 908
  }

D
dapan1121 已提交
909
  taosMemoryFreeClear(((STableDataBlocks*)pDataBlock)->pTableMeta);
D
stmt  
dapan1121 已提交
910 911
  taosMemoryFreeClear(((STableDataBlocks*)pDataBlock)->pData);
  taosMemoryFreeClear(pDataBlock);
D
stmt  
dapan1121 已提交
912 913
}

D
stmt  
dapan1121 已提交
914 915 916
void qDestroyStmtDataBlock(void* pBlock) {
  if (pBlock == NULL) {
    return;
D
stmt  
dapan1121 已提交
917 918
  }

D
stmt  
dapan1121 已提交
919
  STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;
D
stmt  
dapan1121 已提交
920

D
stmt  
dapan1121 已提交
921 922 923
  pDataBlock->cloned = false;
  destroyDataBlock(pDataBlock);
}