parInsertData.c 20.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

X
Xiaoyu Wang 已提交
16
#include "parInsertData.h"
17 18

#include "catalog.h"
X
Xiaoyu Wang 已提交
19
#include "parInt.h"
X
Xiaoyu Wang 已提交
20
#include "parUtil.h"
X
Xiaoyu Wang 已提交
21
#include "querynodes.h"
22 23 24 25

// Evaluates to non-zero when payload type `t`, converted to int, equals PAYLOAD_TYPE_RAW.
#define IS_RAW_PAYLOAD(t) \
  (((int)(t)) == PAYLOAD_TYPE_RAW)  // 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert

26 27 28 29 30 31 32 33 34 35
// Sort entry for one row: its timestamp key plus the address of the row payload.
// skey must remain the first member because rowDataCompar() reads a TSKEY from
// the start of each element it is handed by qsort().
typedef struct SBlockKeyTuple {
  TSKEY skey;         // row timestamp key
  void* payloadAddr;  // start of the row this entry refers to
} SBlockKeyTuple;

// Reusable scratch array of SBlockKeyTuple entries that is grown on demand.
typedef struct SBlockKeyInfo {
  int32_t         maxBytesAlloc;  // size in bytes of the most recent allocation of pKeyTuple
  SBlockKeyTuple* pKeyTuple;      // heap array of key tuples; NULL until first allocated
} SBlockKeyInfo;

X
Xiaoyu Wang 已提交
36 37 38
// qsort() comparator: orders two elements by the TSKEY stored at the very start
// of each element. Returns -1, 0 or 1.
static int32_t rowDataCompar(const void* lhs, const void* rhs) {
  const TSKEY left = *(const TSKEY*)lhs;
  const TSKEY right = *(const TSKEY*)rhs;

  // Branch-free three-way compare; avoids the overflow a subtraction could hit.
  return (left > right) - (left < right);
}

