parInsertData.c 20.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
H
Hongze Cheng 已提交
15
// clang-format off
X
Xiaoyu Wang 已提交
16
#include "parInsertData.h"
17 18

#include "catalog.h"
X
Xiaoyu Wang 已提交
19
#include "parInt.h"
X
Xiaoyu Wang 已提交
20
#include "parUtil.h"
X
Xiaoyu Wang 已提交
21
#include "querynodes.h"
22 23 24 25

// Evaluates to non-zero when payload type `t`, converted to int, equals PAYLOAD_TYPE_RAW.
#define IS_RAW_PAYLOAD(t) \
  (((int)(t)) == PAYLOAD_TYPE_RAW)  // 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert

26 27 28 29 30 31 32 33 34 35
// Sort key for one row of a table data block: the row timestamp plus the address of the row.
// skey has to stay the first member, because rowDataCompar() reads a TSKEY from the start of
// each element it is given and is used to qsort() arrays of this struct.
typedef struct SBlockKeyTuple {
  // row timestamp (filled from TD_ROW_KEY() in sortRemoveDataBlockDupRows)
  TSKEY skey;
  // address of the row inside the source data block
  void* payloadAddr;
} SBlockKeyTuple;

// Reusable scratch buffer of SBlockKeyTuple entries, grown on demand by sortRemoveDataBlockDupRows().
typedef struct SBlockKeyInfo {
  // number of bytes currently allocated for pKeyTuple
  int32_t         maxBytesAlloc;
  // tuple array; NULL until the first allocation
  SBlockKeyTuple* pKeyTuple;
} SBlockKeyInfo;

X
Xiaoyu Wang 已提交
36 37 38
// qsort() comparator: orders elements by the TSKEY stored at the very start of each element.
// Returns -1, 0 or 1 when the left key is smaller than, equal to, or greater than the right key.
static int32_t rowDataCompar(const void* lhs, const void* rhs) {
  const TSKEY keyL = *(const TSKEY*)lhs;
  const TSKEY keyR = *(const TSKEY*)rhs;

  if (keyL < keyR) {
    return -1;
  }
  if (keyL > keyR) {
    return 1;
  }
  return 0;
}

