parInsertData.c 21.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
H
Hongze Cheng 已提交
15
// clang-format off
X
Xiaoyu Wang 已提交
16
#include "parInsertData.h"
17 18

#include "catalog.h"
X
Xiaoyu Wang 已提交
19
#include "parInt.h"
X
Xiaoyu Wang 已提交
20
#include "parUtil.h"
X
Xiaoyu Wang 已提交
21
#include "querynodes.h"
22 23 24 25

#define IS_RAW_PAYLOAD(t) \
  (((int)(t)) == PAYLOAD_TYPE_RAW)  // 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert

26 27 28 29 30 31 32 33 34 35
typedef struct SBlockKeyTuple {
  TSKEY skey;
  void* payloadAddr;
} SBlockKeyTuple;

typedef struct SBlockKeyInfo {
  int32_t         maxBytesAlloc;
  SBlockKeyTuple* pKeyTuple;
} SBlockKeyInfo;

X
Xiaoyu Wang 已提交
36 37 38
static int32_t rowDataCompar(const void* lhs, const void* rhs) {
  TSKEY left = *(TSKEY*)lhs;
  TSKEY right = *(TSKEY*)rhs;
39 40 41 42 43 44 45 46

  if (left == right) {
    return 0;
  } else {
    return left > right ? 1 : -1;
  }
}

