parInsertUtil.c 31.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
X
Xiaoyu Wang 已提交
15

X
Xiaoyu Wang 已提交
16
#include "parInsertUtil.h"
17 18

#include "catalog.h"
X
Xiaoyu Wang 已提交
19
#include "parInt.h"
X
Xiaoyu Wang 已提交
20
#include "parUtil.h"
X
Xiaoyu Wang 已提交
21
#include "querynodes.h"
C
Cary Xu 已提交
22
#include "tRealloc.h"
23

24
typedef struct SBlockKeyTuple {
X
Xiaoyu Wang 已提交
25 26
  TSKEY   skey;
  void*   payloadAddr;
wafwerar's avatar
wafwerar 已提交
27
  int16_t index;
28 29 30 31 32 33 34
} SBlockKeyTuple;

typedef struct SBlockKeyInfo {
  int32_t         maxBytesAlloc;
  SBlockKeyTuple* pKeyTuple;
} SBlockKeyInfo;

C
Cary Xu 已提交
35 36
typedef struct {
  int32_t   index;
C
Cary Xu 已提交
37
  SArray*   rowArray;  // array of merged rows(mem allocated by tRealloc/free by tFree)
C
Cary Xu 已提交
38
  STSchema* pSchema;
C
Cary Xu 已提交
39
  int64_t   tbUid;  // suid for child table, uid for normal table
C
Cary Xu 已提交
40 41
} SBlockRowMerger;

C
Cary Xu 已提交
42
static FORCE_INLINE void tdResetSBlockRowMerger(SBlockRowMerger* pMerger) {
C
Cary Xu 已提交
43 44 45 46 47 48 49 50 51 52 53 54 55 56
  if (pMerger) {
    pMerger->index = -1;
  }
}

static void tdFreeSBlockRowMerger(SBlockRowMerger* pMerger) {
  if (pMerger) {
    int32_t size = taosArrayGetSize(pMerger->rowArray);
    for (int32_t i = 0; i < size; ++i) {
      tFree(*(void**)taosArrayGet(pMerger->rowArray, i));
    }
    taosArrayDestroy(pMerger->rowArray);

    taosMemoryFreeClear(pMerger->pSchema);
C
Cary Xu 已提交
57
    taosMemoryFree(pMerger);
C
Cary Xu 已提交
58 59 60
  }
}

X
Xiaoyu Wang 已提交
61 62 63
static int32_t rowDataCompar(const void* lhs, const void* rhs) {
  TSKEY left = *(TSKEY*)lhs;
  TSKEY right = *(TSKEY*)rhs;
64 65 66 67 68 69 70
  if (left == right) {
    return 0;
  } else {
    return left > right ? 1 : -1;
  }
}

wafwerar's avatar
wafwerar 已提交
71 72 73 74 75 76 77 78 79 80
static int32_t rowDataComparStable(const void* lhs, const void* rhs) {
  TSKEY left = *(TSKEY*)lhs;
  TSKEY right = *(TSKEY*)rhs;
  if (left == right) {
    return ((SBlockKeyTuple*)lhs)->index - ((SBlockKeyTuple*)rhs)->index;
  } else {
    return left > right ? 1 : -1;
  }
}

X
Xiaoyu Wang 已提交
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
int32_t insGetExtendedRowSize(STableDataBlocks* pBlock) {
  STableComInfo* pTableInfo = &pBlock->pTableMeta->tableInfo;
  ASSERT(pBlock->rowSize == pTableInfo->rowSize);
  return pBlock->rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + pBlock->boundColumnInfo.extendedVarLen +
         (int32_t)TD_BITMAP_BYTES(pTableInfo->numOfColumns - 1);
}

void insGetSTSRowAppendInfo(uint8_t rowType, SParsedDataColInfo* spd, col_id_t idx, int32_t* toffset,
                            col_id_t* colIdx) {
  col_id_t schemaIdx = 0;
  if (IS_DATA_COL_ORDERED(spd)) {
    schemaIdx = spd->boundColumns[idx];
    if (TD_IS_TP_ROW_T(rowType)) {
      *toffset = (spd->cols + schemaIdx)->toffset;  // the offset of firstPart
      *colIdx = schemaIdx;
    } else {
      *toffset = idx * sizeof(SKvRowIdx);  // the offset of SKvRowIdx
      *colIdx = idx;
    }
  } else {
    ASSERT(idx == (spd->colIdxInfo + idx)->boundIdx);
    schemaIdx = (spd->colIdxInfo + idx)->schemaColIdx;
    if (TD_IS_TP_ROW_T(rowType)) {
      *toffset = (spd->cols + schemaIdx)->toffset;
      *colIdx = schemaIdx;
    } else {
      *toffset = ((spd->colIdxInfo + idx)->finalIdx) * sizeof(SKvRowIdx);
      *colIdx = (spd->colIdxInfo + idx)->finalIdx;
    }
  }
}

X
Xiaoyu Wang 已提交
113
int32_t insSetBlockInfo(SSubmitBlk* pBlocks, STableDataBlocks* dataBuf, int32_t numOfRows, SMsgBuf* pMsg) {
X
Xiaoyu Wang 已提交
114 115 116 117 118 119
  pBlocks->suid = (TSDB_NORMAL_TABLE == dataBuf->pTableMeta->tableType ? 0 : dataBuf->pTableMeta->suid);
  pBlocks->uid = dataBuf->pTableMeta->uid;
  pBlocks->sversion = dataBuf->pTableMeta->sversion;
  pBlocks->schemaLen = dataBuf->createTbReqLen;

  if (pBlocks->numOfRows + numOfRows >= INT32_MAX) {
X
Xiaoyu Wang 已提交
120
    return buildInvalidOperationMsg(pMsg, "too many rows in sql, total number of rows should be less than INT32_MAX");
X
Xiaoyu Wang 已提交
121
  }
X
Xiaoyu Wang 已提交
122 123
  pBlocks->numOfRows += numOfRows;
  return TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
124 125 126
}

void insSetBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, col_id_t numOfCols) {
127 128 129
  pColList->numOfCols = numOfCols;
  pColList->numOfBound = numOfCols;
  pColList->orderStatus = ORDER_STATUS_ORDERED;  // default is ORDERED for non-bound mode
130
  pColList->boundColumns = taosMemoryCalloc(pColList->numOfCols, sizeof(col_id_t));
wafwerar's avatar
wafwerar 已提交
131
  pColList->cols = taosMemoryCalloc(pColList->numOfCols, sizeof(SBoundColumn));
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
  pColList->colIdxInfo = NULL;
  pColList->flen = 0;
  pColList->allNullLen = 0;

  int32_t nVar = 0;
  for (int32_t i = 0; i < pColList->numOfCols; ++i) {
    uint8_t type = pSchema[i].type;
    if (i > 0) {
      pColList->cols[i].offset = pColList->cols[i - 1].offset + pSchema[i - 1].bytes;
      pColList->cols[i].toffset = pColList->flen;
    }
    switch (type) {
      case TSDB_DATA_TYPE_BINARY:
        pColList->allNullLen += (VARSTR_HEADER_SIZE + CHAR_BYTES);
        ++nVar;
        break;
      case TSDB_DATA_TYPE_NCHAR:
        pColList->allNullLen += (VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
        ++nVar;
        break;
      default:
        break;
    }
X
Xiaoyu Wang 已提交
155
    pColList->boundColumns[i] = i;
156 157
  }
  pColList->allNullLen += pColList->flen;
C
Cary Xu 已提交
158
  pColList->boundNullLen = pColList->allNullLen;  // default set allNullLen
159 160 161
  pColList->extendedVarLen = (uint16_t)(nVar * sizeof(VarDataOffsetT));
}

X
Xiaoyu Wang 已提交
162
int32_t insSchemaIdxCompar(const void* lhs, const void* rhs) {
X
Xiaoyu Wang 已提交
163 164
  uint16_t left = *(uint16_t*)lhs;
  uint16_t right = *(uint16_t*)rhs;
165 166 167 168 169 170 171 172

  if (left == right) {
    return 0;
  } else {
    return left > right ? 1 : -1;
  }
}

X
Xiaoyu Wang 已提交
173
int32_t insBoundIdxCompar(const void* lhs, const void* rhs) {
X
Xiaoyu Wang 已提交
174 175
  uint16_t left = *(uint16_t*)POINTER_SHIFT(lhs, sizeof(uint16_t));
  uint16_t right = *(uint16_t*)POINTER_SHIFT(rhs, sizeof(uint16_t));
176 177 178 179 180 181 182 183

  if (left == right) {
    return 0;
  } else {
    return left > right ? 1 : -1;
  }
}

D
stmt  
dapan1121 已提交
184 185
void destroyBoundColumnInfo(void* pBoundInfo) {
  if (NULL == pBoundInfo) {
D
stmt  
dapan1121 已提交
186 187
    return;
  }
D
stmt  
dapan1121 已提交
188 189

  SParsedDataColInfo* pColList = (SParsedDataColInfo*)pBoundInfo;
X
Xiaoyu Wang 已提交
190

191
  taosMemoryFreeClear(pColList->boundColumns);
wafwerar's avatar
wafwerar 已提交
192 193
  taosMemoryFreeClear(pColList->cols);
  taosMemoryFreeClear(pColList->colIdxInfo);
194 195
}

wmmhello's avatar
wmmhello 已提交
196
static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOffset, STableMeta* pTableMeta,
X
Xiaoyu Wang 已提交
197
                               STableDataBlocks** dataBlocks) {
wafwerar's avatar
wafwerar 已提交
198
  STableDataBlocks* dataBuf = (STableDataBlocks*)taosMemoryCalloc(1, sizeof(STableDataBlocks));
199 200 201 202 203 204 205 206 207 208 209 210
  if (dataBuf == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  dataBuf->nAllocSize = (uint32_t)defaultSize;
  dataBuf->headerSize = startOffset;

  // the header size will always be the startOffset value, reserved for the subumit block header
  if (dataBuf->nAllocSize <= dataBuf->headerSize) {
    dataBuf->nAllocSize = dataBuf->headerSize * 2;
  }

wafwerar's avatar
wafwerar 已提交
211
  dataBuf->pData = taosMemoryMalloc(dataBuf->nAllocSize);
212
  if (dataBuf->pData == NULL) {
wafwerar's avatar
wafwerar 已提交
213
    taosMemoryFreeClear(dataBuf);
214 215 216 217
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  memset(dataBuf->pData, 0, sizeof(SSubmitBlk));

X
Xiaoyu Wang 已提交
218
  dataBuf->pTableMeta = tableMetaDup(pTableMeta);
219 220

  SParsedDataColInfo* pColInfo = &dataBuf->boundColumnInfo;
X
Xiaoyu Wang 已提交
221
  SSchema*            pSchema = getTableColumnSchema(dataBuf->pTableMeta);
X
Xiaoyu Wang 已提交
222
  insSetBoundColumnInfo(pColInfo, pSchema, dataBuf->pTableMeta->tableInfo.numOfColumns);
223

X
Xiaoyu Wang 已提交
224 225 226 227 228
  dataBuf->ordered = true;
  dataBuf->prevTS = INT64_MIN;
  dataBuf->rowSize = rowSize;
  dataBuf->size = startOffset;
  dataBuf->vgId = dataBuf->pTableMeta->vgId;
229 230 231 232 233 234 235

  assert(defaultSize > 0 && pTableMeta != NULL && dataBuf->pTableMeta != NULL);

  *dataBlocks = dataBuf;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
236
int32_t insBuildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq) {
H
Hongze Cheng 已提交
237
  SEncoder coder = {0};
X
Xiaoyu Wang 已提交
238 239
  char*    pBuf;
  int32_t  len;
H
Hongze Cheng 已提交
240

wafwerar's avatar
wafwerar 已提交
241 242
  int32_t ret = 0;
  tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, len, ret);
X
Xiaoyu Wang 已提交
243 244 245 246 247 248 249 250 251 252 253
  if (pBlocks->nAllocSize - pBlocks->size < len) {
    pBlocks->nAllocSize += len + pBlocks->rowSize;
    char* pTmp = taosMemoryRealloc(pBlocks->pData, pBlocks->nAllocSize);
    if (pTmp != NULL) {
      pBlocks->pData = pTmp;
      memset(pBlocks->pData + pBlocks->size, 0, pBlocks->nAllocSize - pBlocks->size);
    } else {
      pBlocks->nAllocSize -= len + pBlocks->rowSize;
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
  }
H
Hongze Cheng 已提交
254

X
Xiaoyu Wang 已提交
255
  pBuf = pBlocks->pData + pBlocks->size;
H
Hongze Cheng 已提交
256

H
Hongze Cheng 已提交
257
  tEncoderInit(&coder, pBuf, len);
X
Xiaoyu Wang 已提交
258
  int32_t code = tEncodeSVCreateTbReq(&coder, pCreateTbReq);
H
Hongze Cheng 已提交
259
  tEncoderClear(&coder);
X
Xiaoyu Wang 已提交
260 261
  pBlocks->size += len;
  pBlocks->createTbReqLen = len;
X
Xiaoyu Wang 已提交
262 263

  return code;
X
Xiaoyu Wang 已提交
264 265
}

X
Xiaoyu Wang 已提交
266
void insDestroyDataBlock(STableDataBlocks* pDataBlock) {
X
Xiaoyu Wang 已提交
267 268 269 270 271 272 273 274 275 276
  if (pDataBlock == NULL) {
    return;
  }

  taosMemoryFreeClear(pDataBlock->pData);
  taosMemoryFreeClear(pDataBlock->pTableMeta);
  destroyBoundColumnInfo(&pDataBlock->boundColumnInfo);
  taosMemoryFreeClear(pDataBlock);
}

X
Xiaoyu Wang 已提交
277 278 279
int32_t insGetDataBlockFromList(SHashObj* pHashList, void* id, int32_t idLen, int32_t size, int32_t startOffset,
                                int32_t rowSize, STableMeta* pTableMeta, STableDataBlocks** dataBlocks,
                                SArray* pBlockList, SVCreateTbReq* pCreateTbReq) {
280
  *dataBlocks = NULL;
D
dapan 已提交
281
  STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)id, idLen);
282 283 284 285 286
  if (t1 != NULL) {
    *dataBlocks = *t1;
  }

  if (*dataBlocks == NULL) {
287
    int32_t ret = createDataBlock((size_t)size, rowSize, startOffset, pTableMeta, dataBlocks);
288 289 290 291
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }

H
Hongze Cheng 已提交
292
    if (NULL != pCreateTbReq && NULL != pCreateTbReq->ctb.pTag) {
X
Xiaoyu Wang 已提交
293
      ret = insBuildCreateTbMsg(*dataBlocks, pCreateTbReq);
X
Xiaoyu Wang 已提交
294
      if (ret != TSDB_CODE_SUCCESS) {
X
Xiaoyu Wang 已提交
295
        insDestroyDataBlock(*dataBlocks);
X
Xiaoyu Wang 已提交
296 297 298 299
        return ret;
      }
    }

X
Xiaoyu Wang 已提交
300 301
    // converting to 'const char*' is to handle coverity scan errors
    taosHashPut(pHashList, (const char*)id, idLen, (const char*)dataBlocks, POINTER_BYTES);
302 303 304 305 306 307 308 309
    if (pBlockList) {
      taosArrayPush(pBlockList, dataBlocks);
    }
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
310
void insDestroyBlockArrayList(SArray* pDataBlockList) {
311
  if (pDataBlockList == NULL) {
312
    return;
313 314 315 316
  }

  size_t size = taosArrayGetSize(pDataBlockList);
  for (int32_t i = 0; i < size; i++) {
317
    void* p = taosArrayGetP(pDataBlockList, i);
X
Xiaoyu Wang 已提交
318
    insDestroyDataBlock(p);
319 320 321 322 323
  }

  taosArrayDestroy(pDataBlockList);
}

X
Xiaoyu Wang 已提交
324
void insDestroyBlockHashmap(SHashObj* pDataBlockHash) {
325 326 327 328 329 330 331
  if (pDataBlockHash == NULL) {
    return;
  }

  void** p1 = taosHashIterate(pDataBlockHash, NULL);
  while (p1) {
    STableDataBlocks* pBlocks = *p1;
X
Xiaoyu Wang 已提交
332
    insDestroyDataBlock(pBlocks);
333 334 335 336 337 338 339

    p1 = taosHashIterate(pDataBlockHash, p1);
  }

  taosHashCleanup(pDataBlockHash);
}

340
// data block is disordered, sort it in ascending order
C
Cary Xu 已提交
341
static int sortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo) {
X
Xiaoyu Wang 已提交
342
  SSubmitBlk* pBlocks = (SSubmitBlk*)dataBuf->pData;
343 344 345 346 347 348 349
  int16_t     nRows = pBlocks->numOfRows;

  // size is less than the total size, since duplicated rows may be removed yet.

  // allocate memory
  size_t nAlloc = nRows * sizeof(SBlockKeyTuple);
  if (pBlkKeyInfo->pKeyTuple == NULL || pBlkKeyInfo->maxBytesAlloc < nAlloc) {
X
Xiaoyu Wang 已提交
350
    char* tmp = taosMemoryRealloc(pBlkKeyInfo->pKeyTuple, nAlloc);
351 352 353
    if (tmp == NULL) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
X
Xiaoyu Wang 已提交
354
    pBlkKeyInfo->pKeyTuple = (SBlockKeyTuple*)tmp;
355 356 357 358
    pBlkKeyInfo->maxBytesAlloc = (int32_t)nAlloc;
  }
  memset(pBlkKeyInfo->pKeyTuple, 0, nAlloc);

X
Xiaoyu Wang 已提交
359
  int32_t         extendedRowSize = insGetExtendedRowSize(dataBuf);
X
Xiaoyu Wang 已提交
360 361
  SBlockKeyTuple* pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
  char*           pBlockData = pBlocks->data + pBlocks->schemaLen;
362 363
  int             n = 0;
  while (n < nRows) {
X
Xiaoyu Wang 已提交
364
    pBlkKeyTuple->skey = TD_ROW_KEY((STSRow*)pBlockData);
365
    pBlkKeyTuple->payloadAddr = pBlockData;
wafwerar's avatar
wafwerar 已提交
366
    pBlkKeyTuple->index = n;
367 368 369 370 371 372 373 374 375

    // next loop
    pBlockData += extendedRowSize;
    ++pBlkKeyTuple;
    ++n;
  }

  if (!dataBuf->ordered) {
    pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
376 377

    // todo. qsort is unstable, if timestamp is same, should get the last one
wafwerar's avatar
wafwerar 已提交
378
    taosSort(pBlkKeyTuple, nRows, sizeof(SBlockKeyTuple), rowDataComparStable);
379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408

    pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
    int32_t i = 0;
    int32_t j = 1;
    while (j < nRows) {
      TSKEY ti = (pBlkKeyTuple + i)->skey;
      TSKEY tj = (pBlkKeyTuple + j)->skey;

      if (ti == tj) {
        ++j;
        continue;
      }

      int32_t nextPos = (++i);
      if (nextPos != j) {
        memmove(pBlkKeyTuple + nextPos, pBlkKeyTuple + j, sizeof(SBlockKeyTuple));
      }
      ++j;
    }

    dataBuf->ordered = true;
    pBlocks->numOfRows = i + 1;
  }

  dataBuf->size = sizeof(SSubmitBlk) + pBlocks->numOfRows * extendedRowSize;
  dataBuf->prevTS = INT64_MIN;

  return 0;
}