47
// Initialise pColList for "non-bound" mode, i.e. every schema column is bound in
// schema order.
//
// pColList  - descriptor to fill; boundColumns and cols are freshly allocated here
// pSchema   - array of at least numOfCols column schemas
// numOfCols - number of columns in pSchema
//
// The function cannot report errors through its return type. If either array
// allocation fails, both arrays are released and left NULL and the column
// counts are reset to 0, so the descriptor is empty instead of pointing at
// missing storage.
void setBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, col_id_t numOfCols) {
  pColList->numOfCols = numOfCols;
  pColList->numOfBound = numOfCols;
  pColList->orderStatus = ORDER_STATUS_ORDERED;  // default is ORDERED for non-bound mode
  pColList->colIdxInfo = NULL;
  pColList->flen = 0;
  pColList->allNullLen = 0;

  pColList->boundColumns = taosMemoryCalloc(pColList->numOfCols, sizeof(col_id_t));
  pColList->cols = taosMemoryCalloc(pColList->numOfCols, sizeof(SBoundColumn));
  if (NULL == pColList->boundColumns || NULL == pColList->cols) {
    taosMemoryFreeClear(pColList->boundColumns);
    taosMemoryFreeClear(pColList->cols);
    pColList->numOfCols = 0;
    pColList->numOfBound = 0;
    pColList->boundNullLen = 0;
    pColList->extendedVarLen = 0;
    return;
  }

  int32_t nVar = 0;  // number of variable-length (BINARY/NCHAR) columns
  for (int32_t i = 0; i < pColList->numOfCols; ++i) {
    uint8_t type = pSchema[i].type;

    // Column 0 keeps the zero offsets provided by calloc.
    if (i > 0) {
      pColList->cols[i].offset = pColList->cols[i - 1].offset + pSchema[i - 1].bytes;
      pColList->cols[i].toffset = pColList->flen;
    }
    pColList->flen += TYPE_BYTES[type];

    switch (type) {
      case TSDB_DATA_TYPE_BINARY:
        pColList->allNullLen += (VARSTR_HEADER_SIZE + CHAR_BYTES);
        ++nVar;
        break;
      case TSDB_DATA_TYPE_NCHAR:
        pColList->allNullLen += (VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
        ++nVar;
        break;
      default:
        break;
    }

    pColList->boundColumns[i] = pSchema[i].colId;
  }

  pColList->allNullLen += pColList->flen;
  pColList->boundNullLen = pColList->allNullLen;  // default set allNullLen
  pColList->extendedVarLen = (uint16_t)(nVar * sizeof(VarDataOffsetT));
}

X
Xiaoyu Wang 已提交
84 85 86
// Comparator over two uint16_t values read from the start of lhs and rhs.
// Returns -1, 0 or 1.
int32_t schemaIdxCompar(const void* lhs, const void* rhs) {
  const uint16_t left = *(const uint16_t*)lhs;
  const uint16_t right = *(const uint16_t*)rhs;

  if (left == right) {
    return 0;
  }
  return left > right ? 1 : -1;
}

X
Xiaoyu Wang 已提交
95 96 97
// Comparator over the uint16_t located sizeof(uint16_t) bytes past the start of
// each element, i.e. the second 16-bit field. Returns -1, 0 or 1.
int32_t boundIdxCompar(const void* lhs, const void* rhs) {
  const uint16_t left = *(const uint16_t*)POINTER_SHIFT(lhs, sizeof(uint16_t));
  const uint16_t right = *(const uint16_t*)POINTER_SHIFT(rhs, sizeof(uint16_t));

  if (left == right) {
    return 0;
  }
  return left > right ? 1 : -1;
}

D
stmt  
dapan1121 已提交
106 107
void destroyBoundColumnInfo(void* pBoundInfo) {
  if (NULL == pBoundInfo) {
D
stmt  
dapan1121 已提交
108 109
    return;
  }
D
stmt  
dapan1121 已提交
110 111

  SParsedDataColInfo* pColList = (SParsedDataColInfo*)pBoundInfo;
X
Xiaoyu Wang 已提交
112

113
  taosMemoryFreeClear(pColList->boundColumns);
wafwerar's avatar
wafwerar 已提交
114 115
  taosMemoryFreeClear(pColList->cols);
  taosMemoryFreeClear(pColList->colIdxInfo);
116 117
}

X
Xiaoyu Wang 已提交
118 119
// Allocate and initialise a table data block.
//
// defaultSize - requested buffer size in bytes; raised to 2 * startOffset when it
//               does not exceed the header size
// rowSize     - stored in the block as its row size
// startOffset - header size; also the initial "used" size of the buffer
// pTableMeta  - table meta to duplicate into the block (must not be NULL)
// dataBlocks  - out: receives the new block on success; untouched on failure
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when any allocation
// fails; in that case everything allocated so far is released.
static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOffset, const STableMeta* pTableMeta,
                               STableDataBlocks** dataBlocks) {
  // Validate inputs before they are used, not after.
  assert(defaultSize > 0 && pTableMeta != NULL);

  STableDataBlocks* dataBuf = (STableDataBlocks*)taosMemoryCalloc(1, sizeof(STableDataBlocks));
  if (dataBuf == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  dataBuf->nAllocSize = (uint32_t)defaultSize;
  dataBuf->headerSize = startOffset;

  // the header size will always be the startOffset value, reserved for the submit block header
  if (dataBuf->nAllocSize <= dataBuf->headerSize) {
    dataBuf->nAllocSize = dataBuf->headerSize * 2;
  }

  dataBuf->pData = taosMemoryMalloc(dataBuf->nAllocSize);
  if (dataBuf->pData == NULL) {
    taosMemoryFreeClear(dataBuf);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  memset(dataBuf->pData, 0, sizeof(SSubmitBlk));

  // Keep a private copy of the table meta so other threads cannot remove it under us.
  dataBuf->pTableMeta = tableMetaDup(pTableMeta);
  if (dataBuf->pTableMeta == NULL) {
    taosMemoryFreeClear(dataBuf->pData);
    taosMemoryFreeClear(dataBuf);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SParsedDataColInfo* pColInfo = &dataBuf->boundColumnInfo;
  SSchema*            pSchema = getTableColumnSchema(dataBuf->pTableMeta);
  setBoundColumnInfo(pColInfo, pSchema, dataBuf->pTableMeta->tableInfo.numOfColumns);

  // Defensive: the bound-column arrays are heap allocated and the setter has no
  // way to report failure, so missing arrays are treated as out-of-memory.
  if (pColInfo->boundColumns == NULL || pColInfo->cols == NULL) {
    destroyBoundColumnInfo(pColInfo);
    taosMemoryFreeClear(dataBuf->pTableMeta);
    taosMemoryFreeClear(dataBuf->pData);
    taosMemoryFreeClear(dataBuf);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  dataBuf->ordered = true;
  dataBuf->prevTS = INT64_MIN;
  dataBuf->rowSize = rowSize;
  dataBuf->size = startOffset;
  dataBuf->vgId = dataBuf->pTableMeta->vgId;

  *dataBlocks = dataBuf;
  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
159
// Serialize pCreateTbReq into pBlocks->pData at the current write position,
// growing the buffer first when the free space is smaller than the serialized
// length.
//
// On success the block's size is advanced by the serialized length, which is
// also recorded in createTbReqLen. Returns TSDB_CODE_TSC_OUT_OF_MEMORY, leaving
// the block unchanged, when the buffer cannot be grown.
int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq) {
  // A NULL buffer makes the serializer report only the required length.
  int32_t len = tSerializeSVCreateTbReq(NULL, pCreateTbReq);

  if (pBlocks->nAllocSize - pBlocks->size < len) {
    // Grow by the message length plus one row; commit the new size only once
    // the reallocation has succeeded.
    uint32_t newSize = pBlocks->nAllocSize + len + pBlocks->rowSize;
    char*    pTmp = taosMemoryRealloc(pBlocks->pData, newSize);
    if (pTmp == NULL) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    pBlocks->pData = pTmp;
    pBlocks->nAllocSize = newSize;
    memset(pBlocks->pData + pBlocks->size, 0, pBlocks->nAllocSize - pBlocks->size);
  }

  char* pBuf = pBlocks->pData + pBlocks->size;
  tSerializeSVCreateTbReq((void**)&pBuf, pCreateTbReq);
  pBlocks->size += len;
  pBlocks->createTbReqLen = len;
  return TSDB_CODE_SUCCESS;
}

179
int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize,
X
Xiaoyu Wang 已提交
180 181
                             const STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList,
                             SVCreateTbReq* pCreateTbReq) {
182 183 184 185 186 187 188
  *dataBlocks = NULL;
  STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)&id, sizeof(id));
  if (t1 != NULL) {
    *dataBlocks = *t1;
  }

  if (*dataBlocks == NULL) {
189
    int32_t ret = createDataBlock((size_t)size, rowSize, startOffset, pTableMeta, dataBlocks);
190 191 192 193
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }

X
Xiaoyu Wang 已提交
194 195 196 197 198 199 200
    if (NULL != pCreateTbReq && NULL != pCreateTbReq->ctbCfg.pTag) {
      ret = buildCreateTbMsg(*dataBlocks, pCreateTbReq);
      if (ret != TSDB_CODE_SUCCESS) {
        return ret;
      }
    }

201 202 203 204 205 206 207 208 209 210
    taosHashPut(pHashList, (const char*)&id, sizeof(int64_t), (char*)dataBlocks, POINTER_BYTES);
    if (pBlockList) {
      taosArrayPush(pBlockList, dataBlocks);
    }
  }

  return TSDB_CODE_SUCCESS;
}