47
void setBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, col_id_t numOfCols) {
48 49 50
  pColList->numOfCols = numOfCols;
  pColList->numOfBound = numOfCols;
  pColList->orderStatus = ORDER_STATUS_ORDERED;  // default is ORDERED for non-bound mode
51
  pColList->boundColumns = taosMemoryCalloc(pColList->numOfCols, sizeof(col_id_t));
wafwerar's avatar
wafwerar 已提交
52
  pColList->cols = taosMemoryCalloc(pColList->numOfCols, sizeof(SBoundColumn));
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
  pColList->colIdxInfo = NULL;
  pColList->flen = 0;
  pColList->allNullLen = 0;

  int32_t nVar = 0;
  for (int32_t i = 0; i < pColList->numOfCols; ++i) {
    uint8_t type = pSchema[i].type;
    if (i > 0) {
      pColList->cols[i].offset = pColList->cols[i - 1].offset + pSchema[i - 1].bytes;
      pColList->cols[i].toffset = pColList->flen;
    }
    pColList->flen += TYPE_BYTES[type];
    switch (type) {
      case TSDB_DATA_TYPE_BINARY:
        pColList->allNullLen += (VARSTR_HEADER_SIZE + CHAR_BYTES);
        ++nVar;
        break;
      case TSDB_DATA_TYPE_NCHAR:
        pColList->allNullLen += (VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
        ++nVar;
        break;
      default:
        break;
    }
77
    pColList->boundColumns[i] = i + PRIMARYKEY_TIMESTAMP_COL_ID;
78 79
  }
  pColList->allNullLen += pColList->flen;
C
Cary Xu 已提交
80
  pColList->boundNullLen = pColList->allNullLen;  // default set allNullLen
81 82 83
  pColList->extendedVarLen = (uint16_t)(nVar * sizeof(VarDataOffsetT));
}

X
Xiaoyu Wang 已提交
84 85 86
int32_t schemaIdxCompar(const void* lhs, const void* rhs) {
  uint16_t left = *(uint16_t*)lhs;
  uint16_t right = *(uint16_t*)rhs;
87 88 89 90 91 92 93 94

  if (left == right) {
    return 0;
  } else {
    return left > right ? 1 : -1;
  }
}

X
Xiaoyu Wang 已提交
95 96 97
int32_t boundIdxCompar(const void* lhs, const void* rhs) {
  uint16_t left = *(uint16_t*)POINTER_SHIFT(lhs, sizeof(uint16_t));
  uint16_t right = *(uint16_t*)POINTER_SHIFT(rhs, sizeof(uint16_t));
98 99 100 101 102 103 104 105

  if (left == right) {
    return 0;
  } else {
    return left > right ? 1 : -1;
  }
}

D
stmt  
dapan1121 已提交
106 107
void destroyBoundColumnInfo(void* pBoundInfo) {
  if (NULL == pBoundInfo) {
D
stmt  
dapan1121 已提交
108 109
    return;
  }
D
stmt  
dapan1121 已提交
110 111

  SParsedDataColInfo* pColList = (SParsedDataColInfo*)pBoundInfo;
X
Xiaoyu Wang 已提交
112

113
  taosMemoryFreeClear(pColList->boundColumns);
wafwerar's avatar
wafwerar 已提交
114 115
  taosMemoryFreeClear(pColList->cols);
  taosMemoryFreeClear(pColList->colIdxInfo);
116 117
}

wmmhello's avatar
wmmhello 已提交
118
static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOffset, STableMeta* pTableMeta,
X
Xiaoyu Wang 已提交
119
                               STableDataBlocks** dataBlocks) {
wafwerar's avatar
wafwerar 已提交
120
  STableDataBlocks* dataBuf = (STableDataBlocks*)taosMemoryCalloc(1, sizeof(STableDataBlocks));
121 122 123 124 125 126 127 128 129 130 131 132
  if (dataBuf == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  dataBuf->nAllocSize = (uint32_t)defaultSize;
  dataBuf->headerSize = startOffset;

  // the header size will always be the startOffset value, reserved for the subumit block header
  if (dataBuf->nAllocSize <= dataBuf->headerSize) {
    dataBuf->nAllocSize = dataBuf->headerSize * 2;
  }

wafwerar's avatar
wafwerar 已提交
133
  dataBuf->pData = taosMemoryMalloc(dataBuf->nAllocSize);
134
  if (dataBuf->pData == NULL) {
wafwerar's avatar
wafwerar 已提交
135
    taosMemoryFreeClear(dataBuf);
136 137 138 139
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  memset(dataBuf->pData, 0, sizeof(SSubmitBlk));

X
Xiaoyu Wang 已提交
140
  dataBuf->pTableMeta = tableMetaDup(pTableMeta);
141 142

  SParsedDataColInfo* pColInfo = &dataBuf->boundColumnInfo;
X
Xiaoyu Wang 已提交
143
  SSchema*            pSchema = getTableColumnSchema(dataBuf->pTableMeta);
144 145
  setBoundColumnInfo(pColInfo, pSchema, dataBuf->pTableMeta->tableInfo.numOfColumns);

X
Xiaoyu Wang 已提交
146 147 148 149 150
  dataBuf->ordered = true;
  dataBuf->prevTS = INT64_MIN;
  dataBuf->rowSize = rowSize;
  dataBuf->size = startOffset;
  dataBuf->vgId = dataBuf->pTableMeta->vgId;
151 152 153 154 155 156 157

  assert(defaultSize > 0 && pTableMeta != NULL && dataBuf->pTableMeta != NULL);

  *dataBlocks = dataBuf;
  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
158
int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq) {
H
Hongze Cheng 已提交
159
  SEncoder coder = {0};
H
Hongze Cheng 已提交
160
  char* pBuf;
wafwerar's avatar
wafwerar 已提交
161
  int32_t len;
H
Hongze Cheng 已提交
162

wafwerar's avatar
wafwerar 已提交
163 164
  int32_t ret = 0;
  tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, len, ret);
X
Xiaoyu Wang 已提交
165 166 167 168 169 170 171 172 173 174 175
  if (pBlocks->nAllocSize - pBlocks->size < len) {
    pBlocks->nAllocSize += len + pBlocks->rowSize;
    char* pTmp = taosMemoryRealloc(pBlocks->pData, pBlocks->nAllocSize);
    if (pTmp != NULL) {
      pBlocks->pData = pTmp;
      memset(pBlocks->pData + pBlocks->size, 0, pBlocks->nAllocSize - pBlocks->size);
    } else {
      pBlocks->nAllocSize -= len + pBlocks->rowSize;
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
  }
H
Hongze Cheng 已提交
176 177 178

  pBuf= pBlocks->pData + pBlocks->size;

H
Hongze Cheng 已提交
179
  tEncoderInit(&coder, pBuf, len);
H
Hongze Cheng 已提交
180
  tEncodeSVCreateTbReq(&coder, pCreateTbReq);
H
Hongze Cheng 已提交
181
  tEncoderClear(&coder);
H
Hongze Cheng 已提交
182

X
Xiaoyu Wang 已提交
183 184 185 186 187
  pBlocks->size += len;
  pBlocks->createTbReqLen = len;
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
188
int32_t getDataBlockFromList(SHashObj* pHashList, void* id, int32_t idLen, int32_t size, int32_t startOffset, int32_t rowSize,
wmmhello's avatar
wmmhello 已提交
189
                             STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList,
X
Xiaoyu Wang 已提交
190
                             SVCreateTbReq* pCreateTbReq) {
191
  *dataBlocks = NULL;
D
dapan 已提交
192
  STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)id, idLen);
193 194 195 196 197
  if (t1 != NULL) {
    *dataBlocks = *t1;
  }

  if (*dataBlocks == NULL) {
198
    int32_t ret = createDataBlock((size_t)size, rowSize, startOffset, pTableMeta, dataBlocks);
199 200 201 202
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }

H
Hongze Cheng 已提交
203
    if (NULL != pCreateTbReq && NULL != pCreateTbReq->ctb.pTag) {
X
Xiaoyu Wang 已提交
204 205 206 207 208 209
      ret = buildCreateTbMsg(*dataBlocks, pCreateTbReq);
      if (ret != TSDB_CODE_SUCCESS) {
        return ret;
      }
    }

D
dapan 已提交
210
    taosHashPut(pHashList, (const char*)id, idLen, (char*)dataBlocks, POINTER_BYTES);
211 212 213 214 215 216 217 218 219
    if (pBlockList) {
      taosArrayPush(pBlockList, dataBlocks);
    }
  }

  return TSDB_CODE_SUCCESS;
}