C
Cary Xu 已提交
409 410 411 412 413 414 415 416
static void* tdGetCurRowFromBlockMerger(SBlockRowMerger* pBlkRowMerger) {
  if (pBlkRowMerger && (pBlkRowMerger->index >= 0)) {
    ASSERT(pBlkRowMerger->index < taosArrayGetSize(pBlkRowMerger->rowArray));
    return *(void**)taosArrayGet(pBlkRowMerger->rowArray, pBlkRowMerger->index);
  }
  return NULL;
}

C
Cary Xu 已提交
417
static int32_t tdBlockRowMerge(STableMeta* pTableMeta, SBlockKeyTuple* pEndKeyTp, int32_t nDupRows,
C
Cary Xu 已提交
418 419 420 421 422 423 424
                               SBlockRowMerger** pBlkRowMerger, int32_t rowSize) {
  ASSERT(nDupRows > 1);
  SBlockKeyTuple* pStartKeyTp = pEndKeyTp - (nDupRows - 1);
  ASSERT(pStartKeyTp->skey == pEndKeyTp->skey);

  // TODO: optimization if end row is all normal
#if 0
C
Cary Xu 已提交
425
  STSRow* pEndRow = (STSRow*)pEndKeyTp->payloadAddr;
C
Cary Xu 已提交
426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447
  if(isNormal(pEndRow)) { // set the end row if it is normal and return directly
    pStartKeyTp->payloadAddr = pEndKeyTp->payloadAddr;
    return TSDB_CODE_SUCCESS;
  }
#endif

  if (!(*pBlkRowMerger)) {
    (*pBlkRowMerger) = taosMemoryCalloc(1, sizeof(**pBlkRowMerger));
    if (!(*pBlkRowMerger)) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return TSDB_CODE_FAILED;
    }
    (*pBlkRowMerger)->index = -1;
    if (!(*pBlkRowMerger)->rowArray) {
      (*pBlkRowMerger)->rowArray = taosArrayInit(1, sizeof(void*));
      if (!(*pBlkRowMerger)->rowArray) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return TSDB_CODE_FAILED;
      }
    }
  }