// Extra bytes one row may need when a raw row is converted to the row format:
// the row head minus the timestamp key, TYPE_BYTES[TSDB_DATA_TYPE_BINARY] for
// each variable-length column, and the bitmap sized for (columns - 1) entries.
static int32_t getRowExpandSize(STableMeta* pTableMeta) {
  int32_t  numOfCols = getNumOfColumns(pTableMeta);
  SSchema* pSchema = getTableColumnSchema(pTableMeta);

  int32_t result = TD_ROW_HEAD_LEN - sizeof(TSKEY);
  for (int32_t i = 0; i < numOfCols; ++i) {
    if (IS_VAR_DATA_TYPE(pSchema[i].type)) {
      result += TYPE_BYTES[TSDB_DATA_TYPE_BINARY];
    }
  }
  result += (int32_t)TD_BITMAP_BYTES(numOfCols - 1);

  return result;
}

223
// Free a table data block together with its payload buffer. The table meta and
// bound-column arrays are released only when the block is not flagged as
// cloned. Passing NULL is a no-op.
static void destroyDataBlock(STableDataBlocks* pDataBlock) {
  if (pDataBlock == NULL) {
    return;
  }

  taosMemoryFreeClear(pDataBlock->pData);

  if (!pDataBlock->cloned) {
    // taosMemoryFreeClear is already applied to NULL pointers elsewhere in this
    // file, so no extra NULL guard is needed here.
    taosMemoryFreeClear(pDataBlock->pTableMeta);
    destroyBoundColumnInfo(&pDataBlock->boundColumnInfo);
  }

  taosMemoryFreeClear(pDataBlock);
}