47
// Initialise pColList for "non-bound" mode, i.e. every schema column is bound in schema order.
//
// pColList  : column info to fill; its previous contents are overwritten (nothing is freed here).
// pSchema   : array of at least numOfCols column schemas.
// numOfCols : number of columns in pSchema.
//
// Allocates pColList->boundColumns and pColList->cols; release them with destroyBoundColumnInfo().
// The function returns void, so an allocation failure is reported by leaving both arrays NULL and
// numOfCols / numOfBound set to 0; callers that need to detect the failure must inspect those fields.
void setBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, col_id_t numOfCols) {
  pColList->numOfCols = numOfCols;
  pColList->numOfBound = numOfCols;
  pColList->orderStatus = ORDER_STATUS_ORDERED;  // default is ORDERED for non-bound mode
  pColList->boundColumns = taosMemoryCalloc(pColList->numOfCols, sizeof(col_id_t));
  pColList->cols = taosMemoryCalloc(pColList->numOfCols, sizeof(SBoundColumn));
  pColList->colIdxInfo = NULL;
  pColList->flen = 0;
  pColList->allNullLen = 0;
  pColList->boundNullLen = 0;
  pColList->extendedVarLen = 0;

  if (NULL == pColList->boundColumns || NULL == pColList->cols) {
    // Do not index into a missing array: drop whichever allocation succeeded and
    // leave an empty, consistent descriptor behind.
    taosMemoryFreeClear(pColList->boundColumns);
    taosMemoryFreeClear(pColList->cols);
    pColList->numOfCols = 0;
    pColList->numOfBound = 0;
    return;
  }

  int32_t nVar = 0;  // number of BINARY/NCHAR columns
  for (int32_t i = 0; i < pColList->numOfCols; ++i) {
    uint8_t type = pSchema[i].type;
    if (i > 0) {
      // cols[0] keeps the zero offsets provided by calloc
      pColList->cols[i].offset = pColList->cols[i - 1].offset + pSchema[i - 1].bytes;
      pColList->cols[i].toffset = pColList->flen;
    }
    pColList->flen += TYPE_BYTES[type];
    switch (type) {
      case TSDB_DATA_TYPE_BINARY:
        pColList->allNullLen += (VARSTR_HEADER_SIZE + CHAR_BYTES);
        ++nVar;
        break;
      case TSDB_DATA_TYPE_NCHAR:
        pColList->allNullLen += (VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
        ++nVar;
        break;
      default:
        break;
    }
    pColList->boundColumns[i] = pSchema[i].colId;
  }

  pColList->allNullLen += pColList->flen;
  pColList->boundNullLen = pColList->allNullLen;  // default set allNullLen
  pColList->extendedVarLen = (uint16_t)(nVar * sizeof(VarDataOffsetT));
}

X
Xiaoyu Wang 已提交
84 85 86
// Comparator for arrays whose elements start with a uint16_t schema index.
// Returns -1, 0 or 1 when the left index is smaller than, equal to, or greater than the right one.
int32_t schemaIdxCompar(const void* lhs, const void* rhs) {
  const uint16_t idxL = *(const uint16_t*)lhs;
  const uint16_t idxR = *(const uint16_t*)rhs;

  if (idxL < idxR) {
    return -1;
  }
  if (idxL > idxR) {
    return 1;
  }
  return 0;
}

X
Xiaoyu Wang 已提交
95 96 97
// Comparator keyed on the uint16_t located sizeof(uint16_t) bytes into each element
// (i.e. the second uint16_t of the element).
// Returns -1, 0 or 1 when the left key is smaller than, equal to, or greater than the right one.
int32_t boundIdxCompar(const void* lhs, const void* rhs) {
  const uint16_t keyL = *(uint16_t*)POINTER_SHIFT(lhs, sizeof(uint16_t));
  const uint16_t keyR = *(uint16_t*)POINTER_SHIFT(rhs, sizeof(uint16_t));

  if (keyL < keyR) {
    return -1;
  }
  if (keyL > keyR) {
    return 1;
  }
  return 0;
}

D
stmt  
dapan1121 已提交
106 107
void destroyBoundColumnInfo(void* pBoundInfo) {
  if (NULL == pBoundInfo) {
D
stmt  
dapan1121 已提交
108 109
    return;
  }
D
stmt  
dapan1121 已提交
110 111

  SParsedDataColInfo* pColList = (SParsedDataColInfo*)pBoundInfo;
X
Xiaoyu Wang 已提交
112

113
  taosMemoryFreeClear(pColList->boundColumns);
wafwerar's avatar
wafwerar 已提交
114 115
  taosMemoryFreeClear(pColList->cols);
  taosMemoryFreeClear(pColList->colIdxInfo);
116 117
}

wmmhello's avatar
wmmhello 已提交
118
static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOffset, STableMeta* pTableMeta,
X
Xiaoyu Wang 已提交
119
                               STableDataBlocks** dataBlocks) {
wafwerar's avatar
wafwerar 已提交
120
  STableDataBlocks* dataBuf = (STableDataBlocks*)taosMemoryCalloc(1, sizeof(STableDataBlocks));
121 122 123 124 125 126 127 128 129 130 131 132
  if (dataBuf == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  dataBuf->nAllocSize = (uint32_t)defaultSize;
  dataBuf->headerSize = startOffset;

  // the header size will always be the startOffset value, reserved for the subumit block header
  if (dataBuf->nAllocSize <= dataBuf->headerSize) {
    dataBuf->nAllocSize = dataBuf->headerSize * 2;
  }

wafwerar's avatar
wafwerar 已提交
133
  dataBuf->pData = taosMemoryMalloc(dataBuf->nAllocSize);
134
  if (dataBuf->pData == NULL) {
wafwerar's avatar
wafwerar 已提交
135
    taosMemoryFreeClear(dataBuf);
136 137 138 139
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  memset(dataBuf->pData, 0, sizeof(SSubmitBlk));

wmmhello's avatar
wmmhello 已提交
140
  dataBuf->pTableMeta = pTableMeta;
141 142

  SParsedDataColInfo* pColInfo = &dataBuf->boundColumnInfo;
X
Xiaoyu Wang 已提交
143
  SSchema*            pSchema = getTableColumnSchema(dataBuf->pTableMeta);
144 145
  setBoundColumnInfo(pColInfo, pSchema, dataBuf->pTableMeta->tableInfo.numOfColumns);

X
Xiaoyu Wang 已提交
146 147 148 149 150
  dataBuf->ordered = true;
  dataBuf->prevTS = INT64_MIN;
  dataBuf->rowSize = rowSize;
  dataBuf->size = startOffset;
  dataBuf->vgId = dataBuf->pTableMeta->vgId;
151 152 153 154 155 156 157

  assert(defaultSize > 0 && pTableMeta != NULL && dataBuf->pTableMeta != NULL);

  *dataBlocks = dataBuf;
  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
158
// Encode pCreateTbReq into pBlocks->pData at the current write position (pBlocks->size),
// growing the buffer first when the remaining space is too small.
//
// On success pBlocks->size is advanced by the encoded length, pBlocks->createTbReqLen is set to
// that length and TSDB_CODE_SUCCESS is returned.
// Returns TSDB_CODE_TSC_OUT_OF_MEMORY when the buffer cannot be grown (the block is left unchanged),
// or the non-zero status reported by tEncodeSize when the request length cannot be determined.
int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq) {
  int32_t len = 0;
  int32_t ret = 0;

  // NOTE(review): tEncodeSize is expected to store the encoded length in `len` and a status in
  // `ret` (0 on success) -- confirm against its definition. `len` is initialised above so that it
  // is never read uninitialised, and a failed sizing pass is not turned into a write.
  tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, len, ret);
  if (ret != 0) {
    return ret;
  }

  if (pBlocks->nAllocSize - pBlocks->size < len) {
    pBlocks->nAllocSize += len + pBlocks->rowSize;
    char* pTmp = taosMemoryRealloc(pBlocks->pData, pBlocks->nAllocSize);
    if (pTmp == NULL) {
      // the old buffer is still valid; undo the size bookkeeping
      pBlocks->nAllocSize -= len + pBlocks->rowSize;
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    pBlocks->pData = pTmp;
    memset(pBlocks->pData + pBlocks->size, 0, pBlocks->nAllocSize - pBlocks->size);
  }

  SCoder coder = {0};
  char*  pBuf = pBlocks->pData + pBlocks->size;

  tCoderInit(&coder, TD_LITTLE_ENDIAN, pBuf, len, TD_ENCODER);
  tEncodeSVCreateTbReq(&coder, pCreateTbReq);
  tCoderClear(&coder);

  pBlocks->size += len;
  pBlocks->createTbReqLen = len;
  return TSDB_CODE_SUCCESS;
}

188
int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize,
wmmhello's avatar
wmmhello 已提交
189
                             STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList,
X
Xiaoyu Wang 已提交
190
                             SVCreateTbReq* pCreateTbReq) {
191 192 193 194 195 196 197
  *dataBlocks = NULL;
  STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)&id, sizeof(id));
  if (t1 != NULL) {
    *dataBlocks = *t1;
  }

  if (*dataBlocks == NULL) {
198
    int32_t ret = createDataBlock((size_t)size, rowSize, startOffset, pTableMeta, dataBlocks);
199 200 201 202
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }

H
Hongze Cheng 已提交
203
    if (NULL != pCreateTbReq && NULL != pCreateTbReq->ctb.pTag) {
X
Xiaoyu Wang 已提交
204 205 206 207 208 209
      ret = buildCreateTbMsg(*dataBlocks, pCreateTbReq);
      if (ret != TSDB_CODE_SUCCESS) {
        return ret;
      }
    }

210 211 212 213 214 215 216 217 218 219
    taosHashPut(pHashList, (const char*)&id, sizeof(int64_t), (char*)dataBlocks, POINTER_BYTES);
    if (pBlockList) {
      taosArrayPush(pBlockList, dataBlocks);
    }
  }

  return TSDB_CODE_SUCCESS;
}

// Upper bound, in bytes, by which one row may grow when it is rewritten through the row builder:
// the row head (minus the timestamp key), TYPE_BYTES[TSDB_DATA_TYPE_BINARY] for every
// variable-length column, and the bitmap sized by TD_BITMAP_BYTES(columns - 1).
static int32_t getRowExpandSize(STableMeta* pTableMeta) {
  const int32_t nCols = getNumOfColumns(pTableMeta);
  SSchema*      pCols = getTableColumnSchema(pTableMeta);

  int32_t extra = TD_ROW_HEAD_LEN - sizeof(TSKEY);
  for (int32_t c = 0; c < nCols; ++c) {
    if (IS_VAR_DATA_TYPE(pCols[c].type)) {
      extra += TYPE_BYTES[TSDB_DATA_TYPE_BINARY];
    }
  }

  extra += (int32_t)TD_BITMAP_BYTES(nCols - 1);
  return extra;
}

