/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
// clang-format off
#include "parInsertData.h"

#include "catalog.h"
#include "parInt.h"
#include "parUtil.h"
#include "querynodes.h"
#include "tRealloc.h"

// Payload kinds: 0 is the K-V payload used by non-prepare inserts, 1 is the raw payload used by prepare inserts.
// IS_RAW_PAYLOAD(t) is true when t, converted to int, equals PAYLOAD_TYPE_RAW.
#define IS_RAW_PAYLOAD(t) ((int)(t) == PAYLOAD_TYPE_RAW)

27 28 29
// One sortable entry per row of a submit block.
typedef struct SBlockKeyTuple {
  TSKEY   skey;         // row timestamp; the primary sort key
  void*   payloadAddr;  // address of the row payload this entry refers to
  // Position of the row inside the block before sorting; breaks ties between equal timestamps.
  // Kept as int32_t because the row counters stored here are int32_t; a 16-bit field would wrap
  // for blocks holding more than 32767 rows and corrupt the tie-break order.
  int32_t index;
} SBlockKeyTuple;

// Reusable scratch buffer of SBlockKeyTuple entries used while sorting the rows of a block.
typedef struct SBlockKeyInfo {
  int32_t         maxBytesAlloc;  // number of bytes currently allocated for pKeyTuple
  SBlockKeyTuple* pKeyTuple;      // heap array of key tuples; regrown with taosMemoryRealloc when too small
} SBlockKeyInfo;

C
Cary Xu 已提交
38 39
typedef struct {
  int32_t   index;
C
Cary Xu 已提交
40
  SArray*   rowArray;  // array of merged rows(mem allocated by tRealloc/free by tFree)
C
Cary Xu 已提交
41
  STSchema* pSchema;
C
Cary Xu 已提交
42
  int64_t   tbUid;  // suid for child table, uid for normal table
C
Cary Xu 已提交
43 44
} SBlockRowMerger;

C
Cary Xu 已提交
45
// Rewinds the merger so that it no longer designates a current merged row. A NULL merger is ignored.
static FORCE_INLINE void tdResetSBlockRowMerger(SBlockRowMerger* pMerger) {
  if (pMerger == NULL) {
    return;
  }
  pMerger->index = -1;
}

// Releases a merger together with everything it owns: each merged row buffer, the row array,
// the cached schema and finally the merger itself. A NULL merger is ignored.
static void tdFreeSBlockRowMerger(SBlockRowMerger* pMerger) {
  if (pMerger == NULL) {
    return;
  }

  // every element of rowArray is a row buffer that must go through tFree
  int32_t nRows = taosArrayGetSize(pMerger->rowArray);
  int32_t idx = 0;
  while (idx < nRows) {
    void** ppRow = (void**)taosArrayGet(pMerger->rowArray, idx);
    tFree(*ppRow);
    ++idx;
  }
  taosArrayDestroy(pMerger->rowArray);

  taosMemoryFreeClear(pMerger->pSchema);
  taosMemoryFree(pMerger);
}

X
Xiaoyu Wang 已提交
64 65 66
static int32_t rowDataCompar(const void* lhs, const void* rhs) {
  TSKEY left = *(TSKEY*)lhs;
  TSKEY right = *(TSKEY*)rhs;
67 68 69 70 71 72 73
  if (left == right) {
    return 0;
  } else {
    return left > right ? 1 : -1;
  }
}

wafwerar's avatar
wafwerar 已提交
74 75 76 77 78 79 80 81 82 83
static int32_t rowDataComparStable(const void* lhs, const void* rhs) {
  TSKEY left = *(TSKEY*)lhs;
  TSKEY right = *(TSKEY*)rhs;
  if (left == right) {
    return ((SBlockKeyTuple*)lhs)->index - ((SBlockKeyTuple*)rhs)->index;
  } else {
    return left > right ? 1 : -1;
  }
}

