parInsertData.c 18.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

X
Xiaoyu Wang 已提交
16
#include "parInsertData.h"
17 18

#include "catalog.h"
X
Xiaoyu Wang 已提交
19
#include "parUtil.h"
X
Xiaoyu Wang 已提交
20
#include "querynodes.h"
21 22 23 24

// Evaluates to non-zero when payload type `t` equals PAYLOAD_TYPE_RAW.
#define IS_RAW_PAYLOAD(t) \
  (((int)(t)) == PAYLOAD_TYPE_RAW)  // 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert

25 26 27 28 29 30 31 32 33 34
// One (timestamp key, row address) pair; an array of these is sorted by skey
// in sortRemoveDataBlockDupRows() instead of moving the rows themselves.
typedef struct SBlockKeyTuple {
  TSKEY skey;         // timestamp key of the row (must stay the first member: rowDataCompar reads it)
  void* payloadAddr;  // address of the row payload inside the data block
} SBlockKeyTuple;

// Reusable buffer of SBlockKeyTuple entries shared across table blocks.
typedef struct SBlockKeyInfo {
  int32_t         maxBytesAlloc;  // number of bytes currently allocated for pKeyTuple
  SBlockKeyTuple* pKeyTuple;      // tuple array, grown with realloc when a block needs more room
} SBlockKeyInfo;

35 36 37 38 39 40 41 42 43 44 45
static int32_t rowDataCompar(const void *lhs, const void *rhs) {
  TSKEY left = *(TSKEY *)lhs;
  TSKEY right = *(TSKEY *)rhs;

  if (left == right) {
    return 0;
  } else {
    return left > right ? 1 : -1;
  }
}

46
/*
 * Initialise pColList so that every one of the numOfCols schema columns is
 * bound, in schema order.
 *
 * Allocates pColList->boundColumns and pColList->cols (released by
 * destroyBoundColumnInfo) and fills the per-column offsets together with the
 * aggregated row lengths (flen, allNullLen, boundNullLen, extendedVarLen).
 */
void setBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, col_id_t numOfCols) {
  pColList->numOfCols = numOfCols;
  pColList->numOfBound = numOfCols;
  pColList->orderStatus = ORDER_STATUS_ORDERED;  // non-bound mode is ORDERED by default
  pColList->boundColumns = taosMemoryCalloc(pColList->numOfCols, sizeof(col_id_t));
  pColList->cols = taosMemoryCalloc(pColList->numOfCols, sizeof(SBoundColumn));
  pColList->colIdxInfo = NULL;
  pColList->flen = 0;
  pColList->allNullLen = 0;

  int32_t numOfVarCols = 0;
  for (int32_t idx = 0; idx < pColList->numOfCols; ++idx) {
    const uint8_t colType = pSchema[idx].type;

    // column 0 keeps the zero offsets coming from calloc
    if (idx > 0) {
      pColList->cols[idx].offset = pColList->cols[idx - 1].offset + pSchema[idx - 1].bytes;
      pColList->cols[idx].toffset = pColList->flen;
    }
    pColList->flen += TYPE_BYTES[colType];

    // variable-length columns add their header plus one element to the all-NULL row length
    if (colType == TSDB_DATA_TYPE_BINARY) {
      pColList->allNullLen += (VARSTR_HEADER_SIZE + CHAR_BYTES);
      ++numOfVarCols;
    } else if (colType == TSDB_DATA_TYPE_NCHAR) {
      pColList->allNullLen += (VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
      ++numOfVarCols;
    }

    pColList->boundColumns[idx] = pSchema[idx].colId;
  }

  pColList->allNullLen += pColList->flen;
  pColList->boundNullLen = pColList->allNullLen;  // defaults to allNullLen
  pColList->extendedVarLen = (uint16_t)(numOfVarCols * sizeof(VarDataOffsetT));
}

/*
 * Comparator for elements that start with a uint16_t value.
 * Returns 1, -1 or 0 when the value at lhs is greater than, less than or
 * equal to the value at rhs.
 */
int32_t schemaIdxCompar(const void *lhs, const void *rhs) {
  const uint16_t idxL = *(uint16_t *)lhs;
  const uint16_t idxR = *(uint16_t *)rhs;

  return (idxL > idxR) - (idxL < idxR);
}

/*
 * Comparator keyed on the uint16_t located sizeof(uint16_t) bytes past the
 * start of each element.
 * Returns 1, -1 or 0 when that value in lhs is greater than, less than or
 * equal to the one in rhs.
 */
int32_t boundIdxCompar(const void *lhs, const void *rhs) {
  const uint16_t idxL = *(uint16_t *)POINTER_SHIFT(lhs, sizeof(uint16_t));
  const uint16_t idxR = *(uint16_t *)POINTER_SHIFT(rhs, sizeof(uint16_t));

  return (idxL > idxR) - (idxL < idxR);
}

/*
 * Release the arrays owned by pColList (boundColumns, cols, colIdxInfo) and
 * reset those pointers. The structure itself is not freed. NULL is a no-op.
 */
void destroyBoundColumnInfo(SParsedDataColInfo* pColList) {
  if (pColList != NULL) {
    taosMemoryFreeClear(pColList->boundColumns);
    taosMemoryFreeClear(pColList->cols);
    taosMemoryFreeClear(pColList->colIdxInfo);
  }
}

115
static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOffset,
116
                           const STableMeta* pTableMeta, STableDataBlocks** dataBlocks) {
wafwerar's avatar
wafwerar 已提交
117
  STableDataBlocks* dataBuf = (STableDataBlocks*)taosMemoryCalloc(1, sizeof(STableDataBlocks));
118 119 120 121 122 123 124 125 126 127 128 129
  if (dataBuf == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  dataBuf->nAllocSize = (uint32_t)defaultSize;
  dataBuf->headerSize = startOffset;

  // the header size will always be the startOffset value, reserved for the subumit block header
  if (dataBuf->nAllocSize <= dataBuf->headerSize) {
    dataBuf->nAllocSize = dataBuf->headerSize * 2;
  }

wafwerar's avatar
wafwerar 已提交
130
  dataBuf->pData = taosMemoryMalloc(dataBuf->nAllocSize);
131
  if (dataBuf->pData == NULL) {
wafwerar's avatar
wafwerar 已提交
132
    taosMemoryFreeClear(dataBuf);
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  memset(dataBuf->pData, 0, sizeof(SSubmitBlk));

  //Here we keep the tableMeta to avoid it to be remove by other threads.
  dataBuf->pTableMeta = tableMetaDup(pTableMeta);

  SParsedDataColInfo* pColInfo = &dataBuf->boundColumnInfo;
  SSchema* pSchema = getTableColumnSchema(dataBuf->pTableMeta);
  setBoundColumnInfo(pColInfo, pSchema, dataBuf->pTableMeta->tableInfo.numOfColumns);

  dataBuf->ordered  = true;
  dataBuf->prevTS   = INT64_MIN;
  dataBuf->rowSize  = rowSize;
  dataBuf->size     = startOffset;
  dataBuf->vgId     = dataBuf->pTableMeta->vgId;

  assert(defaultSize > 0 && pTableMeta != NULL && dataBuf->pTableMeta != NULL);

  *dataBlocks = dataBuf;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
/*
 * Serialise pCreateTbReq at the current write position of pBlocks, growing
 * the payload buffer first when the remaining room is too small.
 *
 * On success the block's size is advanced by the serialised length and that
 * length is stored in createTbReqLen. Returns TSDB_CODE_TSC_OUT_OF_MEMORY
 * (block unchanged) when the buffer cannot be grown.
 */
static int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq) {
  // a NULL buffer makes the serialiser report only the required length
  int32_t reqLen = tSerializeSVCreateTbReq(NULL, pCreateTbReq);

  if (pBlocks->nAllocSize - pBlocks->size < reqLen) {
    pBlocks->nAllocSize += reqLen + pBlocks->rowSize;
    char* pGrown = taosMemoryRealloc(pBlocks->pData, pBlocks->nAllocSize);
    if (pGrown == NULL) {
      // roll the bookkeeping back; the old buffer is still valid
      pBlocks->nAllocSize -= reqLen + pBlocks->rowSize;
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    pBlocks->pData = pGrown;
    memset(pBlocks->pData + pBlocks->size, 0, pBlocks->nAllocSize - pBlocks->size);
  }

  char* pCursor = pBlocks->pData + pBlocks->size;
  tSerializeSVCreateTbReq((void**)&pCursor, pCreateTbReq);
  pBlocks->size += reqLen;
  pBlocks->createTbReqLen = reqLen;
  return TSDB_CODE_SUCCESS;
}

176
int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize,
X
Xiaoyu Wang 已提交
177
    const STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList, SVCreateTbReq* pCreateTbReq) {
178 179 180 181 182 183 184
  *dataBlocks = NULL;
  STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)&id, sizeof(id));
  if (t1 != NULL) {
    *dataBlocks = *t1;
  }

  if (*dataBlocks == NULL) {
185
    int32_t ret = createDataBlock((size_t)size, rowSize, startOffset, pTableMeta, dataBlocks);
186 187 188 189
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }

X
Xiaoyu Wang 已提交
190 191 192 193 194 195 196
    if (NULL != pCreateTbReq && NULL != pCreateTbReq->ctbCfg.pTag) {
      ret = buildCreateTbMsg(*dataBlocks, pCreateTbReq);
      if (ret != TSDB_CODE_SUCCESS) {
        return ret;
      }
    }

197 198 199 200 201 202 203 204 205 206
    taosHashPut(pHashList, (const char*)&id, sizeof(int64_t), (char*)dataBlocks, POINTER_BYTES);
    if (pBlockList) {
      taosArrayPush(pBlockList, dataBlocks);
    }
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Number of extra bytes a row of pTableMeta may need on top of its raw
 * payload: the row head without the timestamp key, one
 * TYPE_BYTES[TSDB_DATA_TYPE_BINARY] slot per variable-length column, and the
 * bitmap bytes for (columns - 1) columns.
 */
static int32_t getRowExpandSize(STableMeta* pTableMeta) {
  int32_t  expand = TD_ROW_HEAD_LEN - sizeof(TSKEY);
  int32_t  nCols = getNumOfColumns(pTableMeta);
  SSchema* pSchema = getTableColumnSchema(pTableMeta);

  for (int32_t c = 0; c < nCols; ++c) {
    if (IS_VAR_DATA_TYPE(pSchema[c].type)) {
      expand += TYPE_BYTES[TSDB_DATA_TYPE_BINARY];
    }
  }

  expand += (int32_t)TD_BITMAP_BYTES(nCols - 1);
  return expand;
}

219
/*
 * Free a table data block: its payload buffer, then, unless the block is
 * marked as cloned, its table meta and bound column info, and finally the
 * block itself. NULL is a no-op.
 */
static void destroyDataBlock(STableDataBlocks* pDataBlock) {
  if (pDataBlock != NULL) {
    taosMemoryFreeClear(pDataBlock->pData);

    if (!pDataBlock->cloned) {
      // drop the table meta held by this block
      taosMemoryFreeClear(pDataBlock->pTableMeta);
      destroyBoundColumnInfo(&pDataBlock->boundColumnInfo);
    }

    taosMemoryFreeClear(pDataBlock);
  }
}

236
/*
 * Destroy every data block stored in pDataBlockList and then the array
 * itself. NULL is a no-op.
 */
void destroyBlockArrayList(SArray* pDataBlockList) {
  if (pDataBlockList != NULL) {
    size_t numOfBlocks = taosArrayGetSize(pDataBlockList);
    for (int32_t idx = 0; idx < numOfBlocks; ++idx) {
      destroyDataBlock(taosArrayGetP(pDataBlockList, idx));
    }

    taosArrayDestroy(pDataBlockList);
  }
}

250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265
/*
 * Destroy every data block referenced by pDataBlockHash and then the hash
 * table itself. NULL is a no-op.
 */
void destroyBlockHashmap(SHashObj* pDataBlockHash) {
  if (pDataBlockHash != NULL) {
    for (void** ppBlock = taosHashIterate(pDataBlockHash, NULL); ppBlock != NULL;
         ppBlock = taosHashIterate(pDataBlockHash, ppBlock)) {
      destroyDataBlock(*ppBlock);
    }

    taosHashCleanup(pDataBlockHash);
  }
}

266 267 268 269 270 271 272 273 274 275 276 277 278 279
/*
 * Raw-payload variant: if the block is flagged as unordered, sort its
 * fixed-size rows by timestamp in ascending order and keep one row per
 * timestamp, then update numOfRows and size. prevTS is always reset.
 */
void sortRemoveDataBlockDupRowsRaw(STableDataBlocks *dataBuf) {
  SSubmitBlk *pBlocks = (SSubmitBlk *)dataBuf->pData;

  // on entry the recorded size must match header + numOfRows fixed-size rows
  assert(pBlocks->numOfRows * dataBuf->rowSize + sizeof(SSubmitBlk) == dataBuf->size);

  if (!dataBuf->ordered) {
    char *pRows = pBlocks->data;
    qsort(pRows, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar);

    // `last` is the index of the most recently kept row; rows whose timestamp
    // equals the kept one are dropped
    int32_t last = 0;
    for (int32_t cur = 1; cur < pBlocks->numOfRows; ++cur) {
      TSKEY tsLast = *(TSKEY *)(pRows + dataBuf->rowSize * last);
      TSKEY tsCur = *(TSKEY *)(pRows + dataBuf->rowSize * cur);
      if (tsLast == tsCur) {
        continue;
      }

      ++last;
      if (last != cur) {
        memmove(pRows + dataBuf->rowSize * last, pRows + dataBuf->rowSize * cur, dataBuf->rowSize);
      }
    }

    dataBuf->ordered = true;

    pBlocks->numOfRows = last + 1;
    dataBuf->size = sizeof(SSubmitBlk) + dataBuf->rowSize * pBlocks->numOfRows;
  }

  dataBuf->prevTS = INT64_MIN;
}

/*
 * K-V payload variant: build one (timestamp, row address) tuple per row in
 * pBlkKeyInfo and, if the block is flagged as unordered, sort the tuples by
 * timestamp in ascending order and keep one tuple per timestamp. The rows
 * themselves are not moved; numOfRows, size and prevTS are updated.
 *
 * Returns 0 on success or TSDB_CODE_TSC_OUT_OF_MEMORY when the tuple buffer
 * cannot be grown.
 */
int sortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlkKeyInfo) {
  SSubmitBlk *pBlocks = (SSubmitBlk *)dataBuf->pData;
  int16_t     nRows = pBlocks->numOfRows;

  // make sure the shared tuple buffer can hold one entry per row
  size_t nAlloc = nRows * sizeof(SBlockKeyTuple);
  if (pBlkKeyInfo->pKeyTuple == NULL || pBlkKeyInfo->maxBytesAlloc < nAlloc) {
    char *pGrown = taosMemoryRealloc(pBlkKeyInfo->pKeyTuple, nAlloc);
    if (pGrown == NULL) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    pBlkKeyInfo->pKeyTuple = (SBlockKeyTuple *)pGrown;
    pBlkKeyInfo->maxBytesAlloc = (int32_t)nAlloc;
  }
  memset(pBlkKeyInfo->pKeyTuple, 0, nAlloc);

  int32_t         extendedRowSize = getExtendedRowSize(dataBuf);
  SBlockKeyTuple *pTuples = pBlkKeyInfo->pKeyTuple;

  // rows start after the schemaLen bytes that follow the block header
  char *pRow = pBlocks->data + pBlocks->schemaLen;
  for (int n = 0; n < nRows; ++n) {
    pTuples[n].skey = TD_ROW_KEY((STSRow *)pRow);
    pTuples[n].payloadAddr = pRow;
    pRow += extendedRowSize;
  }

  if (!dataBuf->ordered) {
    qsort(pTuples, nRows, sizeof(SBlockKeyTuple), rowDataCompar);

    // `last` is the index of the most recently kept tuple
    int32_t last = 0;
    for (int32_t cur = 1; cur < nRows; ++cur) {
      if (pTuples[last].skey == pTuples[cur].skey) {
        continue;
      }

      ++last;
      if (last != cur) {
        pTuples[last] = pTuples[cur];
      }
    }

    dataBuf->ordered = true;
    pBlocks->numOfRows = last + 1;
  }

  dataBuf->size = sizeof(SSubmitBlk) + pBlocks->numOfRows * extendedRowSize;
  dataBuf->prevTS = INT64_MIN;

  return 0;
}