static int32_t getRowExpandSize(STableMeta* pTableMeta) {
C
Cary Xu 已提交
220
  int32_t  result = TD_ROW_HEAD_LEN - sizeof(TSKEY);
221 222
  int32_t  columns = getNumOfColumns(pTableMeta);
  SSchema* pSchema = getTableColumnSchema(pTableMeta);
C
Cary Xu 已提交
223
  for (int32_t i = 0; i < columns; ++i) {
224 225 226 227
    if (IS_VAR_DATA_TYPE((pSchema + i)->type)) {
      result += TYPE_BYTES[TSDB_DATA_TYPE_BINARY];
    }
  }
C
Cary Xu 已提交
228
  result += (int32_t)TD_BITMAP_BYTES(columns - 1);
229 230 231
  return result;
}

232
static void destroyDataBlock(STableDataBlocks* pDataBlock) {
233 234 235 236
  if (pDataBlock == NULL) {
    return;
  }

wafwerar's avatar
wafwerar 已提交
237
  taosMemoryFreeClear(pDataBlock->pData);
D
dapan1121 已提交
238
//  if (!pDataBlock->cloned) {
239
    // free the refcount for metermeta
wmmhello's avatar
wmmhello 已提交
240
    taosMemoryFreeClear(pDataBlock->pTableMeta);
241 242

    destroyBoundColumnInfo(&pDataBlock->boundColumnInfo);
D
dapan1121 已提交
243
//  }
wafwerar's avatar
wafwerar 已提交
244
  taosMemoryFreeClear(pDataBlock);
245 246
}

247
void destroyBlockArrayList(SArray* pDataBlockList) {
248
  if (pDataBlockList == NULL) {
249
    return;
250 251 252 253
  }

  size_t size = taosArrayGetSize(pDataBlockList);
  for (int32_t i = 0; i < size; i++) {
254 255
    void* p = taosArrayGetP(pDataBlockList, i);
    destroyDataBlock(p);
256 257 258 259 260
  }

  taosArrayDestroy(pDataBlockList);
}

261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276
void destroyBlockHashmap(SHashObj* pDataBlockHash) {
  if (pDataBlockHash == NULL) {
    return;
  }

  void** p1 = taosHashIterate(pDataBlockHash, NULL);
  while (p1) {
    STableDataBlocks* pBlocks = *p1;
    destroyDataBlock(pBlocks);

    p1 = taosHashIterate(pDataBlockHash, p1);
  }

  taosHashCleanup(pDataBlockHash);
}