84
// Initializes pColList for the case where every schema column is bound, in schema order.
//   pColList  - column info to fill; boundColumns and cols are newly allocated here
//   pSchema   - column schemas, at least numOfCols entries
//   numOfCols - number of columns in pSchema
// NOTE(review): the two taosMemoryCalloc results are used without a NULL check and this function
// has no way to report failure - confirm how callers are expected to cope with out-of-memory.
void setBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, col_id_t numOfCols) {
  pColList->numOfCols = numOfCols;
  pColList->numOfBound = numOfCols;
  pColList->orderStatus = ORDER_STATUS_ORDERED;  // non-bound mode is ordered by default
  pColList->colIdxInfo = NULL;
  pColList->flen = 0;
  pColList->allNullLen = 0;
  pColList->boundColumns = taosMemoryCalloc(pColList->numOfCols, sizeof(col_id_t));
  pColList->cols = taosMemoryCalloc(pColList->numOfCols, sizeof(SBoundColumn));

  int32_t nVarCols = 0;  // number of BINARY/NCHAR columns
  for (int32_t c = 0; c < pColList->numOfCols; ++c) {
    uint8_t colType = pSchema[c].type;

    // column 0 keeps the zero offsets left by calloc; later columns follow their predecessor
    if (c > 0) {
      pColList->cols[c].offset = pColList->cols[c - 1].offset + pSchema[c - 1].bytes;
      pColList->cols[c].toffset = pColList->flen;
    }
    pColList->flen += TYPE_BYTES[colType];

    if (colType == TSDB_DATA_TYPE_BINARY) {
      pColList->allNullLen += (VARSTR_HEADER_SIZE + CHAR_BYTES);
      ++nVarCols;
    } else if (colType == TSDB_DATA_TYPE_NCHAR) {
      pColList->allNullLen += (VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
      ++nVarCols;
    }

    pColList->boundColumns[c] = c;
  }

  pColList->allNullLen += pColList->flen;
  pColList->boundNullLen = pColList->allNullLen;  // defaults to allNullLen
  pColList->extendedVarLen = (uint16_t)(nVarCols * sizeof(VarDataOffsetT));
}

X
Xiaoyu Wang 已提交
121 122 123
// Sort comparator: orders two elements by the uint16_t stored at the start of each.
// Returns 1, -1 or 0 when the left value is greater than, less than or equal to the right value.
// The inputs are read through const-qualified pointers; the comparator never writes to them.
int32_t schemaIdxCompar(const void* lhs, const void* rhs) {
  const uint16_t left = *(const uint16_t*)lhs;
  const uint16_t right = *(const uint16_t*)rhs;

  if (left == right) {
    return 0;
  }
  return left > right ? 1 : -1;
}

X
Xiaoyu Wang 已提交
132 133 134
// Sort comparator: orders two elements by the uint16_t located sizeof(uint16_t) bytes past the
// start of each. Returns 1, -1 or 0 for greater than, less than or equal.
int32_t boundIdxCompar(const void* lhs, const void* rhs) {
  uint16_t* pLeft = (uint16_t*)POINTER_SHIFT(lhs, sizeof(uint16_t));
  uint16_t* pRight = (uint16_t*)POINTER_SHIFT(rhs, sizeof(uint16_t));

  if (*pLeft > *pRight) {
    return 1;
  }
  if (*pLeft < *pRight) {
    return -1;
  }
  return 0;
}

D
stmt  
dapan1121 已提交
143 144
void destroyBoundColumnInfo(void* pBoundInfo) {
  if (NULL == pBoundInfo) {
D
stmt  
dapan1121 已提交
145 146
    return;
  }
D
stmt  
dapan1121 已提交
147 148

  SParsedDataColInfo* pColList = (SParsedDataColInfo*)pBoundInfo;
X
Xiaoyu Wang 已提交
149

150
  taosMemoryFreeClear(pColList->boundColumns);
wafwerar's avatar
wafwerar 已提交
151 152
  taosMemoryFreeClear(pColList->cols);
  taosMemoryFreeClear(pColList->colIdxInfo);
153 154
}

wmmhello's avatar
wmmhello 已提交
155
static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOffset, STableMeta* pTableMeta,
X
Xiaoyu Wang 已提交
156
                               STableDataBlocks** dataBlocks) {
wafwerar's avatar
wafwerar 已提交
157
  STableDataBlocks* dataBuf = (STableDataBlocks*)taosMemoryCalloc(1, sizeof(STableDataBlocks));
158 159 160 161 162 163 164 165 166 167 168 169
  if (dataBuf == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  dataBuf->nAllocSize = (uint32_t)defaultSize;
  dataBuf->headerSize = startOffset;

  // the header size will always be the startOffset value, reserved for the subumit block header
  if (dataBuf->nAllocSize <= dataBuf->headerSize) {
    dataBuf->nAllocSize = dataBuf->headerSize * 2;
  }

wafwerar's avatar
wafwerar 已提交
170
  dataBuf->pData = taosMemoryMalloc(dataBuf->nAllocSize);
171
  if (dataBuf->pData == NULL) {
wafwerar's avatar
wafwerar 已提交
172
    taosMemoryFreeClear(dataBuf);
173 174 175 176
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  memset(dataBuf->pData, 0, sizeof(SSubmitBlk));

X
Xiaoyu Wang 已提交
177
  dataBuf->pTableMeta = tableMetaDup(pTableMeta);
178 179

  SParsedDataColInfo* pColInfo = &dataBuf->boundColumnInfo;
X
Xiaoyu Wang 已提交
180
  SSchema*            pSchema = getTableColumnSchema(dataBuf->pTableMeta);
181 182
  setBoundColumnInfo(pColInfo, pSchema, dataBuf->pTableMeta->tableInfo.numOfColumns);

X
Xiaoyu Wang 已提交
183 184 185 186 187
  dataBuf->ordered = true;
  dataBuf->prevTS = INT64_MIN;
  dataBuf->rowSize = rowSize;
  dataBuf->size = startOffset;
  dataBuf->vgId = dataBuf->pTableMeta->vgId;
188 189 190 191 192 193 194

  assert(defaultSize > 0 && pTableMeta != NULL && dataBuf->pTableMeta != NULL);

  *dataBlocks = dataBuf;
  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
195
// Encodes pCreateTbReq at the current end of pBlocks' payload buffer, growing the buffer first
// when the encoded request does not fit.
//   pBlocks      - destination block; size and createTbReqLen are updated on success
//   pCreateTbReq - create-table request to serialize
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_OUT_OF_MEMORY when the buffer cannot be grown, or
// TSDB_CODE_FAILED when the request cannot be encoded. The block is unchanged on failure, apart
// from a possibly enlarged buffer.
int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq) {
  SEncoder coder = {0};
  int32_t  len = 0;
  int32_t  ret = 0;

  // sizing pass; len is presumably only meaningful when ret reports success - verify against the
  // tEncodeSize definition
  tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, len, ret);
  if (ret < 0) {
    return TSDB_CODE_FAILED;
  }

  if (pBlocks->nAllocSize - pBlocks->size < len) {
    // commit the new capacity only once the reallocation has succeeded
    uint32_t newAllocSize = pBlocks->nAllocSize + (len + pBlocks->rowSize);
    char*    pTmp = taosMemoryRealloc(pBlocks->pData, newAllocSize);
    if (pTmp == NULL) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    pBlocks->pData = pTmp;
    pBlocks->nAllocSize = newAllocSize;
    memset(pBlocks->pData + pBlocks->size, 0, pBlocks->nAllocSize - pBlocks->size);
  }

  char* pBuf = pBlocks->pData + pBlocks->size;

  tEncoderInit(&coder, pBuf, len);
  if (tEncodeSVCreateTbReq(&coder, pCreateTbReq) < 0) {
    tEncoderClear(&coder);
    return TSDB_CODE_FAILED;
  }
  tEncoderClear(&coder);

  pBlocks->size += len;
  pBlocks->createTbReqLen = len;
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
225
int32_t getDataBlockFromList(SHashObj* pHashList, void* id, int32_t idLen, int32_t size, int32_t startOffset, int32_t rowSize,
wmmhello's avatar
wmmhello 已提交
226
                             STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList,
X
Xiaoyu Wang 已提交
227
                             SVCreateTbReq* pCreateTbReq) {
228
  *dataBlocks = NULL;
D
dapan 已提交
229
  STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)id, idLen);
230 231 232 233 234
  if (t1 != NULL) {
    *dataBlocks = *t1;
  }

  if (*dataBlocks == NULL) {
235
    int32_t ret = createDataBlock((size_t)size, rowSize, startOffset, pTableMeta, dataBlocks);
236 237 238 239
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }

H
Hongze Cheng 已提交
240
    if (NULL != pCreateTbReq && NULL != pCreateTbReq->ctb.pTag) {
X
Xiaoyu Wang 已提交
241 242 243 244 245 246
      ret = buildCreateTbMsg(*dataBlocks, pCreateTbReq);
      if (ret != TSDB_CODE_SUCCESS) {
        return ret;
      }
    }

D
dapan 已提交
247
    taosHashPut(pHashList, (const char*)id, idLen, (char*)dataBlocks, POINTER_BYTES);
248 249 250 251 252 253 254 255 256
    if (pBlockList) {
      taosArrayPush(pBlockList, dataBlocks);
    }
  }

  return TSDB_CODE_SUCCESS;
}