// Erase the empty space reserved for binary data
X
Xiaoyu Wang 已提交
374
static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SBlockKeyTuple* blkKeyTuple, bool isRawPayload) {
375 376 377 378 379
  // TODO: optimize this function, handle the case while binary is not presented
  STableMeta*     pTableMeta = pTableDataBlock->pTableMeta;
  STableComInfo   tinfo = getTableInfo(pTableMeta);
  SSchema*        pSchema = getTableColumnSchema(pTableMeta);

X
Xiaoyu Wang 已提交
380
  int32_t nonDataLen = sizeof(SSubmitBlk) + pTableDataBlock->createTbReqLen;
381
  SSubmitBlk* pBlock = pDataBlock;
X
Xiaoyu Wang 已提交
382 383
  memcpy(pDataBlock, pTableDataBlock->pData, nonDataLen);
  pDataBlock = (char*)pDataBlock + nonDataLen;
384 385

  int32_t flen = 0;  // original total length of row
X
Xiaoyu Wang 已提交
386 387
  if (isRawPayload) {
    for (int32_t j = 0; j < tinfo.numOfColumns; ++j) {
388 389 390
      flen += TYPE_BYTES[pSchema[j].type];
    }
  }
X
Xiaoyu Wang 已提交
391
  pBlock->schemaLen = pTableDataBlock->createTbReqLen;
392

X
Xiaoyu Wang 已提交
393
  char* p = pTableDataBlock->pData + nonDataLen;
394
  pBlock->dataLen = 0;
395
  int32_t numOfRows = pBlock->numOfRows;
396 397

  if (isRawPayload) {
C
Cary Xu 已提交
398 399 400 401
    SRowBuilder builder = {0};
    
    tdSRowInit(&builder, pTableMeta->sversion);
    tdSRowSetInfo(&builder, getNumOfColumns(pTableMeta), -1, flen);
402

C
Cary Xu 已提交
403 404
    for (int32_t i = 0; i < numOfRows; ++i) {
      tdSRowResetBuf(&builder, pDataBlock);
405
      int toffset = 0;
C
Cary Xu 已提交
406 407 408 409 410
      for (int32_t j = 0; j < tinfo.numOfColumns; ++j) {
        int8_t  colType = pSchema[j].type;
        uint8_t valType = isNull(p, colType) ? TD_VTYPE_NULL : TD_VTYPE_NORM;
        tdAppendColValToRow(&builder, pSchema[j].colId, colType, valType, p, true, toffset, j);
        toffset += TYPE_BYTES[colType];
411 412
        p += pSchema[j].bytes;
      }
C
Cary Xu 已提交
413 414 415
      int32_t rowLen = TD_ROW_LEN((STSRow*)pDataBlock);
      pDataBlock = (char*)pDataBlock + rowLen;
      pBlock->dataLen += rowLen;
416 417 418
    }
  } else {
    for (int32_t i = 0; i < numOfRows; ++i) {
C
Cary Xu 已提交
419
      char*      payload = (blkKeyTuple + i)->payloadAddr;
C
Cary Xu 已提交
420
      TDRowLenT  rowTLen = TD_ROW_LEN((STSRow*)payload);
C
Cary Xu 已提交
421 422 423
      memcpy(pDataBlock, payload, rowTLen);
      pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
      pBlock->dataLen += rowTLen;
424 425 426
    }
  }

427
  return pBlock->dataLen + pBlock->schemaLen;
428 429
}