240
// Destroy every data block referenced by the array, then the array itself.
// Passing NULL is a no-op.
void destroyBlockArrayList(SArray* pDataBlockList) {
  if (pDataBlockList == NULL) {
    return;
  }

  // size_t index avoids a signed/unsigned comparison against the array size.
  size_t numOfBlocks = taosArrayGetSize(pDataBlockList);
  for (size_t i = 0; i < numOfBlocks; ++i) {
    destroyDataBlock(taosArrayGetP(pDataBlockList, i));
  }

  taosArrayDestroy(pDataBlockList);
}

254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269
// Destroy every data block stored in the hash, then the hash itself.
// Passing NULL is a no-op.
void destroyBlockHashmap(SHashObj* pDataBlockHash) {
  if (pDataBlockHash == NULL) {
    return;
  }

  void** ppEntry;
  for (ppEntry = taosHashIterate(pDataBlockHash, NULL); ppEntry != NULL;
       ppEntry = taosHashIterate(pDataBlockHash, ppEntry)) {
    STableDataBlocks* pEntryBlock = *ppEntry;
    destroyDataBlock(pEntryBlock);
  }

  taosHashCleanup(pDataBlockHash);
}

270
// data block is disordered, sort it in ascending order
X
Xiaoyu Wang 已提交
271 272
// For a raw-payload block with fixed-size rows: when the block is flagged as
// unordered, sort the rows in place by their leading timestamp and keep one row
// per distinct timestamp, then update numOfRows and the block size. prevTS is
// always reset to INT64_MIN.
//
// qsort() is not stable, so which of several rows sharing a timestamp survives
// is unspecified.
void sortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf) {
  SSubmitBlk* pBlocks = (SSubmitBlk*)dataBuf->pData;

  // On entry the block size must match header + numOfRows fixed-size rows.
  assert(pBlocks->numOfRows * dataBuf->rowSize + sizeof(SSubmitBlk) == dataBuf->size);

  if (!dataBuf->ordered) {
    // Zero or one row needs no sorting; skipping also keeps an empty block from
    // being reported as holding one row.
    if (pBlocks->numOfRows > 1) {
      char* pBlockData = pBlocks->data;
      qsort(pBlockData, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar);

      // `last` indexes the most recently kept row; rows with the same timestamp
      // as the kept one are skipped.
      int32_t last = 0;
      for (int32_t j = 1; j < pBlocks->numOfRows; ++j) {
        TSKEY tLast = *(TSKEY*)(pBlockData + dataBuf->rowSize * last);
        TSKEY tCur = *(TSKEY*)(pBlockData + dataBuf->rowSize * j);
        if (tLast == tCur) {
          continue;
        }

        ++last;
        if (last != j) {
          memmove(pBlockData + dataBuf->rowSize * last, pBlockData + dataBuf->rowSize * j, dataBuf->rowSize);
        }
      }

      pBlocks->numOfRows = last + 1;
    }

    dataBuf->ordered = true;
    dataBuf->size = sizeof(SSubmitBlk) + dataBuf->rowSize * pBlocks->numOfRows;
  }

  dataBuf->prevTS = INT64_MIN;
}