232
// Free a table data block: its payload buffer, its bound-column arrays (skipped when the block is
// marked as cloned) and finally the block itself. pTableMeta is not released here.
// A NULL argument is ignored.
static void destroyDataBlock(STableDataBlocks* pDataBlock) {
  if (NULL == pDataBlock) {
    return;
  }

  taosMemoryFreeClear(pDataBlock->pData);

  const bool ownsBoundInfo = !pDataBlock->cloned;
  if (ownsBoundInfo) {
    destroyBoundColumnInfo(&pDataBlock->boundColumnInfo);
  }

  taosMemoryFreeClear(pDataBlock);
}

249
// Destroy every data block stored (as a pointer) in the array, then the array itself.
// A NULL array is ignored.
void destroyBlockArrayList(SArray* pDataBlockList) {
  if (NULL == pDataBlockList) {
    return;
  }

  const size_t nBlocks = taosArrayGetSize(pDataBlockList);
  for (int32_t idx = 0; idx < nBlocks; ++idx) {
    void* pBlock = taosArrayGetP(pDataBlockList, idx);
    destroyDataBlock(pBlock);
  }

  taosArrayDestroy(pDataBlockList);
}

263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278
// Destroy every data block referenced from the hash, then release the hash itself.
// A NULL hash is ignored.
void destroyBlockHashmap(SHashObj* pDataBlockHash) {
  if (NULL == pDataBlockHash) {
    return;
  }

  for (void** ppEntry = taosHashIterate(pDataBlockHash, NULL); ppEntry != NULL;
       ppEntry = taosHashIterate(pDataBlockHash, ppEntry)) {
    STableDataBlocks* pBlock = *ppEntry;
    destroyDataBlock(pBlock);
  }

  taosHashCleanup(pDataBlockHash);
}

279
// data block is disordered, sort it in ascending order
X
Xiaoyu Wang 已提交
280 281
// Sort the fixed-size rows of a raw-payload block by timestamp and drop rows whose timestamp
// duplicates an earlier row in the sorted order, compacting the survivors in place.
//
// Only blocks flagged as unordered are touched; afterwards the block is flagged ordered and
// numOfRows / size reflect the surviving rows. prevTS is always reset to INT64_MIN.
// An unordered block without rows is simply flagged ordered; its row count stays 0.
void sortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf) {
  SSubmitBlk* pBlocks = (SSubmitBlk*)dataBuf->pData;

  // the block must consist of the header followed by exactly numOfRows fixed-size rows
  assert(pBlocks->numOfRows * dataBuf->rowSize + sizeof(SSubmitBlk) == dataBuf->size);

  if (!dataBuf->ordered) {
    if (pBlocks->numOfRows > 0) {
      char* pBlockData = pBlocks->data;
      qsort(pBlockData, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar);

      // `kept` is the index of the last row retained so far, `j` scans the remaining rows
      int32_t kept = 0;
      for (int32_t j = 1; j < pBlocks->numOfRows; ++j) {
        TSKEY tKept = *(TSKEY*)(pBlockData + dataBuf->rowSize * kept);
        TSKEY tCur = *(TSKEY*)(pBlockData + dataBuf->rowSize * j);

        if (tKept == tCur) {
          continue;  // timestamp conflict: drop row j
        }

        ++kept;
        if (kept != j) {
          memmove(pBlockData + dataBuf->rowSize * kept, pBlockData + dataBuf->rowSize * j, dataBuf->rowSize);
        }
      }

      pBlocks->numOfRows = kept + 1;
      dataBuf->size = sizeof(SSubmitBlk) + dataBuf->rowSize * pBlocks->numOfRows;
    }

    dataBuf->ordered = true;
  }

  dataBuf->prevTS = INT64_MIN;
}