X
Xiaoyu Wang 已提交
430
/*
 * Merge the per-table data blocks stored in pHashObj into one data block per
 * vgroup.
 *
 * pHashObj       hash whose values are STableDataBlocks*; the blocks are
 *                sorted/deduplicated in place but stay owned by the caller
 * payloadType    PAYLOAD_TYPE_RAW selects the raw-payload path, any other
 *                value the K-V path
 * pVgDataBlocks  on success receives a newly created array of merged
 *                STableDataBlocks*, one per vgroup (empty when pHashObj holds
 *                no rows); not written on failure
 *
 * Returns TSDB_CODE_SUCCESS or an error code. On failure every block created
 * by this call is released.
 */
int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** pVgDataBlocks) {
  const int          INSERT_HEAD_SIZE = sizeof(SSubmitReq);
  int32_t            code = TSDB_CODE_SUCCESS;
  bool               isRawPayload = IS_RAW_PAYLOAD(payloadType);
  SBlockKeyInfo      blkKeyInfo = {0};  // shared by all table blocks
  STableDataBlocks** p = NULL;

  // The hash is only a vgId lookup index; the merged blocks are owned by the array.
  SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
  SArray*   pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);
  if (NULL == pVnodeDataBlockHashList || NULL == pVnodeDataBlockList) {
    code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto mergeFailed;
  }

  // an empty hash yields a NULL iterator, which must not be dereferenced
  p = taosHashIterate(pHashObj, NULL);
  while (p != NULL && *p != NULL) {
    STableDataBlocks* pOneTableBlock = *p;
    SSubmitBlk*       pBlocks = (SSubmitBlk*)pOneTableBlock->pData;

    if (pBlocks->numOfRows > 0) {
      STableDataBlocks* dataBuf = NULL;
      code = getDataBlockFromList(pVnodeDataBlockHashList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE,
          INSERT_HEAD_SIZE, 0, pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList, NULL);
      if (code != TSDB_CODE_SUCCESS) {
        goto mergeFailed;
      }

      // the maximum expanded size in byte when a row-wise data is converted to SDataRow format
      int32_t expandSize = isRawPayload ? getRowExpandSize(pOneTableBlock->pTableMeta) : 0;
      int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize +
                         sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta);

      if (dataBuf->nAllocSize < destSize) {
        uint32_t newAllocSize = (uint32_t)(destSize * 1.5);
        char*    tmp = taosMemoryRealloc(dataBuf->pData, newAllocSize);
        if (tmp == NULL) {
          // dataBuf is owned by pVnodeDataBlockList and is released with it below
          code = TSDB_CODE_TSC_OUT_OF_MEMORY;
          goto mergeFailed;
        }
        dataBuf->pData = tmp;
        dataBuf->nAllocSize = newAllocSize;
      }

      if (isRawPayload) {
        sortRemoveDataBlockDupRowsRaw(pOneTableBlock);
      } else {
        code = sortRemoveDataBlockDupRows(pOneTableBlock, &blkKeyInfo);
        if (code != 0) {
          goto mergeFailed;
        }
        ASSERT(blkKeyInfo.pKeyTuple != NULL && pBlocks->numOfRows > 0);
      }

      int32_t len = pBlocks->numOfRows *
                        (isRawPayload ? (pOneTableBlock->rowSize + expandSize) : getExtendedRowSize(pOneTableBlock)) +
                    sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta);

      // erase the empty space reserved for binary data
      int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, blkKeyInfo.pKeyTuple, isRawPayload);
      assert(finalLen <= len);

      dataBuf->size += (finalLen + sizeof(SSubmitBlk));
      assert(dataBuf->size <= dataBuf->nAllocSize);
      dataBuf->numOfTables += 1;
    }

    p = taosHashIterate(pHashObj, p);
  }

  // only the lookup index is dropped here; the blocks live on in the returned array
  taosHashCleanup(pVnodeDataBlockHashList);
  taosMemoryFreeClear(blkKeyInfo.pKeyTuple);
  *pVgDataBlocks = pVnodeDataBlockList;
  return TSDB_CODE_SUCCESS;