// Computes a per-row byte allowance for the table: the row head minus the timestamp, plus
// TYPE_BYTES[TSDB_DATA_TYPE_BINARY] for every variable-length column, plus the bitmap bytes for
// (columns - 1) columns.
static int32_t getRowExpandSize(STableMeta* pTableMeta) {
  int32_t  expand = TD_ROW_HEAD_LEN - sizeof(TSKEY);
  int32_t  nCols = getNumOfColumns(pTableMeta);
  SSchema* pColSchema = getTableColumnSchema(pTableMeta);

  int32_t c = 0;
  while (c < nCols) {
    if (IS_VAR_DATA_TYPE(pColSchema[c].type)) {
      expand += TYPE_BYTES[TSDB_DATA_TYPE_BINARY];
    }
    ++c;
  }

  expand += (int32_t)TD_BITMAP_BYTES(nCols - 1);
  return expand;
}

269
// Releases a data block: its payload buffer, its table meta, the arrays held by its bound column
// info, and the block itself. A NULL block is ignored.
static void destroyDataBlock(STableDataBlocks* pDataBlock) {
  if (pDataBlock != NULL) {
    taosMemoryFreeClear(pDataBlock->pData);
    taosMemoryFreeClear(pDataBlock->pTableMeta);
    destroyBoundColumnInfo(&pDataBlock->boundColumnInfo);
    taosMemoryFreeClear(pDataBlock);
  }
}

284
// Destroys every data block stored in the array and then the array itself. NULL is ignored.
void destroyBlockArrayList(SArray* pDataBlockList) {
  if (pDataBlockList != NULL) {
    size_t  nBlocks = taosArrayGetSize(pDataBlockList);
    int32_t idx = 0;
    while (idx < nBlocks) {
      destroyDataBlock(taosArrayGetP(pDataBlockList, idx));
      idx++;
    }

    taosArrayDestroy(pDataBlockList);
  }
}

298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313
// Destroys every data block referenced by the hash map and then the map itself. NULL is ignored.
void destroyBlockHashmap(SHashObj* pDataBlockHash) {
  if (pDataBlockHash != NULL) {
    for (void** ppBlock = taosHashIterate(pDataBlockHash, NULL); ppBlock != NULL;
         ppBlock = taosHashIterate(pDataBlockHash, ppBlock)) {
      STableDataBlocks* pOneBlock = *ppBlock;
      destroyDataBlock(pOneBlock);
    }

    taosHashCleanup(pDataBlockHash);
  }
}

314
// data block is disordered, sort it in ascending order
X
Xiaoyu Wang 已提交
315 316
void sortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf) {
  SSubmitBlk* pBlocks = (SSubmitBlk*)dataBuf->pData;
317 318 319 320 321

  // size is less than the total size, since duplicated rows may be removed yet.
  assert(pBlocks->numOfRows * dataBuf->rowSize + sizeof(SSubmitBlk) == dataBuf->size);

  if (!dataBuf->ordered) {
X
Xiaoyu Wang 已提交
322
    char* pBlockData = pBlocks->data;
323 324

    // todo. qsort is unstable, if timestamp is same, should get the last one
wafwerar's avatar
wafwerar 已提交
325
    taosSort(pBlockData, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar);
326 327 328 329

    int32_t i = 0;
    int32_t j = 1;

330
    // delete rows with timestamp conflicts
331
    while (j < pBlocks->numOfRows) {
X
Xiaoyu Wang 已提交
332 333
      TSKEY ti = *(TSKEY*)(pBlockData + dataBuf->rowSize * i);
      TSKEY tj = *(TSKEY*)(pBlockData + dataBuf->rowSize * j);
334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357

      if (ti == tj) {
        ++j;
        continue;
      }

      int32_t nextPos = (++i);
      if (nextPos != j) {
        memmove(pBlockData + dataBuf->rowSize * nextPos, pBlockData + dataBuf->rowSize * j, dataBuf->rowSize);
      }

      ++j;
    }

    dataBuf->ordered = true;

    pBlocks->numOfRows = i + 1;
    dataBuf->size = sizeof(SSubmitBlk) + dataBuf->rowSize * pBlocks->numOfRows;
  }

  dataBuf->prevTS = INT64_MIN;
}