// data block is disordered, sort it in ascending order
X
Xiaoyu Wang 已提交
321 322
// Build the (timestamp, row address) tuple array for a block and, when the block is flagged as
// unordered, sort the tuples by timestamp and drop tuples whose timestamp duplicates an earlier
// one. The row data itself is not moved; consumers read rows through pBlkKeyInfo->pKeyTuple.
//
// dataBuf     : block to process; ordered, size and prevTS are updated, as is the row count in
//               its submit header when duplicates are removed.
// pBlkKeyInfo : scratch tuple buffer, grown when too small and reusable across calls.
//
// Returns 0 on success or TSDB_CODE_TSC_OUT_OF_MEMORY when the tuple buffer cannot be grown.
int sortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo) {
  SSubmitBlk* pBlocks = (SSubmitBlk*)dataBuf->pData;
  // int32_t, matching how the rest of this file handles numOfRows; a 16-bit local would
  // truncate larger row counts.
  int32_t nRows = pBlocks->numOfRows;

  if (nRows <= 0) {
    // nothing to index, sort or deduplicate; in particular do not report one row
    dataBuf->ordered = true;
    dataBuf->size = sizeof(SSubmitBlk);
    dataBuf->prevTS = INT64_MIN;
    return 0;
  }

  // make sure the tuple buffer can hold one entry per row
  size_t nAlloc = (size_t)nRows * sizeof(SBlockKeyTuple);
  if (pBlkKeyInfo->pKeyTuple == NULL || (size_t)pBlkKeyInfo->maxBytesAlloc < nAlloc) {
    char* tmp = taosMemoryRealloc(pBlkKeyInfo->pKeyTuple, nAlloc);
    if (tmp == NULL) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    pBlkKeyInfo->pKeyTuple = (SBlockKeyTuple*)tmp;
    pBlkKeyInfo->maxBytesAlloc = (int32_t)nAlloc;
  }
  memset(pBlkKeyInfo->pKeyTuple, 0, nAlloc);

  // rows start after the schema part and are spaced extendedRowSize bytes apart
  int32_t         extendedRowSize = getExtendedRowSize(dataBuf);
  SBlockKeyTuple* pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
  char*           pBlockData = pBlocks->data + pBlocks->schemaLen;
  for (int32_t n = 0; n < nRows; ++n) {
    pBlkKeyTuple[n].skey = TD_ROW_KEY((STSRow*)pBlockData);
    pBlkKeyTuple[n].payloadAddr = pBlockData;
    pBlockData += extendedRowSize;
  }

  if (!dataBuf->ordered) {
    qsort(pBlkKeyTuple, nRows, sizeof(SBlockKeyTuple), rowDataCompar);

    // `kept` is the index of the last tuple retained so far, `j` scans the remaining tuples
    int32_t kept = 0;
    for (int32_t j = 1; j < nRows; ++j) {
      if (pBlkKeyTuple[kept].skey == pBlkKeyTuple[j].skey) {
        continue;  // duplicate timestamp: drop tuple j
      }

      ++kept;
      if (kept != j) {
        memmove(pBlkKeyTuple + kept, pBlkKeyTuple + j, sizeof(SBlockKeyTuple));
      }
    }

    dataBuf->ordered = true;
    pBlocks->numOfRows = kept + 1;
  }

  dataBuf->size = sizeof(SSubmitBlk) + pBlocks->numOfRows * extendedRowSize;
  dataBuf->prevTS = INT64_MIN;

  return 0;
}

// Erase the empty space reserved for binary data
X
Xiaoyu Wang 已提交
387 388
// Copy one table's submit block into the destination buffer at pDataBlock, writing every row at
// its actual length so that the rows end up tightly packed.
//
// pDataBlock      : destination; receives the submit header (plus the encoded create-table
//                   request, createTbReqLen bytes) followed by the rows.
// pTableDataBlock : source block.
// blkKeyTuple     : row addresses in output order; only used when isRawPayload is false.
// isRawPayload    : true  -> source rows are raw column values laid out back to back
//                            (pSchema[j].bytes per column) and are rebuilt through the row builder;
//                   false -> source rows are already STSRow and are copied via blkKeyTuple.
//
// Returns dataLen + schemaLen of the written block (the submit header itself is not included).
static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SBlockKeyTuple* blkKeyTuple,
                         bool isRawPayload) {
  // TODO: optimize this function, handle the case while binary is not presented
  STableMeta*   pMeta = pTableDataBlock->pTableMeta;
  STableComInfo tableInfo = getTableInfo(pMeta);
  SSchema*      pCols = getTableColumnSchema(pMeta);

  // header + encoded create-table request are copied verbatim
  const int32_t headLen = sizeof(SSubmitBlk) + pTableDataBlock->createTbReqLen;
  SSubmitBlk*   pOut = pDataBlock;
  memcpy(pDataBlock, pTableDataBlock->pData, headLen);
  char* pDst = (char*)pDataBlock + headLen;  // write cursor for row data

  pOut->schemaLen = pTableDataBlock->createTbReqLen;
  pOut->dataLen = 0;
  const int32_t nRows = pOut->numOfRows;

  if (!isRawPayload) {
    for (int32_t r = 0; r < nRows; ++r) {
      char*     pRow = blkKeyTuple[r].payloadAddr;
      TDRowLenT rowLen = TD_ROW_LEN((STSRow*)pRow);
      memcpy(pDst, pRow, rowLen);
      pDst += rowLen;
      pOut->dataLen += rowLen;
    }
    return pOut->dataLen + pOut->schemaLen;
  }

  // fixed-length part of a row: sum of TYPE_BYTES over all columns
  int32_t flen = 0;
  for (int32_t c = 0; c < tableInfo.numOfColumns; ++c) {
    flen += TYPE_BYTES[pCols[c].type];
  }

  SRowBuilder builder = {0};
  tdSRowInit(&builder, pMeta->sversion);
  tdSRowSetInfo(&builder, getNumOfColumns(pMeta), -1, flen);

  char* pSrc = pTableDataBlock->pData + headLen;  // read cursor over the raw column values
  for (int32_t r = 0; r < nRows; ++r) {
    tdSRowResetBuf(&builder, pDst);

    int toffset = 0;
    for (int32_t c = 0; c < tableInfo.numOfColumns; ++c) {
      int8_t  colType = pCols[c].type;
      uint8_t valType = isNull(pSrc, colType) ? TD_VTYPE_NULL : TD_VTYPE_NORM;
      tdAppendColValToRow(&builder, pCols[c].colId, colType, valType, pSrc, true, toffset, c);
      toffset += TYPE_BYTES[colType];
      pSrc += pCols[c].bytes;
    }

    int32_t rowLen = TD_ROW_LEN((STSRow*)pDst);
    pDst += rowLen;
    pOut->dataLen += rowLen;
  }

  return pOut->dataLen + pOut->schemaLen;
}