// data block is disordered, sort it in ascending order
X
Xiaoyu Wang 已提交
312 313
// Build one (timestamp, row address) tuple per row of the block in
// pBlkKeyInfo->pKeyTuple. When the block is flagged as unordered, sort the
// tuples by timestamp and keep one tuple per distinct timestamp. The row data
// itself is not moved; only numOfRows, the block size, the ordered flag and
// prevTS are updated.
//
// pBlkKeyInfo is a scratch buffer reused across calls and grown when too small.
//
// Returns 0 on success or TSDB_CODE_TSC_OUT_OF_MEMORY when the scratch buffer
// cannot be grown.
int sortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo) {
  SSubmitBlk* pBlocks = (SSubmitBlk*)dataBuf->pData;
  // Held in 32 bits so the row count is never narrowed.
  int32_t nRows = pBlocks->numOfRows;
  int32_t extendedRowSize = getExtendedRowSize(dataBuf);

  // An empty block needs no tuples; skipping also avoids a zero-byte realloc.
  if (nRows > 0) {
    size_t nAlloc = (size_t)nRows * sizeof(SBlockKeyTuple);
    if (pBlkKeyInfo->pKeyTuple == NULL || (size_t)pBlkKeyInfo->maxBytesAlloc < nAlloc) {
      void* tmp = taosMemoryRealloc(pBlkKeyInfo->pKeyTuple, nAlloc);
      if (tmp == NULL) {
        return TSDB_CODE_TSC_OUT_OF_MEMORY;
      }
      pBlkKeyInfo->pKeyTuple = (SBlockKeyTuple*)tmp;
      pBlkKeyInfo->maxBytesAlloc = (int32_t)nAlloc;
    }
    memset(pBlkKeyInfo->pKeyTuple, 0, nAlloc);

    // Rows start after the schema section and are extendedRowSize bytes apart.
    SBlockKeyTuple* pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
    char*           pBlockData = pBlocks->data + pBlocks->schemaLen;
    for (int32_t n = 0; n < nRows; ++n) {
      pBlkKeyTuple[n].skey = TD_ROW_KEY((STSRow*)pBlockData);
      pBlkKeyTuple[n].payloadAddr = pBlockData;
      pBlockData += extendedRowSize;
    }

    if (!dataBuf->ordered && nRows > 1) {
      qsort(pBlkKeyTuple, nRows, sizeof(SBlockKeyTuple), rowDataCompar);

      // `last` indexes the most recently kept tuple.
      int32_t last = 0;
      for (int32_t j = 1; j < nRows; ++j) {
        if (pBlkKeyTuple[last].skey == pBlkKeyTuple[j].skey) {
          continue;
        }

        ++last;
        if (last != j) {
          memmove(pBlkKeyTuple + last, pBlkKeyTuple + j, sizeof(SBlockKeyTuple));
        }
      }

      pBlocks->numOfRows = last + 1;
    }
  }

  dataBuf->ordered = true;
  dataBuf->size = sizeof(SSubmitBlk) + pBlocks->numOfRows * extendedRowSize;
  dataBuf->prevTS = INT64_MIN;

  return 0;
}

// Erase the empty space reserved for binary data
X
Xiaoyu Wang 已提交
378 379
// Copy one table's block into the destination buffer in compact form.
//
// pDataBlock      - destination; receives the SSubmitBlk header, the serialized
//                   create-table request (createTbReqLen bytes) and the rows
// pTableDataBlock - source table block
// blkKeyTuple     - per-row payload addresses; read only when !isRawPayload
// isRawPayload    - true: source rows are fixed-width raw columns that are
//                   re-encoded through the row builder;
//                   false: each row is copied from blkKeyTuple[i].payloadAddr
//                   using its own TD_ROW_LEN
//
// The destination header's schemaLen and dataLen are updated.
// Returns dataLen + schemaLen of the destination block.
static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SBlockKeyTuple* blkKeyTuple,
                         bool isRawPayload) {
  // TODO: optimize this function, handle the case while binary is not presented
  STableMeta*   pTableMeta = pTableDataBlock->pTableMeta;
  STableComInfo tinfo = getTableInfo(pTableMeta);
  SSchema*      pSchema = getTableColumnSchema(pTableMeta);

  // Header plus create-table request are copied verbatim.
  int32_t     nonDataLen = sizeof(SSubmitBlk) + pTableDataBlock->createTbReqLen;
  SSubmitBlk* pBlock = pDataBlock;
  memcpy(pDataBlock, pTableDataBlock->pData, nonDataLen);
  pDataBlock = (char*)pDataBlock + nonDataLen;

  pBlock->schemaLen = pTableDataBlock->createTbReqLen;
  pBlock->dataLen = 0;
  int32_t numOfRows = pBlock->numOfRows;

  if (isRawPayload) {
    // Total fixed-width length of one row, needed by the row builder.
    int32_t flen = 0;
    for (int32_t j = 0; j < tinfo.numOfColumns; ++j) {
      flen += TYPE_BYTES[pSchema[j].type];
    }

    char*       p = pTableDataBlock->pData + nonDataLen;  // read cursor over the raw source rows
    SRowBuilder builder = {0};

    tdSRowInit(&builder, pTableMeta->sversion);
    tdSRowSetInfo(&builder, getNumOfColumns(pTableMeta), -1, flen);

    for (int32_t i = 0; i < numOfRows; ++i) {
      tdSRowResetBuf(&builder, pDataBlock);
      int toffset = 0;
      for (int32_t j = 0; j < tinfo.numOfColumns; ++j) {
        int8_t  colType = pSchema[j].type;
        uint8_t valType = isNull(p, colType) ? TD_VTYPE_NULL : TD_VTYPE_NORM;
        tdAppendColValToRow(&builder, pSchema[j].colId, colType, valType, p, true, toffset, j);
        toffset += TYPE_BYTES[colType];
        p += pSchema[j].bytes;
      }
      int32_t rowLen = TD_ROW_LEN((STSRow*)pDataBlock);
      pDataBlock = (char*)pDataBlock + rowLen;
      pBlock->dataLen += rowLen;
    }
  } else {
    for (int32_t i = 0; i < numOfRows; ++i) {
      char*     payload = (blkKeyTuple + i)->payloadAddr;
      TDRowLenT rowTLen = TD_ROW_LEN((STSRow*)payload);
      memcpy(pDataBlock, payload, rowTLen);
      pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
      pBlock->dataLen += rowTLen;
    }
  }

  return pBlock->dataLen + pBlock->schemaLen;
}