// data block is disordered, sort it in ascending order
C
Cary Xu 已提交
358
static int sortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo) {
X
Xiaoyu Wang 已提交
359
  SSubmitBlk* pBlocks = (SSubmitBlk*)dataBuf->pData;
360 361 362 363 364 365 366
  int16_t     nRows = pBlocks->numOfRows;

  // size is less than the total size, since duplicated rows may be removed yet.

  // allocate memory
  size_t nAlloc = nRows * sizeof(SBlockKeyTuple);
  if (pBlkKeyInfo->pKeyTuple == NULL || pBlkKeyInfo->maxBytesAlloc < nAlloc) {
X
Xiaoyu Wang 已提交
367
    char* tmp = taosMemoryRealloc(pBlkKeyInfo->pKeyTuple, nAlloc);
368 369 370
    if (tmp == NULL) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
X
Xiaoyu Wang 已提交
371
    pBlkKeyInfo->pKeyTuple = (SBlockKeyTuple*)tmp;
372 373 374 375 376
    pBlkKeyInfo->maxBytesAlloc = (int32_t)nAlloc;
  }
  memset(pBlkKeyInfo->pKeyTuple, 0, nAlloc);

  int32_t         extendedRowSize = getExtendedRowSize(dataBuf);
X
Xiaoyu Wang 已提交
377 378
  SBlockKeyTuple* pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
  char*           pBlockData = pBlocks->data + pBlocks->schemaLen;
379 380
  int             n = 0;
  while (n < nRows) {
X
Xiaoyu Wang 已提交
381
    pBlkKeyTuple->skey = TD_ROW_KEY((STSRow*)pBlockData);
382
    pBlkKeyTuple->payloadAddr = pBlockData;
wafwerar's avatar
wafwerar 已提交
383
    pBlkKeyTuple->index = n;
384 385 386 387 388 389 390 391 392

    // next loop
    pBlockData += extendedRowSize;
    ++pBlkKeyTuple;
    ++n;
  }

  if (!dataBuf->ordered) {
    pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
393 394

    // todo. qsort is unstable, if timestamp is same, should get the last one
wafwerar's avatar
wafwerar 已提交
395
    taosSort(pBlkKeyTuple, nRows, sizeof(SBlockKeyTuple), rowDataComparStable);
396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425

    pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
    int32_t i = 0;
    int32_t j = 1;
    while (j < nRows) {
      TSKEY ti = (pBlkKeyTuple + i)->skey;
      TSKEY tj = (pBlkKeyTuple + j)->skey;

      if (ti == tj) {
        ++j;
        continue;
      }

      int32_t nextPos = (++i);
      if (nextPos != j) {
        memmove(pBlkKeyTuple + nextPos, pBlkKeyTuple + j, sizeof(SBlockKeyTuple));
      }
      ++j;
    }

    dataBuf->ordered = true;
    pBlocks->numOfRows = i + 1;
  }

  dataBuf->size = sizeof(SSubmitBlk) + pBlocks->numOfRows * extendedRowSize;
  dataBuf->prevTS = INT64_MIN;

  return 0;
}

C
Cary Xu 已提交
426 427 428 429 430 431 432 433
// Returns the row buffer stored at the merger's current index, or NULL when the merger is NULL
// or has no current row (index < 0).
static void* tdGetCurRowFromBlockMerger(SBlockRowMerger* pBlkRowMerger) {
  if (pBlkRowMerger == NULL || pBlkRowMerger->index < 0) {
    return NULL;
  }

  ASSERT(pBlkRowMerger->index < taosArrayGetSize(pBlkRowMerger->rowArray));
  void** ppRow = (void**)taosArrayGet(pBlkRowMerger->rowArray, pBlkRowMerger->index);
  return *ppRow;
}

C
Cary Xu 已提交
434
static int32_t tdBlockRowMerge(STableMeta* pTableMeta, SBlockKeyTuple* pEndKeyTp, int32_t nDupRows,
C
Cary Xu 已提交
435 436 437 438 439 440 441
                               SBlockRowMerger** pBlkRowMerger, int32_t rowSize) {
  ASSERT(nDupRows > 1);
  SBlockKeyTuple* pStartKeyTp = pEndKeyTp - (nDupRows - 1);
  ASSERT(pStartKeyTp->skey == pEndKeyTp->skey);

  // TODO: optimization if end row is all normal
#if 0
C
Cary Xu 已提交
442
  STSRow* pEndRow = (STSRow*)pEndKeyTp->payloadAddr;
C
Cary Xu 已提交
443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464
  if(isNormal(pEndRow)) { // set the end row if it is normal and return directly
    pStartKeyTp->payloadAddr = pEndKeyTp->payloadAddr;
    return TSDB_CODE_SUCCESS;
  }
#endif

  if (!(*pBlkRowMerger)) {
    (*pBlkRowMerger) = taosMemoryCalloc(1, sizeof(**pBlkRowMerger));
    if (!(*pBlkRowMerger)) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return TSDB_CODE_FAILED;
    }
    (*pBlkRowMerger)->index = -1;
    if (!(*pBlkRowMerger)->rowArray) {
      (*pBlkRowMerger)->rowArray = taosArrayInit(1, sizeof(void*));
      if (!(*pBlkRowMerger)->rowArray) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return TSDB_CODE_FAILED;
      }
    }
  }

C
Cary Xu 已提交
465 466 467 468 469 470 471 472 473 474
  if ((*pBlkRowMerger)->pSchema) {
    if ((*pBlkRowMerger)->pSchema->version != pTableMeta->sversion) {
      taosMemoryFreeClear((*pBlkRowMerger)->pSchema);
    } else {
      if ((*pBlkRowMerger)->tbUid != (pTableMeta->suid > 0 ? pTableMeta->suid : pTableMeta->uid)) {
        taosMemoryFreeClear((*pBlkRowMerger)->pSchema);
      }
    }
  }

C
Cary Xu 已提交
475
  if (!(*pBlkRowMerger)->pSchema) {
C
Cary Xu 已提交
476 477
    (*pBlkRowMerger)->pSchema =
        tdGetSTSChemaFromSSChema(pTableMeta->schema, pTableMeta->tableInfo.numOfColumns, pTableMeta->sversion);
C
Cary Xu 已提交
478 479 480 481 482

    if (!(*pBlkRowMerger)->pSchema) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return TSDB_CODE_FAILED;
    }
C
Cary Xu 已提交
483
    (*pBlkRowMerger)->tbUid = pTableMeta->suid > 0 ? pTableMeta->suid : pTableMeta->uid;