X
Xiaoyu Wang 已提交
444
// Merge the per-table data blocks stored in pHashObj into one data block per vgroup.
//
// pHashObj      : hash whose values are STableDataBlocks* (one per table). Tables without rows
//                 are skipped; an empty hash yields an empty result list.
// payloadType   : rows are treated as raw payload when IS_RAW_PAYLOAD(payloadType) holds.
// pVgDataBlocks : on success receives a newly created array of STableDataBlocks*, one per vgId;
//                 the caller owns it (see destroyBlockArrayList()). Not written on failure.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_OUT_OF_MEMORY, or the error reported by
// getDataBlockFromList() / sortRemoveDataBlockDupRows(). On failure every vgroup block created
// so far is destroyed.
int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** pVgDataBlocks) {
  const int     INSERT_HEAD_SIZE = sizeof(SSubmitReq);
  int32_t       code = TSDB_CODE_SUCCESS;
  bool          isRawPayload = IS_RAW_PAYLOAD(payloadType);
  SHashObj*     pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
  SArray*       pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);
  SBlockKeyInfo blkKeyInfo = {0};  // share by pOneTableBlock

  if (NULL == pVnodeDataBlockHashList || NULL == pVnodeDataBlockList) {
    code = TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // An empty hash makes taosHashIterate() return NULL, which must not be dereferenced.
  // NOTE(review): leaving the loop early abandons the iterator exactly like the early returns of
  // the previous implementation did -- check whether the hash API requires an explicit cancel.
  STableDataBlocks** p = NULL;
  if (TSDB_CODE_SUCCESS == code) {
    p = taosHashIterate(pHashObj, NULL);
  }

  while (p != NULL) {
    STableDataBlocks* pOneTableBlock = *p;
    if (NULL == pOneTableBlock) {
      break;
    }

    SSubmitBlk* pBlocks = (SSubmitBlk*)pOneTableBlock->pData;
    if (pBlocks->numOfRows > 0) {
      // find (or create) the merged block of the vgroup this table belongs to
      STableDataBlocks* dataBuf = NULL;
      code = getDataBlockFromList(pVnodeDataBlockHashList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE, INSERT_HEAD_SIZE, 0,
                                  pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList, NULL);
      if (code != TSDB_CODE_SUCCESS) {
        break;
      }

      // the maximum expanded size in byte when a row-wise data is converted to SDataRow format
      int32_t expandSize = isRawPayload ? getRowExpandSize(pOneTableBlock->pTableMeta) : 0;
      int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize +
                         sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta);

      if (dataBuf->nAllocSize < destSize) {
        uint32_t newAllocSize = (uint32_t)(destSize * 1.5);
        char*    tmp = taosMemoryRealloc(dataBuf->pData, newAllocSize);
        if (NULL == tmp) {
          // dataBuf is owned by pVnodeDataBlockList and is released by the common cleanup below;
          // it must not be touched after that list has been destroyed.
          code = TSDB_CODE_TSC_OUT_OF_MEMORY;
          break;
        }
        dataBuf->pData = tmp;
        dataBuf->nAllocSize = newAllocSize;
      }

      if (isRawPayload) {
        sortRemoveDataBlockDupRowsRaw(pOneTableBlock);
      } else {
        code = sortRemoveDataBlockDupRows(pOneTableBlock, &blkKeyInfo);
        if (code != 0) {
          break;
        }
        ASSERT(blkKeyInfo.pKeyTuple != NULL && pBlocks->numOfRows > 0);
      }

      // erase the empty space reserved for binary data
      int32_t finalLen =
          trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, blkKeyInfo.pKeyTuple, isRawPayload);

      dataBuf->size += (finalLen + sizeof(SSubmitBlk));
      assert(dataBuf->size <= dataBuf->nAllocSize);
      dataBuf->numOfTables += 1;
    }

    p = taosHashIterate(pHashObj, p);
  }

  // The lookup hash only references the vgroup blocks; the blocks themselves live in the list.
  if (pVnodeDataBlockHashList != NULL) {
    taosHashCleanup(pVnodeDataBlockHashList);
  }
  taosMemoryFreeClear(blkKeyInfo.pKeyTuple);

  if (code != TSDB_CODE_SUCCESS) {
    destroyBlockArrayList(pVnodeDataBlockList);
    return code;
  }

  *pVgDataBlocks = pVnodeDataBlockList;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