mergeFailed:
  if (pVnodeDataBlockHashList != NULL) {
    taosHashCleanup(pVnodeDataBlockHashList);
  }
  // destroys every merged block (payload included) exactly once; NULL-safe
  destroyBlockArrayList(pVnodeDataBlockList);
  taosMemoryFreeClear(blkKeyInfo.pKeyTuple);
  return code;
}

D
stmt  
dapan1121 已提交
513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534
/*
 * Make sure pDataBlock has at least allSize free bytes after its current
 * size. When it has not, the buffer is reallocated to 1.5 times
 * (size + allSize) and the bytes from `size` onwards are zeroed.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY with the block
 * left unchanged when the reallocation fails.
 */
int32_t allocateMemForSize(STableDataBlocks *pDataBlock, int32_t allSize) {
  size_t   freeBytes = pDataBlock->nAllocSize - pDataBlock->size;
  uint32_t prevAllocSize = pDataBlock->nAllocSize;

  if (freeBytes >= allSize) {
    return TSDB_CODE_SUCCESS;
  }

  // expand the allocated size
  pDataBlock->nAllocSize = (pDataBlock->size + allSize) * 1.5;

  char *pGrown = taosMemoryRealloc(pDataBlock->pData, (size_t)pDataBlock->nAllocSize);
  if (pGrown == NULL) {
    // keep the old buffer and restore its recorded size
    pDataBlock->nAllocSize = prevAllocSize;
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pDataBlock->pData = pGrown;
  memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size);
  return TSDB_CODE_SUCCESS;
}