C
Cary Xu 已提交
484 485 486 487 488
  }

  void* pDestRow = NULL;
  ++((*pBlkRowMerger)->index);
  if ((*pBlkRowMerger)->index < taosArrayGetSize((*pBlkRowMerger)->rowArray)) {
489 490
    void** pAlloc = (void**)taosArrayGet((*pBlkRowMerger)->rowArray, (*pBlkRowMerger)->index);
    if (tRealloc((uint8_t**)pAlloc, rowSize) != 0) {
C
Cary Xu 已提交
491 492
      return TSDB_CODE_FAILED;
    }
493
    pDestRow = *pAlloc;
C
Cary Xu 已提交
494 495 496 497 498 499 500 501 502 503 504 505
  } else {
    if (tRealloc((uint8_t**)&pDestRow, rowSize) != 0) {
      return TSDB_CODE_FAILED;
    }
    taosArrayPush((*pBlkRowMerger)->rowArray, &pDestRow);
  }

  // merge rows to pDestRow
  STSchema* pSchema = (*pBlkRowMerger)->pSchema;
  SArray*   pArray = taosArrayInit(pSchema->numOfCols, sizeof(SColVal));
  for (int32_t i = 0; i < pSchema->numOfCols; ++i) {
    SColVal colVal = {0};
C
Cary Xu 已提交
506
    for (int32_t j = 0; j < nDupRows; ++j) {
C
Cary Xu 已提交
507
      tTSRowGetVal((pEndKeyTp - j)->payloadAddr, pSchema, i, &colVal);
H
Haojun Liao 已提交
508
      if (!COL_VAL_IS_NONE(&colVal)) {
C
Cary Xu 已提交
509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526
        break;
      }
    }
    taosArrayPush(pArray, &colVal);
  }
  if (tdSTSRowNew(pArray, pSchema, (STSRow**)&pDestRow) < 0) {
    taosArrayDestroy(pArray);
    return TSDB_CODE_FAILED;
  }

  taosArrayDestroy(pArray);
  return TSDB_CODE_SUCCESS;
}

// Builds one key tuple per row of the block in pBlkKeyInfo. When the block is not ordered, the
// tuples are sorted by timestamp in ascending order and every run of tuples sharing a timestamp
// is collapsed into one tuple whose payload is the merged row produced by tdBlockRowMerge.
//   dataBuf        - block to process; ordered, size and prevTS are updated
//   pBlkKeyInfo    - scratch tuple buffer, regrown here when too small
//   ppBlkRowMerger - merger state; reset on entry and created on demand by tdBlockRowMerge
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_OUT_OF_MEMORY when the tuple buffer cannot be grown,
// or TSDB_CODE_FAILED when a merge fails.
static int sortMergeDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo,
                                     SBlockRowMerger** ppBlkRowMerger) {
  SSubmitBlk* pBlocks = (SSubmitBlk*)dataBuf->pData;
  STableMeta* pTableMeta = dataBuf->pTableMeta;
  int32_t     nRows = pBlocks->numOfRows;

  // make sure the scratch buffer can hold one tuple per row
  size_t nAlloc = nRows * sizeof(SBlockKeyTuple);
  if (pBlkKeyInfo->pKeyTuple == NULL || pBlkKeyInfo->maxBytesAlloc < nAlloc) {
    char* pGrown = taosMemoryRealloc(pBlkKeyInfo->pKeyTuple, nAlloc);
    if (pGrown == NULL) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    pBlkKeyInfo->pKeyTuple = (SBlockKeyTuple*)pGrown;
    pBlkKeyInfo->maxBytesAlloc = (int32_t)nAlloc;
  }
  memset(pBlkKeyInfo->pKeyTuple, 0, nAlloc);

  tdResetSBlockRowMerger(*ppBlkRowMerger);

  int32_t         extendedRowSize = getExtendedRowSize(dataBuf);
  SBlockKeyTuple* pTuples = pBlkKeyInfo->pKeyTuple;
  char*           pRowData = pBlocks->data + pBlocks->schemaLen;

  // one tuple per row, remembering where the row lives and its original position
  for (int32_t n = 0; n < nRows; ++n) {
    pTuples[n].skey = TD_ROW_KEY((STSRow*)pRowData);
    pTuples[n].payloadAddr = pRowData;
    pTuples[n].index = n;
    pRowData += extendedRowSize;
  }

  if (!dataBuf->ordered) {
    taosSort(pTuples, nRows, sizeof(SBlockKeyTuple), rowDataComparStable);

    bool    hasDup = false;  // set once any run has been merged; from then on tuples must be moved
    int32_t outPos = 0;      // slot that receives the next output tuple
    int32_t grpStart = 0;    // first tuple of the run being examined
    int32_t scan = 1;        // tuple compared against the run's timestamp

    while (scan < nRows) {
      if (pTuples[grpStart].skey == pTuples[scan].skey) {
        ++scan;
        continue;
      }

      // the run [grpStart, scan) has ended
      if ((scan - grpStart) > 1) {
        if (tdBlockRowMerge(pTableMeta, (pTuples + scan - 1), scan - grpStart, ppBlkRowMerger, extendedRowSize) < 0) {
          return TSDB_CODE_FAILED;
        }
        (pTuples + outPos)->payloadAddr = tdGetCurRowFromBlockMerger(*ppBlkRowMerger);
        hasDup = true;
        grpStart = scan;
      } else {
        if (hasDup) {
          memmove(pTuples + outPos, pTuples + grpStart, sizeof(SBlockKeyTuple));
        }
        ++grpStart;
      }

      ++outPos;
      ++scan;
    }

    // flush the final run
    if ((scan - grpStart) > 1) {
      ASSERT((pTuples + grpStart)->skey == (pTuples + scan - 1)->skey);
      if (tdBlockRowMerge(pTableMeta, (pTuples + scan - 1), scan - grpStart, ppBlkRowMerger, extendedRowSize) < 0) {
        return TSDB_CODE_FAILED;
      }
      (pTuples + outPos)->payloadAddr = tdGetCurRowFromBlockMerger(*ppBlkRowMerger);
    } else if (hasDup) {
      memmove(pTuples + outPos, pTuples + grpStart, sizeof(SBlockKeyTuple));
    }

    dataBuf->ordered = true;
    pBlocks->numOfRows = outPos + 1;
  }

  dataBuf->size = sizeof(SSubmitBlk) + pBlocks->numOfRows * extendedRowSize;
  dataBuf->prevTS = INT64_MIN;

  return TSDB_CODE_SUCCESS;
}