X
Xiaoyu Wang 已提交
435
// Merge the per-table data blocks stored in pHashObj into one block per vgId.
//
// pHashObj      - hash whose values are STableDataBlocks* (one per table)
// payloadType   - selects raw vs. K-V payload handling (see IS_RAW_PAYLOAD)
// pVgDataBlocks - out: array of merged STableDataBlocks*; set only on success,
//                 after which the caller owns it
//
// Each non-empty table block is sorted and de-duplicated, then appended in
// compact form to the merged block of its vgId. Returns TSDB_CODE_SUCCESS or an
// error code; on error every merged block created so far is released.
int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** pVgDataBlocks) {
  const int     INSERT_HEAD_SIZE = sizeof(SSubmitReq);
  int32_t       code = TSDB_CODE_SUCCESS;
  bool          isRawPayload = IS_RAW_PAYLOAD(payloadType);
  SBlockKeyInfo blkKeyInfo = {0};  // scratch buffer shared by every table block

  SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
  SArray*   pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);
  if (NULL == pVnodeDataBlockHashList || NULL == pVnodeDataBlockList) {
    code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto onError;
  }

  // Iterating from a NULL-checked cursor also copes with an empty input hash.
  // NOTE(review): the early exits below leave the hash iteration unfinished;
  // confirm whether the hash API expects the iterator to be released explicitly.
  for (STableDataBlocks** p = taosHashIterate(pHashObj, NULL); p != NULL; p = taosHashIterate(pHashObj, p)) {
    STableDataBlocks* pOneTableBlock = *p;
    if (pOneTableBlock == NULL) {
      break;
    }

    SSubmitBlk* pBlocks = (SSubmitBlk*)pOneTableBlock->pData;
    if (pBlocks->numOfRows <= 0) {
      continue;
    }

    // Fetch or create the merged block of this vgId. A newly created block is
    // also pushed to pVnodeDataBlockList, which owns it from then on.
    STableDataBlocks* dataBuf = NULL;
    code = getDataBlockFromList(pVnodeDataBlockHashList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE, INSERT_HEAD_SIZE, 0,
                                pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList, NULL);
    if (code != TSDB_CODE_SUCCESS) {
      goto onError;
    }

    // the maximum expanded size in byte when a row-wise data is converted to SDataRow format
    int32_t expandSize = isRawPayload ? getRowExpandSize(pOneTableBlock->pTableMeta) : 0;
    int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize +
                       sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta);

    if (dataBuf->nAllocSize < destSize) {
      uint32_t newSize = (uint32_t)(destSize * 1.5);
      char*    tmp = taosMemoryRealloc(dataBuf->pData, newSize);
      if (tmp == NULL) {
        // dataBuf is owned by pVnodeDataBlockList and is released, buffer
        // included, by the common error path; it must not be freed here as well.
        code = TSDB_CODE_TSC_OUT_OF_MEMORY;
        goto onError;
      }
      dataBuf->pData = tmp;
      dataBuf->nAllocSize = newSize;
    }

    if (isRawPayload) {
      sortRemoveDataBlockDupRowsRaw(pOneTableBlock);
    } else {
      code = sortRemoveDataBlockDupRows(pOneTableBlock, &blkKeyInfo);
      if (code != 0) {
        goto onError;
      }
      ASSERT(blkKeyInfo.pKeyTuple != NULL && pBlocks->numOfRows > 0);
    }

    // Upper bound of what trimDataBlock() may write; only checked by the assert.
    int32_t len = pBlocks->numOfRows *
                      (isRawPayload ? (pOneTableBlock->rowSize + expandSize) : getExtendedRowSize(pOneTableBlock)) +
                  sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta);

    // erase the empty space reserved for binary data
    int32_t finalLen =
        trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, blkKeyInfo.pKeyTuple, isRawPayload);
    assert(finalLen <= len);

    dataBuf->size += (finalLen + sizeof(SSubmitBlk));
    assert(dataBuf->size <= dataBuf->nAllocSize);
    dataBuf->numOfTables += 1;
  }

  // The lookup hash only holds pointers; the merged blocks live on in the list.
  taosHashCleanup(pVnodeDataBlockHashList);
  taosMemoryFreeClear(blkKeyInfo.pKeyTuple);
  *pVgDataBlocks = pVnodeDataBlockList;
  return TSDB_CODE_SUCCESS;