277
// data block is disordered, sort it in ascending order
X
Xiaoyu Wang 已提交
278 279
void sortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf) {
  SSubmitBlk* pBlocks = (SSubmitBlk*)dataBuf->pData;
280 281 282 283 284

  // size is less than the total size, since duplicated rows may be removed yet.
  assert(pBlocks->numOfRows * dataBuf->rowSize + sizeof(SSubmitBlk) == dataBuf->size);

  if (!dataBuf->ordered) {
X
Xiaoyu Wang 已提交
285
    char* pBlockData = pBlocks->data;
286 287 288 289 290
    qsort(pBlockData, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar);

    int32_t i = 0;
    int32_t j = 1;

291
    // delete rows with timestamp conflicts
292
    while (j < pBlocks->numOfRows) {
X
Xiaoyu Wang 已提交
293 294
      TSKEY ti = *(TSKEY*)(pBlockData + dataBuf->rowSize * i);
      TSKEY tj = *(TSKEY*)(pBlockData + dataBuf->rowSize * j);
295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318

      if (ti == tj) {
        ++j;
        continue;
      }

      int32_t nextPos = (++i);
      if (nextPos != j) {
        memmove(pBlockData + dataBuf->rowSize * nextPos, pBlockData + dataBuf->rowSize * j, dataBuf->rowSize);
      }

      ++j;
    }

    dataBuf->ordered = true;

    pBlocks->numOfRows = i + 1;
    dataBuf->size = sizeof(SSubmitBlk) + dataBuf->rowSize * pBlocks->numOfRows;
  }

  dataBuf->prevTS = INT64_MIN;
}

// data block is disordered, sort it in ascending order
X
Xiaoyu Wang 已提交
319 320
int sortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo) {
  SSubmitBlk* pBlocks = (SSubmitBlk*)dataBuf->pData;
321 322 323 324 325 326 327
  int16_t     nRows = pBlocks->numOfRows;

  // size is less than the total size, since duplicated rows may be removed yet.

  // allocate memory
  size_t nAlloc = nRows * sizeof(SBlockKeyTuple);
  if (pBlkKeyInfo->pKeyTuple == NULL || pBlkKeyInfo->maxBytesAlloc < nAlloc) {
X
Xiaoyu Wang 已提交
328
    char* tmp = taosMemoryRealloc(pBlkKeyInfo->pKeyTuple, nAlloc);
329 330 331
    if (tmp == NULL) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
X
Xiaoyu Wang 已提交
332
    pBlkKeyInfo->pKeyTuple = (SBlockKeyTuple*)tmp;
333 334 335 336 337
    pBlkKeyInfo->maxBytesAlloc = (int32_t)nAlloc;
  }
  memset(pBlkKeyInfo->pKeyTuple, 0, nAlloc);

  int32_t         extendedRowSize = getExtendedRowSize(dataBuf);
X
Xiaoyu Wang 已提交
338 339
  SBlockKeyTuple* pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
  char*           pBlockData = pBlocks->data + pBlocks->schemaLen;
340 341
  int             n = 0;
  while (n < nRows) {
X
Xiaoyu Wang 已提交
342
    pBlkKeyTuple->skey = TD_ROW_KEY((STSRow*)pBlockData);
343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384
    pBlkKeyTuple->payloadAddr = pBlockData;

    // next loop
    pBlockData += extendedRowSize;
    ++pBlkKeyTuple;
    ++n;
  }

  if (!dataBuf->ordered) {
    pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
    qsort(pBlkKeyTuple, nRows, sizeof(SBlockKeyTuple), rowDataCompar);

    pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
    int32_t i = 0;
    int32_t j = 1;
    while (j < nRows) {
      TSKEY ti = (pBlkKeyTuple + i)->skey;
      TSKEY tj = (pBlkKeyTuple + j)->skey;

      if (ti == tj) {
        ++j;
        continue;
      }

      int32_t nextPos = (++i);
      if (nextPos != j) {
        memmove(pBlkKeyTuple + nextPos, pBlkKeyTuple + j, sizeof(SBlockKeyTuple));
      }
      ++j;
    }

    dataBuf->ordered = true;
    pBlocks->numOfRows = i + 1;
  }

  dataBuf->size = sizeof(SSubmitBlk) + pBlocks->numOfRows * extendedRowSize;
  dataBuf->prevTS = INT64_MIN;

  return 0;
}