C
Cary Xu 已提交
448 449 450 451 452 453 454 455 456 457
  if ((*pBlkRowMerger)->pSchema) {
    if ((*pBlkRowMerger)->pSchema->version != pTableMeta->sversion) {
      taosMemoryFreeClear((*pBlkRowMerger)->pSchema);
    } else {
      if ((*pBlkRowMerger)->tbUid != (pTableMeta->suid > 0 ? pTableMeta->suid : pTableMeta->uid)) {
        taosMemoryFreeClear((*pBlkRowMerger)->pSchema);
      }
    }
  }

C
Cary Xu 已提交
458
  if (!(*pBlkRowMerger)->pSchema) {
C
Cary Xu 已提交
459 460
    (*pBlkRowMerger)->pSchema =
        tdGetSTSChemaFromSSChema(pTableMeta->schema, pTableMeta->tableInfo.numOfColumns, pTableMeta->sversion);
C
Cary Xu 已提交
461 462 463 464 465

    if (!(*pBlkRowMerger)->pSchema) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return TSDB_CODE_FAILED;
    }
C
Cary Xu 已提交
466
    (*pBlkRowMerger)->tbUid = pTableMeta->suid > 0 ? pTableMeta->suid : pTableMeta->uid;
C
Cary Xu 已提交
467 468 469 470 471
  }

  void* pDestRow = NULL;
  ++((*pBlkRowMerger)->index);
  if ((*pBlkRowMerger)->index < taosArrayGetSize((*pBlkRowMerger)->rowArray)) {
472 473
    void** pAlloc = (void**)taosArrayGet((*pBlkRowMerger)->rowArray, (*pBlkRowMerger)->index);
    if (tRealloc((uint8_t**)pAlloc, rowSize) != 0) {
C
Cary Xu 已提交
474 475
      return TSDB_CODE_FAILED;
    }
476
    pDestRow = *pAlloc;
C
Cary Xu 已提交
477 478 479 480 481 482 483 484 485 486 487 488
  } else {
    if (tRealloc((uint8_t**)&pDestRow, rowSize) != 0) {
      return TSDB_CODE_FAILED;
    }
    taosArrayPush((*pBlkRowMerger)->rowArray, &pDestRow);
  }

  // merge rows to pDestRow
  STSchema* pSchema = (*pBlkRowMerger)->pSchema;
  SArray*   pArray = taosArrayInit(pSchema->numOfCols, sizeof(SColVal));
  for (int32_t i = 0; i < pSchema->numOfCols; ++i) {
    SColVal colVal = {0};
C
Cary Xu 已提交
489
    for (int32_t j = 0; j < nDupRows; ++j) {
C
Cary Xu 已提交
490
      tTSRowGetVal((pEndKeyTp - j)->payloadAddr, pSchema, i, &colVal);
H
Hongze Cheng 已提交
491
      if (!COL_VAL_IS_NONE(&colVal)) {
C
Cary Xu 已提交
492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509
        break;
      }
    }
    taosArrayPush(pArray, &colVal);
  }
  if (tdSTSRowNew(pArray, pSchema, (STSRow**)&pDestRow) < 0) {
    taosArrayDestroy(pArray);
    return TSDB_CODE_FAILED;
  }

  taosArrayDestroy(pArray);
  return TSDB_CODE_SUCCESS;
}