524 525
// Make sure the block has at least allSize free bytes after its current write position.
// When it does not, the buffer is reallocated to 1.5 * (size + allSize) bytes and the area
// behind the current write position is zeroed.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when growing fails (the block is
// then left as it was).
int32_t allocateMemForSize(STableDataBlocks* pDataBlock, int32_t allSize) {
  const uint32_t prevAllocSize = pDataBlock->nAllocSize;
  const size_t   freeBytes = pDataBlock->nAllocSize - pDataBlock->size;

  if (freeBytes >= allSize) {
    return TSDB_CODE_SUCCESS;  // enough room already
  }

  // expand the allocated size
  pDataBlock->nAllocSize = (pDataBlock->size + allSize) * 1.5;

  char* pGrown = taosMemoryRealloc(pDataBlock->pData, (size_t)pDataBlock->nAllocSize);
  if (NULL == pGrown) {
    // keep the old buffer and restore the bookkeeping
    pDataBlock->nAllocSize = prevAllocSize;
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pDataBlock->pData = pGrown;
  memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size);
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
546
// Make sure the block has room for at least 5 more rows of rowSize bytes, growing the buffer in
// 1.5x steps when it does not, and report how many rows fit behind the header.
//
// numOfRows : receives (nAllocSize - headerSize) / rowSize, computed from the final allocation
//             size (the old size when growing failed).
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when growing fails (the buffer and
// its recorded size are then left as they were).
int32_t allocateMemIfNeed(STableDataBlocks* pDataBlock, int32_t rowSize, int32_t* numOfRows) {
  const int      factor = 5;
  const uint32_t prevAllocSize = pDataBlock->nAllocSize;
  size_t         freeBytes = pDataBlock->nAllocSize - pDataBlock->size;
  int32_t        code = TSDB_CODE_SUCCESS;

  if (freeBytes < rowSize * factor) {
    // expand the allocated size until the reserve is large enough
    do {
      pDataBlock->nAllocSize = (uint32_t)(pDataBlock->nAllocSize * 1.5);
      freeBytes = pDataBlock->nAllocSize - pDataBlock->size;
    } while (freeBytes < rowSize * factor);

    char* pGrown = taosMemoryRealloc(pDataBlock->pData, (size_t)pDataBlock->nAllocSize);
    if (NULL == pGrown) {
      // do nothing, if allocate more memory failed
      pDataBlock->nAllocSize = prevAllocSize;
      code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    } else {
      pDataBlock->pData = pGrown;
      memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size);
    }
  }

  *numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize;
  return code;
}

X
Xiaoyu Wang 已提交
574
// Prepare a row builder for the given schema version and bound-column layout.
// pColInfo must describe at least one column and must not bind more columns than it has.
// Always returns TSDB_CODE_SUCCESS.
int initRowBuilder(SRowBuilder* pBuilder, int16_t schemaVer, SParsedDataColInfo* pColInfo) {
  ASSERT(pColInfo->numOfCols > 0 && (pColInfo->numOfBound <= pColInfo->numOfCols));

  tdSRowInit(pBuilder, schemaVer);
  tdSRowSetExtendedInfo(pBuilder,
                        pColInfo->numOfCols,
                        pColInfo->numOfBound,
                        pColInfo->flen,
                        pColInfo->allNullLen,
                        pColInfo->boundNullLen);

  return TSDB_CODE_SUCCESS;
}
D
stmt  
dapan1121 已提交
581

D
stmt  
dapan1121 已提交
582
int32_t qResetStmtDataBlock(void* block, bool keepBuf) {
D
stmt  
dapan1121 已提交
583
  STableDataBlocks* pBlock = (STableDataBlocks*)block;
D
stmt  
dapan1121 已提交
584

D
stmt  
dapan1121 已提交
585 586 587 588 589 590 591 592
  if (keepBuf) {
    taosMemoryFreeClear(pBlock->pData);
    pBlock->pData = taosMemoryMalloc(TSDB_PAYLOAD_SIZE);
    if (NULL == pBlock->pData) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    memset(pBlock->pData, 0, sizeof(SSubmitBlk));
  } else {
X
Xiaoyu Wang 已提交
593
    pBlock->pData = NULL;
D
stmt  
dapan1121 已提交
594
  }
X
Xiaoyu Wang 已提交
595 596 597 598

  pBlock->ordered = true;
  pBlock->prevTS = INT64_MIN;
  pBlock->size = sizeof(SSubmitBlk);
D
stmt  
dapan1121 已提交
599 600 601 602
  pBlock->tsSource = -1;
  pBlock->numOfTables = 1;
  pBlock->nAllocSize = TSDB_PAYLOAD_SIZE;
  pBlock->headerSize = pBlock->size;
X
Xiaoyu Wang 已提交
603

D
stmt  
dapan1121 已提交
604
  memset(&pBlock->rowBuilder, 0, sizeof(pBlock->rowBuilder));
D
stmt  
dapan1121 已提交
605 606

  return TSDB_CODE_SUCCESS;
D
stmt  
dapan1121 已提交
607 608
}

D
stmt  
dapan1121 已提交
609 610 611 612
int32_t qCloneStmtDataBlock(void** pDst, void* pSrc) {
  *pDst = taosMemoryMalloc(sizeof(STableDataBlocks));
  if (NULL == *pDst) {
    return TSDB_CODE_OUT_OF_MEMORY;
D
stmt  
dapan1121 已提交
613
  }
X
Xiaoyu Wang 已提交
614

D
stmt  
dapan1121 已提交
615 616
  memcpy(*pDst, pSrc, sizeof(STableDataBlocks));
  ((STableDataBlocks*)(*pDst))->cloned = true;
X
Xiaoyu Wang 已提交
617

D
stmt  
dapan1121 已提交
618
  return qResetStmtDataBlock(*pDst, false);
D
stmt  
dapan1121 已提交
619 620
}

D
stmt  
dapan1121 已提交
621 622 623 624
int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc) {
  int32_t code = qCloneStmtDataBlock(pDst, pSrc);
  if (code) {
    return code;
D
stmt  
dapan1121 已提交
625 626
  }

X
Xiaoyu Wang 已提交
627
  STableDataBlocks* pBlock = (STableDataBlocks*)*pDst;
D
stmt  
dapan1121 已提交
628 629 630 631
  pBlock->pData = taosMemoryMalloc(pBlock->nAllocSize);
  if (NULL == pBlock->pData) {
    qFreeStmtDataBlock(pBlock);
    return TSDB_CODE_OUT_OF_MEMORY;
D
stmt  
dapan1121 已提交
632 633
  }

D
stmt  
dapan1121 已提交
634 635
  memset(pBlock->pData, 0, sizeof(SSubmitBlk));

D
stmt  
dapan1121 已提交
636 637 638
  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
639 640 641
void qFreeStmtDataBlock(void* pDataBlock) {
  if (pDataBlock == NULL) {
    return;
D
stmt  
dapan1121 已提交
642 643
  }

D
stmt  
dapan1121 已提交
644 645
  taosMemoryFreeClear(((STableDataBlocks*)pDataBlock)->pData);
  taosMemoryFreeClear(pDataBlock);
D
stmt  
dapan1121 已提交
646 647
}

D
stmt  
dapan1121 已提交
648 649 650
void qDestroyStmtDataBlock(void* pBlock) {
  if (pBlock == NULL) {
    return;
D
stmt  
dapan1121 已提交
651 652
  }

D
stmt  
dapan1121 已提交
653
  STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;
D
stmt  
dapan1121 已提交
654

D
stmt  
dapan1121 已提交
655 656 657
  pDataBlock->cloned = false;
  destroyDataBlock(pDataBlock);
}