// Erase the empty space reserved for binary data
X
Xiaoyu Wang 已提交
385 386
static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SBlockKeyTuple* blkKeyTuple,
                         bool isRawPayload) {
387
  // TODO: optimize this function, handle the case while binary is not presented
X
Xiaoyu Wang 已提交
388 389 390
  STableMeta*   pTableMeta = pTableDataBlock->pTableMeta;
  STableComInfo tinfo = getTableInfo(pTableMeta);
  SSchema*      pSchema = getTableColumnSchema(pTableMeta);
391

X
Xiaoyu Wang 已提交
392
  int32_t     nonDataLen = sizeof(SSubmitBlk) + pTableDataBlock->createTbReqLen;
393
  SSubmitBlk* pBlock = pDataBlock;
X
Xiaoyu Wang 已提交
394 395
  memcpy(pDataBlock, pTableDataBlock->pData, nonDataLen);
  pDataBlock = (char*)pDataBlock + nonDataLen;
396 397

  int32_t flen = 0;  // original total length of row
X
Xiaoyu Wang 已提交
398 399
  if (isRawPayload) {
    for (int32_t j = 0; j < tinfo.numOfColumns; ++j) {
400 401 402
      flen += TYPE_BYTES[pSchema[j].type];
    }
  }
X
Xiaoyu Wang 已提交
403
  pBlock->schemaLen = pTableDataBlock->createTbReqLen;
404

X
Xiaoyu Wang 已提交
405
  char* p = pTableDataBlock->pData + nonDataLen;
406
  pBlock->dataLen = 0;
407
  int32_t numOfRows = pBlock->numOfRows;
408 409

  if (isRawPayload) {
C
Cary Xu 已提交
410
    SRowBuilder builder = {0};
X
Xiaoyu Wang 已提交
411

C
Cary Xu 已提交
412 413
    tdSRowInit(&builder, pTableMeta->sversion);
    tdSRowSetInfo(&builder, getNumOfColumns(pTableMeta), -1, flen);
414

C
Cary Xu 已提交
415 416
    for (int32_t i = 0; i < numOfRows; ++i) {
      tdSRowResetBuf(&builder, pDataBlock);
417
      int toffset = 0;
C
Cary Xu 已提交
418 419 420 421 422
      for (int32_t j = 0; j < tinfo.numOfColumns; ++j) {
        int8_t  colType = pSchema[j].type;
        uint8_t valType = isNull(p, colType) ? TD_VTYPE_NULL : TD_VTYPE_NORM;
        tdAppendColValToRow(&builder, pSchema[j].colId, colType, valType, p, true, toffset, j);
        toffset += TYPE_BYTES[colType];
423 424
        p += pSchema[j].bytes;
      }
C
Cary Xu 已提交
425 426 427
      int32_t rowLen = TD_ROW_LEN((STSRow*)pDataBlock);
      pDataBlock = (char*)pDataBlock + rowLen;
      pBlock->dataLen += rowLen;
428 429 430
    }
  } else {
    for (int32_t i = 0; i < numOfRows; ++i) {
X
Xiaoyu Wang 已提交
431 432
      char*     payload = (blkKeyTuple + i)->payloadAddr;
      TDRowLenT rowTLen = TD_ROW_LEN((STSRow*)payload);
C
Cary Xu 已提交
433 434 435
      memcpy(pDataBlock, payload, rowTLen);
      pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
      pBlock->dataLen += rowTLen;
436 437 438
    }
  }

439
  return pBlock->dataLen + pBlock->schemaLen;
440 441
}

X
Xiaoyu Wang 已提交
442
int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** pVgDataBlocks) {
S
Shengliang Guan 已提交
443
  const int INSERT_HEAD_SIZE = sizeof(SSubmitReq);
444 445
  int       code = 0;
  bool      isRawPayload = IS_RAW_PAYLOAD(payloadType);
D
dapan 已提交
446
  SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
447 448 449
  SArray*   pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);

  STableDataBlocks** p = taosHashIterate(pHashObj, NULL);