// data block is disordered, sort it in ascending order, and merge dup rows if exists
static int sortMergeDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo,
                                     SBlockRowMerger** ppBlkRowMerger) {
  SSubmitBlk* pBlocks = (SSubmitBlk*)dataBuf->pData;
C
Cary Xu 已提交
510
  STableMeta* pTableMeta = dataBuf->pTableMeta;
511
  int32_t     nRows = pBlocks->numOfRows;
C
Cary Xu 已提交
512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528

  // size is less than the total size, since duplicated rows may be removed.

  // allocate memory
  size_t nAlloc = nRows * sizeof(SBlockKeyTuple);
  if (pBlkKeyInfo->pKeyTuple == NULL || pBlkKeyInfo->maxBytesAlloc < nAlloc) {
    char* tmp = taosMemoryRealloc(pBlkKeyInfo->pKeyTuple, nAlloc);
    if (tmp == NULL) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    pBlkKeyInfo->pKeyTuple = (SBlockKeyTuple*)tmp;
    pBlkKeyInfo->maxBytesAlloc = (int32_t)nAlloc;
  }
  memset(pBlkKeyInfo->pKeyTuple, 0, nAlloc);

  tdResetSBlockRowMerger(*ppBlkRowMerger);

X
Xiaoyu Wang 已提交
529
  int32_t         extendedRowSize = insGetExtendedRowSize(dataBuf);
C
Cary Xu 已提交
530 531
  SBlockKeyTuple* pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
  char*           pBlockData = pBlocks->data + pBlocks->schemaLen;
532
  int32_t         n = 0;
C
Cary Xu 已提交
533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564
  while (n < nRows) {
    pBlkKeyTuple->skey = TD_ROW_KEY((STSRow*)pBlockData);
    pBlkKeyTuple->payloadAddr = pBlockData;
    pBlkKeyTuple->index = n;

    // next loop
    pBlockData += extendedRowSize;
    ++pBlkKeyTuple;
    ++n;
  }

  if (!dataBuf->ordered) {
    pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;

    taosSort(pBlkKeyTuple, nRows, sizeof(SBlockKeyTuple), rowDataComparStable);

    pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
    bool    hasDup = false;
    int32_t nextPos = 0;
    int32_t i = 0;
    int32_t j = 1;

    while (j < nRows) {
      TSKEY ti = (pBlkKeyTuple + i)->skey;
      TSKEY tj = (pBlkKeyTuple + j)->skey;

      if (ti == tj) {
        ++j;
        continue;
      }

      if ((j - i) > 1) {
C
Cary Xu 已提交
565
        if (tdBlockRowMerge(pTableMeta, (pBlkKeyTuple + j - 1), j - i, ppBlkRowMerger, extendedRowSize) < 0) {
C
Cary Xu 已提交
566 567 568
          return TSDB_CODE_FAILED;
        }
        (pBlkKeyTuple + nextPos)->payloadAddr = tdGetCurRowFromBlockMerger(*ppBlkRowMerger);
C
Cary Xu 已提交
569 570 571
        if (!hasDup) {
          hasDup = true;
        }
C
Cary Xu 已提交
572 573 574 575 576 577 578 579 580 581 582 583 584 585
        i = j;
      } else {
        if (hasDup) {
          memmove(pBlkKeyTuple + nextPos, pBlkKeyTuple + i, sizeof(SBlockKeyTuple));
        }
        ++i;
      }

      ++nextPos;
      ++j;
    }

    if ((j - i) > 1) {
      ASSERT((pBlkKeyTuple + i)->skey == (pBlkKeyTuple + j - 1)->skey);
C
Cary Xu 已提交
586
      if (tdBlockRowMerge(pTableMeta, (pBlkKeyTuple + j - 1), j - i, ppBlkRowMerger, extendedRowSize) < 0) {
C
Cary Xu 已提交
587 588 589 590 591 592 593 594
        return TSDB_CODE_FAILED;
      }
      (pBlkKeyTuple + nextPos)->payloadAddr = tdGetCurRowFromBlockMerger(*ppBlkRowMerger);
    } else if (hasDup) {
      memmove(pBlkKeyTuple + nextPos, pBlkKeyTuple + i, sizeof(SBlockKeyTuple));
    }

    dataBuf->ordered = true;
C
Cary Xu 已提交
595
    pBlocks->numOfRows = nextPos + 1;
C
Cary Xu 已提交
596 597 598 599 600 601 602 603
  }

  dataBuf->size = sizeof(SSubmitBlk) + pBlocks->numOfRows * extendedRowSize;
  dataBuf->prevTS = INT64_MIN;

  return TSDB_CODE_SUCCESS;
}

604
// Erase the empty space reserved for binary data
605
static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SBlockKeyTuple* blkKeyTuple) {
606
  // TODO: optimize this function, handle the case while binary is not presented
X
Xiaoyu Wang 已提交
607
  int32_t     nonDataLen = sizeof(SSubmitBlk) + pTableDataBlock->createTbReqLen;
608
  SSubmitBlk* pBlock = pDataBlock;
X
Xiaoyu Wang 已提交
609 610
  memcpy(pDataBlock, pTableDataBlock->pData, nonDataLen);
  pDataBlock = (char*)pDataBlock + nonDataLen;
611

X
Xiaoyu Wang 已提交
612
  pBlock->schemaLen = pTableDataBlock->createTbReqLen;
613 614
  pBlock->dataLen = 0;

615 616 617 618 619 620 621
  int32_t numOfRows = pBlock->numOfRows;
  for (int32_t i = 0; i < numOfRows; ++i) {
    void*     payload = (blkKeyTuple + i)->payloadAddr;
    TDRowLenT rowTLen = TD_ROW_LEN((STSRow*)payload);
    memcpy(pDataBlock, payload, rowTLen);
    pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
    pBlock->dataLen += rowTLen;
622 623
  }

624
  return pBlock->dataLen + pBlock->schemaLen;
625 626
}