621
// Erase the empty space reserved for binary data
X
Xiaoyu Wang 已提交
622 623
static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SBlockKeyTuple* blkKeyTuple,
                         bool isRawPayload) {
624
  // TODO: optimize this function, handle the case while binary is not presented
X
Xiaoyu Wang 已提交
625 626 627
  STableMeta*   pTableMeta = pTableDataBlock->pTableMeta;
  STableComInfo tinfo = getTableInfo(pTableMeta);
  SSchema*      pSchema = getTableColumnSchema(pTableMeta);
628

X
Xiaoyu Wang 已提交
629
  int32_t     nonDataLen = sizeof(SSubmitBlk) + pTableDataBlock->createTbReqLen;
630
  SSubmitBlk* pBlock = pDataBlock;
X
Xiaoyu Wang 已提交
631 632
  memcpy(pDataBlock, pTableDataBlock->pData, nonDataLen);
  pDataBlock = (char*)pDataBlock + nonDataLen;
633 634

  int32_t flen = 0;  // original total length of row
X
Xiaoyu Wang 已提交
635 636
  if (isRawPayload) {
    for (int32_t j = 0; j < tinfo.numOfColumns; ++j) {
637 638 639
      flen += TYPE_BYTES[pSchema[j].type];
    }
  }
X
Xiaoyu Wang 已提交
640
  pBlock->schemaLen = pTableDataBlock->createTbReqLen;
641

X
Xiaoyu Wang 已提交
642
  char* p = pTableDataBlock->pData + nonDataLen;
643
  pBlock->dataLen = 0;
644
  int32_t numOfRows = pBlock->numOfRows;
645 646

  if (isRawPayload) {
C
Cary Xu 已提交
647
    SRowBuilder builder = {0};
X
Xiaoyu Wang 已提交
648

C
Cary Xu 已提交
649 650
    tdSRowInit(&builder, pTableMeta->sversion);
    tdSRowSetInfo(&builder, getNumOfColumns(pTableMeta), -1, flen);
651

C
Cary Xu 已提交
652 653
    for (int32_t i = 0; i < numOfRows; ++i) {
      tdSRowResetBuf(&builder, pDataBlock);
654
      int toffset = 0;
C
Cary Xu 已提交
655 656 657 658 659
      for (int32_t j = 0; j < tinfo.numOfColumns; ++j) {
        int8_t  colType = pSchema[j].type;
        uint8_t valType = isNull(p, colType) ? TD_VTYPE_NULL : TD_VTYPE_NORM;
        tdAppendColValToRow(&builder, pSchema[j].colId, colType, valType, p, true, toffset, j);
        toffset += TYPE_BYTES[colType];
660 661
        p += pSchema[j].bytes;
      }
662
      tdSRowEnd(&builder);
C
Cary Xu 已提交
663 664 665
      int32_t rowLen = TD_ROW_LEN((STSRow*)pDataBlock);
      pDataBlock = (char*)pDataBlock + rowLen;
      pBlock->dataLen += rowLen;
666 667 668
    }
  } else {
    for (int32_t i = 0; i < numOfRows; ++i) {
X
Xiaoyu Wang 已提交
669 670
      char*     payload = (blkKeyTuple + i)->payloadAddr;
      TDRowLenT rowTLen = TD_ROW_LEN((STSRow*)payload);
C
Cary Xu 已提交
671 672 673
      memcpy(pDataBlock, payload, rowTLen);
      pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
      pBlock->dataLen += rowTLen;
674 675 676
    }
  }

677
  return pBlock->dataLen + pBlock->schemaLen;
678 679
}

X
Xiaoyu Wang 已提交
680
// Merges the per-table data blocks stored in pHashObj into one submit buffer per vgroup.
// For every table block that holds rows: the vgroup buffer is looked up (or created) by vgId and
// grown if needed, the table's rows are sorted and de-duplicated, and the trimmed block is
// appended to the vgroup buffer.
//   pHashObj      - hash of STableDataBlocks* to merge; may be empty
//   payloadType   - PAYLOAD_TYPE_RAW selects the raw-row path, anything else the K-V path
//   pVgDataBlocks - receives the array of per-vgroup blocks on success; untouched on failure
// Returns TSDB_CODE_SUCCESS or an error code; on error every intermediate allocation is released.
int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** pVgDataBlocks) {
  const int          INSERT_HEAD_SIZE = sizeof(SSubmitReq);
  int                code = TSDB_CODE_SUCCESS;
  bool               isRawPayload = IS_RAW_PAYLOAD(payloadType);
  SBlockKeyInfo      blkKeyInfo = {0};  // scratch key tuples shared by all table blocks
  SBlockRowMerger*   pBlkRowMerger = NULL;
  STableDataBlocks** p = NULL;

  SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
  SArray*   pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);
  if (pVnodeDataBlockHashList == NULL || pVnodeDataBlockList == NULL) {
    code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto _error;
  }

  // An empty hash yields no iterator; the loop is then skipped and an empty list is returned.
  // NOTE(review): the error exits below abandon the pHashObj iterator mid-way - confirm whether
  // the hash API requires the iteration to be cancelled explicitly.
  p = taosHashIterate(pHashObj, NULL);
  while (p != NULL && *p != NULL) {
    STableDataBlocks* pOneTableBlock = *p;
    SSubmitBlk*       pBlocks = (SSubmitBlk*)pOneTableBlock->pData;

    if (pBlocks->numOfRows > 0) {
      STableDataBlocks* dataBuf = NULL;
      pOneTableBlock->pTableMeta->vgId = pOneTableBlock->vgId;  // for schemaless, restore origin vgId

      // dataBuf is owned by pVnodeDataBlockList from here on; never free it or its buffer directly
      code = getDataBlockFromList(pVnodeDataBlockHashList, &pOneTableBlock->vgId, sizeof(pOneTableBlock->vgId),
                                  TSDB_PAYLOAD_SIZE, INSERT_HEAD_SIZE, 0, pOneTableBlock->pTableMeta, &dataBuf,
                                  pVnodeDataBlockList, NULL);
      if (code != TSDB_CODE_SUCCESS) {
        goto _error;
      }

      ASSERT(pOneTableBlock->pTableMeta->tableInfo.rowSize > 0);
      // the maximum expanded size in byte when a row-wise data is converted to SDataRow format
      int32_t expandSize = isRawPayload ? getRowExpandSize(pOneTableBlock->pTableMeta) : 0;
      int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize +
                         sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta) + pOneTableBlock->createTbReqLen;

      if (dataBuf->nAllocSize < destSize) {
        uint32_t newAllocSize = (uint32_t)(destSize * 1.5);
        char*    tmp = taosMemoryRealloc(dataBuf->pData, newAllocSize);
        if (tmp == NULL) {
          // the old buffer is still valid and is released together with dataBuf by the list cleanup
          code = TSDB_CODE_TSC_OUT_OF_MEMORY;
          goto _error;
        }
        dataBuf->pData = tmp;
        dataBuf->nAllocSize = newAllocSize;
      }

      if (isRawPayload) {
        sortRemoveDataBlockDupRowsRaw(pOneTableBlock);
      } else {
        if ((code = sortMergeDataBlockDupRows(pOneTableBlock, &blkKeyInfo, &pBlkRowMerger)) != 0) {
          goto _error;
        }
        ASSERT(blkKeyInfo.pKeyTuple != NULL && pBlocks->numOfRows > 0);
      }

      // erase the empty space reserved for binary data
      int32_t finalLen =
          trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, blkKeyInfo.pKeyTuple, isRawPayload);

      dataBuf->size += (finalLen + sizeof(SSubmitBlk));
      assert(dataBuf->size <= dataBuf->nAllocSize);
      dataBuf->numOfTables += 1;
    }

    p = taosHashIterate(pHashObj, p);
  }

  // success: drop the helpers, hand the per-vgroup list to the caller
  tdFreeSBlockRowMerger(pBlkRowMerger);
  taosHashCleanup(pVnodeDataBlockHashList);
  taosMemoryFreeClear(blkKeyInfo.pKeyTuple);
  *pVgDataBlocks = pVnodeDataBlockList;
  return TSDB_CODE_SUCCESS;