X
Xiaoyu Wang 已提交
450 451
  STableDataBlocks*  pOneTableBlock = *p;
  SBlockKeyInfo      blkKeyInfo = {0};  // share by pOneTableBlock
452
  while (pOneTableBlock) {
X
Xiaoyu Wang 已提交
453
    SSubmitBlk* pBlocks = (SSubmitBlk*)pOneTableBlock->pData;
454 455
    if (pBlocks->numOfRows > 0) {
      STableDataBlocks* dataBuf = NULL;
456
      pOneTableBlock->pTableMeta->vgId = pOneTableBlock->vgId;    // for schemaless, restore origin vgId
X
Xiaoyu Wang 已提交
457
      int32_t           ret =
D
dapan 已提交
458
          getDataBlockFromList(pVnodeDataBlockHashList, &pOneTableBlock->vgId, sizeof(pOneTableBlock->vgId), TSDB_PAYLOAD_SIZE, INSERT_HEAD_SIZE, 0,
X
Xiaoyu Wang 已提交
459
                               pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList, NULL);
460 461 462
      if (ret != TSDB_CODE_SUCCESS) {
        taosHashCleanup(pVnodeDataBlockHashList);
        destroyBlockArrayList(pVnodeDataBlockList);
wafwerar's avatar
wafwerar 已提交
463
        taosMemoryFreeClear(blkKeyInfo.pKeyTuple);
464 465
        return ret;
      }
X
Xiaoyu Wang 已提交
466
      ASSERT(pOneTableBlock->pTableMeta->tableInfo.rowSize > 0);
467
      // the maximum expanded size in byte when a row-wise data is converted to SDataRow format
468
      int32_t expandSize = isRawPayload ? getRowExpandSize(pOneTableBlock->pTableMeta) : 0;
469
      int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize +
D
dapan 已提交
470
                         sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta) + pOneTableBlock->createTbReqLen;
471 472 473

      if (dataBuf->nAllocSize < destSize) {
        dataBuf->nAllocSize = (uint32_t)(destSize * 1.5);
wafwerar's avatar
wafwerar 已提交
474
        char* tmp = taosMemoryRealloc(dataBuf->pData, dataBuf->nAllocSize);
475 476 477 478 479
        if (tmp != NULL) {
          dataBuf->pData = tmp;
        } else {  // failed to allocate memory, free already allocated memory and return error code
          taosHashCleanup(pVnodeDataBlockHashList);
          destroyBlockArrayList(pVnodeDataBlockList);
wafwerar's avatar
wafwerar 已提交
480 481
          taosMemoryFreeClear(dataBuf->pData);
          taosMemoryFreeClear(blkKeyInfo.pKeyTuple);
482 483 484 485 486 487 488 489 490 491
          return TSDB_CODE_TSC_OUT_OF_MEMORY;
        }
      }

      if (isRawPayload) {
        sortRemoveDataBlockDupRowsRaw(pOneTableBlock);
      } else {
        if ((code = sortRemoveDataBlockDupRows(pOneTableBlock, &blkKeyInfo)) != 0) {
          taosHashCleanup(pVnodeDataBlockHashList);
          destroyBlockArrayList(pVnodeDataBlockList);
wafwerar's avatar
wafwerar 已提交
492 493
          taosMemoryFreeClear(dataBuf->pData);
          taosMemoryFreeClear(blkKeyInfo.pKeyTuple);
494 495 496 497 498 499
          return code;
        }
        ASSERT(blkKeyInfo.pKeyTuple != NULL && pBlocks->numOfRows > 0);
      }

      // erase the empty space reserved for binary data
X
Xiaoyu Wang 已提交
500 501
      int32_t finalLen =
          trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, blkKeyInfo.pKeyTuple, isRawPayload);
502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517

      dataBuf->size += (finalLen + sizeof(SSubmitBlk));
      assert(dataBuf->size <= dataBuf->nAllocSize);
      dataBuf->numOfTables += 1;
    }

    p = taosHashIterate(pHashObj, p);
    if (p == NULL) {
      break;
    }

    pOneTableBlock = *p;
  }

  // free the table data blocks;
  taosHashCleanup(pVnodeDataBlockHashList);