627
int32_t insMergeTableDataBlocks(SHashObj* pHashObj, SArray** pVgDataBlocks) {
S
Shengliang Guan 已提交
628
  const int INSERT_HEAD_SIZE = sizeof(SSubmitReq);
629
  int       code = 0;
D
dapan 已提交
630
  SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
631 632 633
  SArray*   pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);

  STableDataBlocks** p = taosHashIterate(pHashObj, NULL);
X
Xiaoyu Wang 已提交
634 635
  STableDataBlocks*  pOneTableBlock = *p;
  SBlockKeyInfo      blkKeyInfo = {0};  // share by pOneTableBlock
X
Xiaoyu Wang 已提交
636
  SBlockRowMerger*   pBlkRowMerger = NULL;
C
Cary Xu 已提交
637

638
  while (pOneTableBlock) {
X
Xiaoyu Wang 已提交
639
    SSubmitBlk* pBlocks = (SSubmitBlk*)pOneTableBlock->pData;
640 641
    if (pBlocks->numOfRows > 0) {
      STableDataBlocks* dataBuf = NULL;
X
Xiaoyu Wang 已提交
642
      pOneTableBlock->pTableMeta->vgId = pOneTableBlock->vgId;  // for schemaless, restore origin vgId
X
Xiaoyu Wang 已提交
643 644 645
      int32_t ret = insGetDataBlockFromList(pVnodeDataBlockHashList, &pOneTableBlock->vgId,
                                            sizeof(pOneTableBlock->vgId), TSDB_PAYLOAD_SIZE, INSERT_HEAD_SIZE, 0,
                                            pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList, NULL);
646
      if (ret != TSDB_CODE_SUCCESS) {
C
Cary Xu 已提交
647
        tdFreeSBlockRowMerger(pBlkRowMerger);
648
        taosHashCleanup(pVnodeDataBlockHashList);
X
Xiaoyu Wang 已提交
649
        insDestroyBlockArrayList(pVnodeDataBlockList);
wafwerar's avatar
wafwerar 已提交
650
        taosMemoryFreeClear(blkKeyInfo.pKeyTuple);
651 652
        return ret;
      }
X
Xiaoyu Wang 已提交
653
      ASSERT(pOneTableBlock->pTableMeta->tableInfo.rowSize > 0);
654
      // the maximum expanded size in byte when a row-wise data is converted to SDataRow format
655
      int64_t destSize = dataBuf->size + pOneTableBlock->size +
X
Xiaoyu Wang 已提交
656 657
                         sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta) +
                         pOneTableBlock->createTbReqLen;
658 659 660

      if (dataBuf->nAllocSize < destSize) {
        dataBuf->nAllocSize = (uint32_t)(destSize * 1.5);
wafwerar's avatar
wafwerar 已提交
661
        char* tmp = taosMemoryRealloc(dataBuf->pData, dataBuf->nAllocSize);
662 663 664
        if (tmp != NULL) {
          dataBuf->pData = tmp;
        } else {  // failed to allocate memory, free already allocated memory and return error code
C
Cary Xu 已提交
665
          tdFreeSBlockRowMerger(pBlkRowMerger);
666
          taosHashCleanup(pVnodeDataBlockHashList);
X
Xiaoyu Wang 已提交
667
          insDestroyBlockArrayList(pVnodeDataBlockList);
wafwerar's avatar
wafwerar 已提交
668 669
          taosMemoryFreeClear(dataBuf->pData);
          taosMemoryFreeClear(blkKeyInfo.pKeyTuple);
670 671 672 673
          return TSDB_CODE_TSC_OUT_OF_MEMORY;
        }
      }

674 675 676 677 678 679 680
      if ((code = sortMergeDataBlockDupRows(pOneTableBlock, &blkKeyInfo, &pBlkRowMerger)) != 0) {
        tdFreeSBlockRowMerger(pBlkRowMerger);
        taosHashCleanup(pVnodeDataBlockHashList);
        insDestroyBlockArrayList(pVnodeDataBlockList);
        taosMemoryFreeClear(dataBuf->pData);
        taosMemoryFreeClear(blkKeyInfo.pKeyTuple);
        return code;
681
      }
682
      ASSERT(blkKeyInfo.pKeyTuple != NULL && pBlocks->numOfRows > 0);
683 684

      // erase the empty space reserved for binary data
685
      int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, blkKeyInfo.pKeyTuple);
686 687 688 689 690 691 692 693 694 695 696 697 698 699 700

      dataBuf->size += (finalLen + sizeof(SSubmitBlk));
      assert(dataBuf->size <= dataBuf->nAllocSize);
      dataBuf->numOfTables += 1;
    }

    p = taosHashIterate(pHashObj, p);
    if (p == NULL) {
      break;
    }

    pOneTableBlock = *p;
  }

  // free the table data blocks;
C
Cary Xu 已提交
701
  tdFreeSBlockRowMerger(pBlkRowMerger);
702
  taosHashCleanup(pVnodeDataBlockHashList);
wafwerar's avatar
wafwerar 已提交
703
  taosMemoryFreeClear(blkKeyInfo.pKeyTuple);
704
  *pVgDataBlocks = pVnodeDataBlockList;