_error:
  tdFreeSBlockRowMerger(pBlkRowMerger);
  if (pVnodeDataBlockHashList != NULL) {
    taosHashCleanup(pVnodeDataBlockHashList);
  }
  destroyBlockArrayList(pVnodeDataBlockList);  // destroys every vgroup block, including its pData
  taosMemoryFreeClear(blkKeyInfo.pKeyTuple);
  return code;
}

X
Xiaoyu Wang 已提交
767 768
// Makes sure at least allSize bytes are free after the current end of the block's payload.
// When they are not, the buffer is reallocated to 1.5 * (size + allSize) bytes and the bytes past
// the current end are zeroed.
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the reallocation fails; in that
// case the block keeps its previous buffer and capacity.
int32_t allocateMemForSize(STableDataBlocks* pDataBlock, int32_t allSize) {
  size_t remain = pDataBlock->nAllocSize - pDataBlock->size;
  if (remain >= allSize) {
    return TSDB_CODE_SUCCESS;  // enough room already
  }

  uint32_t prevAllocSize = pDataBlock->nAllocSize;
  pDataBlock->nAllocSize = (pDataBlock->size + allSize) * 1.5;

  char* pGrown = taosMemoryRealloc(pDataBlock->pData, (size_t)pDataBlock->nAllocSize);
  if (pGrown == NULL) {
    // growing failed: restore the recorded capacity, the old buffer is untouched
    pDataBlock->nAllocSize = prevAllocSize;
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pDataBlock->pData = pGrown;
  memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size);
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
789
// Makes sure the block has room for at least 5 more rows of rowSize bytes, growing the capacity
// by a factor of 1.5 as many times as needed, and reports how many rows fit after the header.
//   pDataBlock - block whose buffer may be reallocated; new bytes past the current end are zeroed
//   rowSize    - size of one row in bytes
//   numOfRows  - receives (nAllocSize - headerSize) / rowSize, on success and on failure alike
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the reallocation fails; in that
// case the block keeps its previous buffer and capacity.
int32_t allocateMemIfNeed(STableDataBlocks* pDataBlock, int32_t rowSize, int32_t* numOfRows) {
  const int factor = 5;
  uint32_t  prevAllocSize = pDataBlock->nAllocSize;
  size_t    remain = pDataBlock->nAllocSize - pDataBlock->size;
  bool      grown = false;

  // enlarge the recorded capacity until the free space covers `factor` rows
  while (remain < rowSize * factor) {
    pDataBlock->nAllocSize = (uint32_t)(pDataBlock->nAllocSize * 1.5);
    remain = pDataBlock->nAllocSize - pDataBlock->size;
    grown = true;
  }

  if (grown) {
    char* pGrown = taosMemoryRealloc(pDataBlock->pData, (size_t)pDataBlock->nAllocSize);
    if (pGrown == NULL) {
      // growing failed: restore the recorded capacity, the old buffer is untouched
      pDataBlock->nAllocSize = prevAllocSize;
      *numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize;
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    pDataBlock->pData = pGrown;
    memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size);
  }

  *numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
817
/**
 * Prepare a row builder for the given schema version and bound-column layout.
 *
 * Asserts that the column info describes at least one column and that the
 * number of bound columns does not exceed the total. It then initializes the
 * builder with tdSRowInit and forwards the column counts and length fields of
 * pColInfo to tdSRowSetExtendedInfo.
 *
 * @param pBuilder  row builder to initialize
 * @param schemaVer schema version passed to tdSRowInit
 * @param pColInfo  source of numOfCols, numOfBound, flen, allNullLen and
 *                  boundNullLen
 * @return always TSDB_CODE_SUCCESS
 */
int initRowBuilder(SRowBuilder* pBuilder, int16_t schemaVer, SParsedDataColInfo* pColInfo) {
  int code = TSDB_CODE_SUCCESS;

  ASSERT(pColInfo->numOfCols > 0 && (pColInfo->numOfBound <= pColInfo->numOfCols));

  tdSRowInit(pBuilder, schemaVer);
  tdSRowSetExtendedInfo(pBuilder,
                        pColInfo->numOfCols,
                        pColInfo->numOfBound,
                        pColInfo->flen,
                        pColInfo->allNullLen,
                        pColInfo->boundNullLen);

  return code;
}
D
stmt  
dapan1121 已提交
824

D
stmt  
dapan1121 已提交
825
int32_t qResetStmtDataBlock(void* block, bool keepBuf) {
D
stmt  
dapan1121 已提交
826
  STableDataBlocks* pBlock = (STableDataBlocks*)block;
D
stmt  
dapan1121 已提交
827

D
stmt  
dapan1121 已提交
828 829 830 831 832 833 834 835
  if (keepBuf) {
    taosMemoryFreeClear(pBlock->pData);
    pBlock->pData = taosMemoryMalloc(TSDB_PAYLOAD_SIZE);
    if (NULL == pBlock->pData) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    memset(pBlock->pData, 0, sizeof(SSubmitBlk));
  } else {
X
Xiaoyu Wang 已提交
836
    pBlock->pData = NULL;
D
stmt  
dapan1121 已提交
837
  }
X
Xiaoyu Wang 已提交
838 839 840 841

  pBlock->ordered = true;
  pBlock->prevTS = INT64_MIN;
  pBlock->size = sizeof(SSubmitBlk);
D
stmt  
dapan1121 已提交
842 843 844 845
  pBlock->tsSource = -1;
  pBlock->numOfTables = 1;
  pBlock->nAllocSize = TSDB_PAYLOAD_SIZE;
  pBlock->headerSize = pBlock->size;
D
dapan 已提交
846
  pBlock->createTbReqLen = 0;
X
Xiaoyu Wang 已提交
847

D
stmt  
dapan1121 已提交
848
  memset(&pBlock->rowBuilder, 0, sizeof(pBlock->rowBuilder));
D
stmt  
dapan1121 已提交
849 850

  return TSDB_CODE_SUCCESS;
D
stmt  
dapan1121 已提交
851 852
}

D
stmt  
dapan1121 已提交
853 854 855 856
int32_t qCloneStmtDataBlock(void** pDst, void* pSrc) {
  *pDst = taosMemoryMalloc(sizeof(STableDataBlocks));
  if (NULL == *pDst) {
    return TSDB_CODE_OUT_OF_MEMORY;
D
stmt  
dapan1121 已提交
857
  }
X
Xiaoyu Wang 已提交
858

D
stmt  
dapan1121 已提交
859 860
  memcpy(*pDst, pSrc, sizeof(STableDataBlocks));
  ((STableDataBlocks*)(*pDst))->cloned = true;
X
Xiaoyu Wang 已提交
861

D
dapan1121 已提交
862 863 864 865 866 867 868 869 870 871 872
  STableDataBlocks* pBlock = (STableDataBlocks*)(*pDst);
  if (pBlock->pTableMeta) {
    void *pNewMeta = taosMemoryMalloc(TABLE_META_SIZE(pBlock->pTableMeta));
    if (NULL == pNewMeta) {
      taosMemoryFreeClear(*pDst);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    memcpy(pNewMeta, pBlock->pTableMeta, TABLE_META_SIZE(pBlock->pTableMeta));
    pBlock->pTableMeta = pNewMeta;
  }

D
stmt  
dapan1121 已提交
873
  return qResetStmtDataBlock(*pDst, false);
D
stmt  
dapan1121 已提交
874 875
}

D
dapan 已提交
876
int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc, uint64_t uid, int32_t vgId) {
D
stmt  
dapan1121 已提交
877 878 879
  int32_t code = qCloneStmtDataBlock(pDst, pSrc);
  if (code) {
    return code;
D
stmt  
dapan1121 已提交
880 881
  }

X
Xiaoyu Wang 已提交
882
  STableDataBlocks* pBlock = (STableDataBlocks*)*pDst;
D
stmt  
dapan1121 已提交
883 884 885 886
  pBlock->pData = taosMemoryMalloc(pBlock->nAllocSize);
  if (NULL == pBlock->pData) {
    qFreeStmtDataBlock(pBlock);
    return TSDB_CODE_OUT_OF_MEMORY;
D
stmt  
dapan1121 已提交
887 888
  }

D
dapan 已提交
889 890
  pBlock->vgId = vgId;
  
D
dapan 已提交
891 892
  if (pBlock->pTableMeta) {
    pBlock->pTableMeta->uid = uid;
D
dapan 已提交
893
    pBlock->pTableMeta->vgId = vgId;
D
dapan 已提交
894 895
  }
  
D
stmt  
dapan1121 已提交
896 897
  memset(pBlock->pData, 0, sizeof(SSubmitBlk));

D
stmt  
dapan1121 已提交
898 899 900
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
901 902 903 904
STableMeta *qGetTableMetaInDataBlock(void* pDataBlock) {
  return ((STableDataBlocks*)pDataBlock)->pTableMeta;
}

D
stmt  
dapan1121 已提交
905 906 907
void qFreeStmtDataBlock(void* pDataBlock) {
  if (pDataBlock == NULL) {
    return;
D
stmt  
dapan1121 已提交
908 909
  }

D
dapan1121 已提交
910
  taosMemoryFreeClear(((STableDataBlocks*)pDataBlock)->pTableMeta);
D
stmt  
dapan1121 已提交
911 912
  taosMemoryFreeClear(((STableDataBlocks*)pDataBlock)->pData);
  taosMemoryFreeClear(pDataBlock);
D
stmt  
dapan1121 已提交
913 914
}

D
stmt  
dapan1121 已提交
915 916 917
void qDestroyStmtDataBlock(void* pBlock) {
  if (pBlock == NULL) {
    return;
D
stmt  
dapan1121 已提交
918 919
  }

D
stmt  
dapan1121 已提交
920
  STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;
D
stmt  
dapan1121 已提交
921

D
stmt  
dapan1121 已提交
922 923 924
  pDataBlock->cloned = false;
  destroyDataBlock(pDataBlock);
}