onError:
  if (pVnodeDataBlockHashList != NULL) {
    taosHashCleanup(pVnodeDataBlockHashList);
  }
  destroyBlockArrayList(pVnodeDataBlockList);  // NULL-safe; frees every merged block and its buffer
  taosMemoryFreeClear(blkKeyInfo.pKeyTuple);
  return code;
}

X
Xiaoyu Wang 已提交
520 521
// Ensure the block has at least allSize free bytes after its current size.
// When it does not, the buffer is grown to 1.5 * (size + allSize) and the newly
// exposed tail is zero-filled.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY with the block left
// unchanged when the reallocation fails.
int32_t allocateMemForSize(STableDataBlocks* pDataBlock, int32_t allSize) {
  size_t remain = pDataBlock->nAllocSize - pDataBlock->size;

  if (remain < allSize) {
    // Commit the new size only after the reallocation has succeeded.
    uint32_t newSize = (uint32_t)((pDataBlock->size + allSize) * 1.5);
    char*    tmp = taosMemoryRealloc(pDataBlock->pData, (size_t)newSize);
    if (tmp == NULL) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    pDataBlock->pData = tmp;
    pDataBlock->nAllocSize = newSize;
    memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size);
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
542
// Ensure the block has room for at least 5 more rows of rowSize bytes, growing
// the buffer by repeated 1.5x steps when necessary and zero-filling the newly
// exposed tail.
//
// numOfRows - out: (nAllocSize - headerSize) / rowSize for the resulting buffer;
//             on failure it is computed from the unchanged buffer.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY with the block left
// unchanged when the buffer cannot be grown.
int32_t allocateMemIfNeed(STableDataBlocks* pDataBlock, int32_t rowSize, int32_t* numOfRows) {
  const int factor = 5;
  size_t    remain = pDataBlock->nAllocSize - pDataBlock->size;

  if (remain < rowSize * factor) {
    uint32_t newSize = pDataBlock->nAllocSize;
    while (remain < rowSize * factor) {
      // n + n/2 equals the truncated value of n * 1.5, computed in 64 bits so
      // that overflow is detectable. A step that does not grow (n < 2) or that
      // no longer fits in 32 bits would otherwise loop forever or overflow.
      uint64_t grown = (uint64_t)newSize + newSize / 2;
      if (grown <= newSize || grown > UINT32_MAX) {
        *numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize;
        return TSDB_CODE_TSC_OUT_OF_MEMORY;
      }
      newSize = (uint32_t)grown;
      remain = newSize - pDataBlock->size;
    }

    char* tmp = taosMemoryRealloc(pDataBlock->pData, (size_t)newSize);
    if (tmp == NULL) {
      // Allocation failed: keep the old buffer and report its capacity.
      *numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize;
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    pDataBlock->pData = tmp;
    pDataBlock->nAllocSize = newSize;
    memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size);
  }

  *numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
570
// Initialise a row builder for the given schema version and configure it from
// the parsed column info (column count, bound count, fixed length and the two
// NULL-row lengths). Always returns TSDB_CODE_SUCCESS.
int initRowBuilder(SRowBuilder* pBuilder, int16_t schemaVer, SParsedDataColInfo* pColInfo) {
  ASSERT(pColInfo->numOfCols > 0 && (pColInfo->numOfBound <= pColInfo->numOfCols));

  tdSRowInit(pBuilder, schemaVer);
  tdSRowSetExtendedInfo(pBuilder, pColInfo->numOfCols, pColInfo->numOfBound, pColInfo->flen, pColInfo->allNullLen,
                        pColInfo->boundNullLen);

  return TSDB_CODE_SUCCESS;
}
D
stmt  
dapan1121 已提交
577

D
stmt  
dapan1121 已提交
578
int32_t qResetStmtDataBlock(void* block, bool keepBuf) {
D
stmt  
dapan1121 已提交
579
  STableDataBlocks* pBlock = (STableDataBlocks*)block;
D
stmt  
dapan1121 已提交
580

D
stmt  
dapan1121 已提交
581 582 583 584 585 586 587 588
  if (keepBuf) {
    taosMemoryFreeClear(pBlock->pData);
    pBlock->pData = taosMemoryMalloc(TSDB_PAYLOAD_SIZE);
    if (NULL == pBlock->pData) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    memset(pBlock->pData, 0, sizeof(SSubmitBlk));
  } else {
X
Xiaoyu Wang 已提交
589
    pBlock->pData = NULL;
D
stmt  
dapan1121 已提交
590
  }
X
Xiaoyu Wang 已提交
591 592 593 594

  pBlock->ordered = true;
  pBlock->prevTS = INT64_MIN;
  pBlock->size = sizeof(SSubmitBlk);
D
stmt  
dapan1121 已提交
595 596 597 598
  pBlock->tsSource = -1;
  pBlock->numOfTables = 1;
  pBlock->nAllocSize = TSDB_PAYLOAD_SIZE;
  pBlock->headerSize = pBlock->size;
X
Xiaoyu Wang 已提交
599

D
stmt  
dapan1121 已提交
600
  memset(&pBlock->rowBuilder, 0, sizeof(pBlock->rowBuilder));
D
stmt  
dapan1121 已提交
601 602

  return TSDB_CODE_SUCCESS;
D
stmt  
dapan1121 已提交
603 604
}

D
stmt  
dapan1121 已提交
605 606 607 608
int32_t qCloneStmtDataBlock(void** pDst, void* pSrc) {
  *pDst = taosMemoryMalloc(sizeof(STableDataBlocks));
  if (NULL == *pDst) {
    return TSDB_CODE_OUT_OF_MEMORY;
D
stmt  
dapan1121 已提交
609
  }
X
Xiaoyu Wang 已提交
610

D
stmt  
dapan1121 已提交
611 612
  memcpy(*pDst, pSrc, sizeof(STableDataBlocks));
  ((STableDataBlocks*)(*pDst))->cloned = true;
X
Xiaoyu Wang 已提交
613

D
stmt  
dapan1121 已提交
614
  return qResetStmtDataBlock(*pDst, false);
D
stmt  
dapan1121 已提交
615 616
}

D
stmt  
dapan1121 已提交
617 618 619 620
int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc) {
  int32_t code = qCloneStmtDataBlock(pDst, pSrc);
  if (code) {
    return code;
D
stmt  
dapan1121 已提交
621 622
  }

X
Xiaoyu Wang 已提交
623
  STableDataBlocks* pBlock = (STableDataBlocks*)*pDst;
D
stmt  
dapan1121 已提交
624 625 626 627
  pBlock->pData = taosMemoryMalloc(pBlock->nAllocSize);
  if (NULL == pBlock->pData) {
    qFreeStmtDataBlock(pBlock);
    return TSDB_CODE_OUT_OF_MEMORY;
D
stmt  
dapan1121 已提交
628 629
  }

D
stmt  
dapan1121 已提交
630 631
  memset(pBlock->pData, 0, sizeof(SSubmitBlk));

D
stmt  
dapan1121 已提交
632 633 634
  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
635 636 637
void qFreeStmtDataBlock(void* pDataBlock) {
  if (pDataBlock == NULL) {
    return;
D
stmt  
dapan1121 已提交
638 639
  }

D
stmt  
dapan1121 已提交
640 641
  taosMemoryFreeClear(((STableDataBlocks*)pDataBlock)->pData);
  taosMemoryFreeClear(pDataBlock);
D
stmt  
dapan1121 已提交
642 643
}

D
stmt  
dapan1121 已提交
644 645 646
void qDestroyStmtDataBlock(void* pBlock) {
  if (pBlock == NULL) {
    return;
D
stmt  
dapan1121 已提交
647 648
  }

D
stmt  
dapan1121 已提交
649
  STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;
D
stmt  
dapan1121 已提交
650

D
stmt  
dapan1121 已提交
651 652 653
  pDataBlock->cloned = false;
  destroyDataBlock(pDataBlock);
}