705 706 707
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
708
int32_t insAllocateMemForSize(STableDataBlocks* pDataBlock, int32_t allSize) {
X
Xiaoyu Wang 已提交
709
  size_t   remain = pDataBlock->nAllocSize - pDataBlock->size;
D
stmt  
dapan1121 已提交
710
  uint32_t nAllocSizeOld = pDataBlock->nAllocSize;
X
Xiaoyu Wang 已提交
711

D
stmt  
dapan1121 已提交
712 713 714 715
  // expand the allocated size
  if (remain < allSize) {
    pDataBlock->nAllocSize = (pDataBlock->size + allSize) * 1.5;

X
Xiaoyu Wang 已提交
716
    char* tmp = taosMemoryRealloc(pDataBlock->pData, (size_t)pDataBlock->nAllocSize);
D
stmt  
dapan1121 已提交
717 718 719 720 721 722 723 724 725 726 727 728 729
    if (tmp != NULL) {
      pDataBlock->pData = tmp;
      memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size);
    } else {
      // do nothing, if allocate more memory failed
      pDataBlock->nAllocSize = nAllocSizeOld;
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
730 731 732 733 734 735 736
int32_t insInitRowBuilder(SRowBuilder* pBuilder, int16_t schemaVer, SParsedDataColInfo* pColInfo) {
  ASSERT(pColInfo->numOfCols > 0 && (pColInfo->numOfBound <= pColInfo->numOfCols));
  tdSRowInit(pBuilder, schemaVer);
  tdSRowSetExtendedInfo(pBuilder, pColInfo->numOfCols, pColInfo->numOfBound, pColInfo->flen, pColInfo->allNullLen,
                        pColInfo->boundNullLen);
  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
737

X
Xiaoyu Wang 已提交
738 739 740 741 742 743 744 745
static char* tableNameGetPosition(SToken* pToken, char target) {
  bool inEscape = false;
  bool inQuote = false;
  char quotaStr = 0;

  for (uint32_t i = 0; i < pToken->n; ++i) {
    if (*(pToken->z + i) == target && (!inEscape) && (!inQuote)) {
      return pToken->z + i;
746 747
    }

X
Xiaoyu Wang 已提交
748 749 750 751 752 753 754 755 756 757 758 759 760 761 762
    if (*(pToken->z + i) == TS_ESCAPE_CHAR) {
      if (!inQuote) {
        inEscape = !inEscape;
      }
    }

    if (*(pToken->z + i) == '\'' || *(pToken->z + i) == '"') {
      if (!inEscape) {
        if (!inQuote) {
          quotaStr = *(pToken->z + i);
          inQuote = !inQuote;
        } else if (quotaStr == *(pToken->z + i)) {
          inQuote = !inQuote;
        }
      }
763 764 765
    }
  }

X
Xiaoyu Wang 已提交
766
  return NULL;
767 768
}

X
Xiaoyu Wang 已提交
769 770 771 772 773 774 775 776
int32_t insCreateSName(SName* pName, SToken* pTableName, int32_t acctId, const char* dbName, SMsgBuf* pMsgBuf) {
  const char* msg1 = "name too long";
  const char* msg2 = "invalid database name";
  const char* msg3 = "db is not specified";
  const char* msg4 = "invalid table name";

  int32_t code = TSDB_CODE_SUCCESS;
  char*   p = tableNameGetPosition(pTableName, TS_PATH_DELIMITER[0]);
D
stmt  
dapan1121 已提交
777

X
Xiaoyu Wang 已提交
778 779
  if (p != NULL) {  // db has been specified in sql string so we ignore current db path
    assert(*p == TS_PATH_DELIMITER[0]);
D
stmt  
dapan1121 已提交
780

X
Xiaoyu Wang 已提交
781 782 783
    int32_t dbLen = p - pTableName->z;
    if (dbLen <= 0) {
      return buildInvalidOperationMsg(pMsgBuf, msg2);
D
stmt  
dapan1121 已提交
784
    }
X
Xiaoyu Wang 已提交
785 786 787
    char name[TSDB_DB_FNAME_LEN] = {0};
    strncpy(name, pTableName->z, dbLen);
    int32_t actualDbLen = strdequote(name);
X
Xiaoyu Wang 已提交
788

X
Xiaoyu Wang 已提交
789 790 791 792
    code = tNameSetDbName(pName, acctId, name, actualDbLen);
    if (code != TSDB_CODE_SUCCESS) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }
X
Xiaoyu Wang 已提交
793

X
Xiaoyu Wang 已提交
794 795 796 797
    int32_t tbLen = pTableName->n - dbLen - 1;
    if (tbLen <= 0) {
      return buildInvalidOperationMsg(pMsgBuf, msg4);
    }
D
stmt  
dapan1121 已提交
798

X
Xiaoyu Wang 已提交
799 800 801
    char tbname[TSDB_TABLE_FNAME_LEN] = {0};
    strncpy(tbname, p + 1, tbLen);
    /*tbLen = */ strdequote(tbname);
D
stmt  
dapan1121 已提交
802

X
Xiaoyu Wang 已提交
803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820
    code = tNameFromString(pName, tbname, T_NAME_TABLE);
    if (code != 0) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }
  } else {  // get current DB name first, and then set it into path
    if (pTableName->n >= TSDB_TABLE_NAME_LEN) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }

    assert(pTableName->n < TSDB_TABLE_FNAME_LEN);

    char name[TSDB_TABLE_FNAME_LEN] = {0};
    strncpy(name, pTableName->z, pTableName->n);
    strdequote(name);

    if (dbName == NULL) {
      return buildInvalidOperationMsg(pMsgBuf, msg3);
    }
X
Xiaoyu Wang 已提交
821

X
Xiaoyu Wang 已提交
822 823 824 825 826
    code = tNameSetDbName(pName, acctId, dbName, strlen(dbName));
    if (code != TSDB_CODE_SUCCESS) {
      code = buildInvalidOperationMsg(pMsgBuf, msg2);
      return code;
    }
X
Xiaoyu Wang 已提交
827

X
Xiaoyu Wang 已提交
828 829 830
    code = tNameFromString(pName, name, T_NAME_TABLE);
    if (code != 0) {
      code = buildInvalidOperationMsg(pMsgBuf, msg1);
D
dapan1121 已提交
831 832 833
    }
  }

X
Xiaoyu Wang 已提交
834 835 836 837
  if (NULL != strchr(pName->tname, '.')) {
    code = generateSyntaxErrMsgExt(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, "The table name cannot contain '.'");
  }

X
Xiaoyu Wang 已提交
838
  return code;
D
stmt  
dapan1121 已提交
839 840
}