wafwerar's avatar
wafwerar 已提交
518
  taosMemoryFreeClear(blkKeyInfo.pKeyTuple);
519
  *pVgDataBlocks = pVnodeDataBlockList;
520 521 522
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
523 524
int32_t allocateMemForSize(STableDataBlocks* pDataBlock, int32_t allSize) {
  size_t   remain = pDataBlock->nAllocSize - pDataBlock->size;
D
stmt  
dapan1121 已提交
525
  uint32_t nAllocSizeOld = pDataBlock->nAllocSize;
X
Xiaoyu Wang 已提交
526

D
stmt  
dapan1121 已提交
527 528 529 530
  // expand the allocated size
  if (remain < allSize) {
    pDataBlock->nAllocSize = (pDataBlock->size + allSize) * 1.5;

X
Xiaoyu Wang 已提交
531
    char* tmp = taosMemoryRealloc(pDataBlock->pData, (size_t)pDataBlock->nAllocSize);
D
stmt  
dapan1121 已提交
532 533 534 535 536 537 538 539 540 541 542 543 544
    if (tmp != NULL) {
      pDataBlock->pData = tmp;
      memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size);
    } else {
      // do nothing, if allocate more memory failed
      pDataBlock->nAllocSize = nAllocSizeOld;
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
545
int32_t allocateMemIfNeed(STableDataBlocks* pDataBlock, int32_t rowSize, int32_t* numOfRows) {
546 547
  size_t    remain = pDataBlock->nAllocSize - pDataBlock->size;
  const int factor = 5;
X
Xiaoyu Wang 已提交
548 549
  uint32_t  nAllocSizeOld = pDataBlock->nAllocSize;

550 551 552 553 554 555 556
  // expand the allocated size
  if (remain < rowSize * factor) {
    while (remain < rowSize * factor) {
      pDataBlock->nAllocSize = (uint32_t)(pDataBlock->nAllocSize * 1.5);
      remain = pDataBlock->nAllocSize - pDataBlock->size;
    }

X
Xiaoyu Wang 已提交
557
    char* tmp = taosMemoryRealloc(pDataBlock->pData, (size_t)pDataBlock->nAllocSize);
558 559 560 561 562 563 564 565 566 567 568 569 570 571 572
    if (tmp != NULL) {
      pDataBlock->pData = tmp;
      memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size);
    } else {
      // do nothing, if allocate more memory failed
      pDataBlock->nAllocSize = nAllocSizeOld;
      *numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize;
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
  }

  *numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
573
int initRowBuilder(SRowBuilder* pBuilder, int16_t schemaVer, SParsedDataColInfo* pColInfo) {
C
Cary Xu 已提交
574 575 576 577
  ASSERT(pColInfo->numOfCols > 0 && (pColInfo->numOfBound <= pColInfo->numOfCols));
  tdSRowInit(pBuilder, schemaVer);
  tdSRowSetExtendedInfo(pBuilder, pColInfo->numOfCols, pColInfo->numOfBound, pColInfo->flen, pColInfo->allNullLen,
                        pColInfo->boundNullLen);
578 579
  return TSDB_CODE_SUCCESS;
}
D
stmt  
dapan1121 已提交
580

D
stmt  
dapan1121 已提交
581
int32_t qResetStmtDataBlock(void* block, bool keepBuf) {
D
stmt  
dapan1121 已提交
582
  STableDataBlocks* pBlock = (STableDataBlocks*)block;
D
stmt  
dapan1121 已提交
583

D
stmt  
dapan1121 已提交
584 585 586 587 588 589 590 591
  if (keepBuf) {
    taosMemoryFreeClear(pBlock->pData);
    pBlock->pData = taosMemoryMalloc(TSDB_PAYLOAD_SIZE);
    if (NULL == pBlock->pData) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    memset(pBlock->pData, 0, sizeof(SSubmitBlk));
  } else {
X
Xiaoyu Wang 已提交
592
    pBlock->pData = NULL;
D
stmt  
dapan1121 已提交
593
  }
X
Xiaoyu Wang 已提交
594 595 596 597

  pBlock->ordered = true;
  pBlock->prevTS = INT64_MIN;
  pBlock->size = sizeof(SSubmitBlk);
D
stmt  
dapan1121 已提交
598 599 600 601
  pBlock->tsSource = -1;
  pBlock->numOfTables = 1;
  pBlock->nAllocSize = TSDB_PAYLOAD_SIZE;
  pBlock->headerSize = pBlock->size;
D
dapan 已提交
602
  pBlock->createTbReqLen = 0;
X
Xiaoyu Wang 已提交
603

D
stmt  
dapan1121 已提交
604
  memset(&pBlock->rowBuilder, 0, sizeof(pBlock->rowBuilder));
D
stmt  
dapan1121 已提交
605 606

  return TSDB_CODE_SUCCESS;
D
stmt  
dapan1121 已提交
607 608
}

D
stmt  
dapan1121 已提交
609 610 611 612
int32_t qCloneStmtDataBlock(void** pDst, void* pSrc) {
  *pDst = taosMemoryMalloc(sizeof(STableDataBlocks));
  if (NULL == *pDst) {
    return TSDB_CODE_OUT_OF_MEMORY;
D
stmt  
dapan1121 已提交
613
  }
X
Xiaoyu Wang 已提交
614

D
stmt  
dapan1121 已提交
615 616
  memcpy(*pDst, pSrc, sizeof(STableDataBlocks));
  ((STableDataBlocks*)(*pDst))->cloned = true;
X
Xiaoyu Wang 已提交
617

D
stmt  
dapan1121 已提交
618
  return qResetStmtDataBlock(*pDst, false);
D
stmt  
dapan1121 已提交
619 620
}

D
dapan 已提交
621
int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc, uint64_t uid, int32_t vgId) {
D
stmt  
dapan1121 已提交
622 623 624
  int32_t code = qCloneStmtDataBlock(pDst, pSrc);
  if (code) {
    return code;
D
stmt  
dapan1121 已提交
625 626
  }

X
Xiaoyu Wang 已提交
627
  STableDataBlocks* pBlock = (STableDataBlocks*)*pDst;
D
stmt  
dapan1121 已提交
628 629 630 631
  pBlock->pData = taosMemoryMalloc(pBlock->nAllocSize);
  if (NULL == pBlock->pData) {
    qFreeStmtDataBlock(pBlock);
    return TSDB_CODE_OUT_OF_MEMORY;
D
stmt  
dapan1121 已提交
632 633
  }

D
dapan 已提交
634 635
  pBlock->vgId = vgId;
  
D
dapan 已提交
636 637
  if (pBlock->pTableMeta) {
    pBlock->pTableMeta->uid = uid;
D
dapan 已提交
638
    pBlock->pTableMeta->vgId = vgId;
D
dapan 已提交
639 640
  }
  
D
stmt  
dapan1121 已提交
641 642
  memset(pBlock->pData, 0, sizeof(SSubmitBlk));

D
stmt  
dapan1121 已提交
643 644 645
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
646 647 648 649
STableMeta *qGetTableMetaInDataBlock(void* pDataBlock) {
  return ((STableDataBlocks*)pDataBlock)->pTableMeta;
}

D
stmt  
dapan1121 已提交
650 651 652
void qFreeStmtDataBlock(void* pDataBlock) {
  if (pDataBlock == NULL) {
    return;
D
stmt  
dapan1121 已提交
653 654
  }

D
stmt  
dapan1121 已提交
655 656
  taosMemoryFreeClear(((STableDataBlocks*)pDataBlock)->pData);
  taosMemoryFreeClear(pDataBlock);
D
stmt  
dapan1121 已提交
657 658
}

D
stmt  
dapan1121 已提交
659 660 661
void qDestroyStmtDataBlock(void* pBlock) {
  if (pBlock == NULL) {
    return;
D
stmt  
dapan1121 已提交
662 663
  }

D
stmt  
dapan1121 已提交
664
  STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;
D
stmt  
dapan1121 已提交
665

D
stmt  
dapan1121 已提交
666 667 668
  pDataBlock->cloned = false;
  destroyDataBlock(pDataBlock);
}