535 536 537 538 539 540 541 542 543 544 545 546
/*
 * Make sure pDataBlock has room for at least five more rows of rowSize
 * bytes, growing the buffer in 1.5x steps when it has not, and zeroing the
 * bytes from `size` onwards after a successful growth.
 *
 * numOfRows receives (nAllocSize - headerSize) / rowSize for the resulting
 * buffer, on success as well as on failure.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY with the block
 * left unchanged when the reallocation fails.
 */
int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows) {
  const int minFreeRows = 5;
  uint32_t  prevAllocSize = pDataBlock->nAllocSize;
  size_t    freeBytes = pDataBlock->nAllocSize - pDataBlock->size;

  if (freeBytes < rowSize * minFreeRows) {
    // expand the allocated size until enough rows fit
    do {
      pDataBlock->nAllocSize = (uint32_t)(pDataBlock->nAllocSize * 1.5);
      freeBytes = pDataBlock->nAllocSize - pDataBlock->size;
    } while (freeBytes < rowSize * minFreeRows);

    char *pGrown = taosMemoryRealloc(pDataBlock->pData, (size_t)pDataBlock->nAllocSize);
    if (pGrown == NULL) {
      // keep the old buffer and report the capacity it still offers
      pDataBlock->nAllocSize = prevAllocSize;
      *numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize;
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    pDataBlock->pData = pGrown;
    memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size);
  }

  *numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize;
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
563 564 565 566 567
/*
 * Prepare pBuilder for rows of schema version schemaVer, using the column
 * counts and row lengths recorded in pColInfo.
 * Always returns TSDB_CODE_SUCCESS.
 */
int initRowBuilder(SRowBuilder *pBuilder, int16_t schemaVer, SParsedDataColInfo *pColInfo) {
  ASSERT(pColInfo->numOfCols > 0 && (pColInfo->numOfBound <= pColInfo->numOfCols));

  tdSRowInit(pBuilder, schemaVer);
  tdSRowSetExtendedInfo(pBuilder,
                        pColInfo->numOfCols,
                        pColInfo->numOfBound,
                        pColInfo->flen,
                        pColInfo->allNullLen,
                        pColInfo->boundNullLen);

  return TSDB_CODE_SUCCESS;
}