X
Xiaoyu Wang 已提交
841 842 843 844 845 846
int32_t insFindCol(SToken* pColname, int32_t start, int32_t end, SSchema* pSchema) {
  while (start < end) {
    if (strlen(pSchema[start].name) == pColname->n && strncmp(pColname->z, pSchema[start].name, pColname->n) == 0) {
      return start;
    }
    ++start;
D
stmt  
dapan1121 已提交
847
  }
X
Xiaoyu Wang 已提交
848 849
  return -1;
}
D
stmt  
dapan1121 已提交
850

X
Xiaoyu Wang 已提交
851 852 853 854 855 856 857 858 859 860 861 862 863 864
void insBuildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname,
                         SArray* tagName, uint8_t tagNum) {
  pTbReq->type = TD_CHILD_TABLE;
  pTbReq->name = strdup(tname);
  pTbReq->ctb.suid = suid;
  pTbReq->ctb.tagNum = tagNum;
  if (sname) pTbReq->ctb.stbName = strdup(sname);
  pTbReq->ctb.pTag = (uint8_t*)pTag;
  pTbReq->ctb.tagName = taosArrayDup(tagName);
  pTbReq->ttl = TSDB_DEFAULT_TABLE_TTL;
  pTbReq->commentLen = -1;

  return;
}
D
stmt  
dapan1121 已提交
865

X
Xiaoyu Wang 已提交
866 867 868
int32_t insMemRowAppend(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param) {
  SMemParam*   pa = (SMemParam*)param;
  SRowBuilder* rb = pa->rb;
X
Xiaoyu Wang 已提交
869

X
Xiaoyu Wang 已提交
870 871 872
  if (value == NULL) {  // it is a null data
    tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NULL, value, false, pa->toffset, pa->colIdx);
    return TSDB_CODE_SUCCESS;
D
dapan 已提交
873
  }
X
Xiaoyu Wang 已提交
874

X
Xiaoyu Wang 已提交
875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895
  if (TSDB_DATA_TYPE_BINARY == pa->schema->type) {
    const char* rowEnd = tdRowEnd(rb->pBuf);
    STR_WITH_SIZE_TO_VARSTR(rowEnd, value, len);
    tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, rowEnd, false, pa->toffset, pa->colIdx);
  } else if (TSDB_DATA_TYPE_NCHAR == pa->schema->type) {
    // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
    int32_t     output = 0;
    const char* rowEnd = tdRowEnd(rb->pBuf);
    if (!taosMbsToUcs4(value, len, (TdUcs4*)varDataVal(rowEnd), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) {
      if (errno == E2BIG) {
        return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pa->schema->name);
      }
      char buf[512] = {0};
      snprintf(buf, tListLen(buf), "%s", strerror(errno));
      return buildSyntaxErrMsg(pMsgBuf, buf, value);
    }
    varDataSetLen(rowEnd, output);
    tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, rowEnd, false, pa->toffset, pa->colIdx);
  } else {
    tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, value, false, pa->toffset, pa->colIdx);
  }
D
stmt  
dapan1121 已提交
896

D
stmt  
dapan1121 已提交
897 898 899
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
900 901 902 903 904
int32_t insCheckTimestamp(STableDataBlocks* pDataBlocks, const char* start) {
  // once the data block is disordered, we do NOT keep previous timestamp any more
  if (!pDataBlocks->ordered) {
    return TSDB_CODE_SUCCESS;
  }
D
dapan 已提交
905

X
Xiaoyu Wang 已提交
906 907 908
  TSKEY k = *(TSKEY*)start;
  if (k <= pDataBlocks->prevTS) {
    pDataBlocks->ordered = false;
D
stmt  
dapan1121 已提交
909 910
  }

X
Xiaoyu Wang 已提交
911 912
  pDataBlocks->prevTS = k;
  return TSDB_CODE_SUCCESS;
D
stmt  
dapan1121 已提交
913 914
}

X
Xiaoyu Wang 已提交
915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932
static void buildMsgHeader(STableDataBlocks* src, SVgDataBlocks* blocks) {
  SSubmitReq* submit = (SSubmitReq*)blocks->pData;
  submit->header.vgId = htonl(blocks->vg.vgId);
  submit->header.contLen = htonl(blocks->size);
  submit->length = submit->header.contLen;
  submit->numOfBlocks = htonl(blocks->numOfTables);
  SSubmitBlk* blk = (SSubmitBlk*)(submit + 1);
  int32_t     numOfBlocks = blocks->numOfTables;
  while (numOfBlocks--) {
    int32_t dataLen = blk->dataLen;
    int32_t schemaLen = blk->schemaLen;
    blk->uid = htobe64(blk->uid);
    blk->suid = htobe64(blk->suid);
    blk->sversion = htonl(blk->sversion);
    blk->dataLen = htonl(blk->dataLen);
    blk->schemaLen = htonl(blk->schemaLen);
    blk->numOfRows = htonl(blk->numOfRows);
    blk = (SSubmitBlk*)(blk->data + schemaLen + dataLen);
D
stmt  
dapan1121 已提交
933
  }
X
Xiaoyu Wang 已提交
934
}
D
stmt  
dapan1121 已提交
935

X
Xiaoyu Wang 已提交
936 937 938 939
int32_t insBuildOutput(SHashObj* pVgroupsHashObj, SArray* pVgDataBlocks, SArray** pDataBlocks) {
  size_t numOfVg = taosArrayGetSize(pVgDataBlocks);
  *pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
  if (NULL == *pDataBlocks) {
X
Xiaoyu Wang 已提交
940 941 942
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  for (size_t i = 0; i < numOfVg; ++i) {
X
Xiaoyu Wang 已提交
943
    STableDataBlocks* src = taosArrayGetP(pVgDataBlocks, i);
X
Xiaoyu Wang 已提交
944 945 946 947
    SVgDataBlocks*    dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
    if (NULL == dst) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
X
Xiaoyu Wang 已提交
948
    taosHashGetDup(pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
X
Xiaoyu Wang 已提交
949 950 951 952
    dst->numOfTables = src->numOfTables;
    dst->size = src->size;
    TSWAP(dst->pData, src->pData);
    buildMsgHeader(src, dst);
X
Xiaoyu Wang 已提交
953
    taosArrayPush(*pDataBlocks, &dst);
X
Xiaoyu Wang 已提交
954 955
  }
  return TSDB_CODE_SUCCESS;
D
stmt  
dapan1121 已提交
956
}