parInsertUtil.c 32.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
X
Xiaoyu Wang 已提交
15

X
Xiaoyu Wang 已提交
16
#include "parInsertUtil.h"
17 18

#include "catalog.h"
X
Xiaoyu Wang 已提交
19
#include "parInt.h"
X
Xiaoyu Wang 已提交
20
#include "parUtil.h"
X
Xiaoyu Wang 已提交
21
#include "querynodes.h"
C
Cary Xu 已提交
22
#include "tRealloc.h"
23

24
typedef struct SBlockKeyTuple {
X
Xiaoyu Wang 已提交
25 26
  TSKEY   skey;
  void*   payloadAddr;
wafwerar's avatar
wafwerar 已提交
27
  int16_t index;
28 29 30 31 32 33 34
} SBlockKeyTuple;

typedef struct SBlockKeyInfo {
  int32_t         maxBytesAlloc;
  SBlockKeyTuple* pKeyTuple;
} SBlockKeyInfo;

C
Cary Xu 已提交
35 36
typedef struct {
  int32_t   index;
C
Cary Xu 已提交
37
  SArray*   rowArray;  // array of merged rows(mem allocated by tRealloc/free by tFree)
C
Cary Xu 已提交
38
  STSchema* pSchema;
C
Cary Xu 已提交
39
  int64_t   tbUid;  // suid for child table, uid for normal table
C
Cary Xu 已提交
40 41
} SBlockRowMerger;

C
Cary Xu 已提交
42
static FORCE_INLINE void tdResetSBlockRowMerger(SBlockRowMerger* pMerger) {
C
Cary Xu 已提交
43 44 45 46 47 48 49 50 51 52 53 54 55 56
  if (pMerger) {
    pMerger->index = -1;
  }
}

static void tdFreeSBlockRowMerger(SBlockRowMerger* pMerger) {
  if (pMerger) {
    int32_t size = taosArrayGetSize(pMerger->rowArray);
    for (int32_t i = 0; i < size; ++i) {
      tFree(*(void**)taosArrayGet(pMerger->rowArray, i));
    }
    taosArrayDestroy(pMerger->rowArray);

    taosMemoryFreeClear(pMerger->pSchema);
C
Cary Xu 已提交
57
    taosMemoryFree(pMerger);
C
Cary Xu 已提交
58 59 60
  }
}

X
Xiaoyu Wang 已提交
61 62 63
static int32_t rowDataCompar(const void* lhs, const void* rhs) {
  TSKEY left = *(TSKEY*)lhs;
  TSKEY right = *(TSKEY*)rhs;
64 65 66 67 68 69 70
  if (left == right) {
    return 0;
  } else {
    return left > right ? 1 : -1;
  }
}

wafwerar's avatar
wafwerar 已提交
71 72 73 74 75 76 77 78 79 80
static int32_t rowDataComparStable(const void* lhs, const void* rhs) {
  TSKEY left = *(TSKEY*)lhs;
  TSKEY right = *(TSKEY*)rhs;
  if (left == right) {
    return ((SBlockKeyTuple*)lhs)->index - ((SBlockKeyTuple*)rhs)->index;
  } else {
    return left > right ? 1 : -1;
  }
}

X
Xiaoyu Wang 已提交
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
int32_t insGetExtendedRowSize(STableDataBlocks* pBlock) {
  STableComInfo* pTableInfo = &pBlock->pTableMeta->tableInfo;
  ASSERT(pBlock->rowSize == pTableInfo->rowSize);
  return pBlock->rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + pBlock->boundColumnInfo.extendedVarLen +
         (int32_t)TD_BITMAP_BYTES(pTableInfo->numOfColumns - 1);
}

void insGetSTSRowAppendInfo(uint8_t rowType, SParsedDataColInfo* spd, col_id_t idx, int32_t* toffset,
                            col_id_t* colIdx) {
  col_id_t schemaIdx = 0;
  if (IS_DATA_COL_ORDERED(spd)) {
    schemaIdx = spd->boundColumns[idx];
    if (TD_IS_TP_ROW_T(rowType)) {
      *toffset = (spd->cols + schemaIdx)->toffset;  // the offset of firstPart
      *colIdx = schemaIdx;
    } else {
      *toffset = idx * sizeof(SKvRowIdx);  // the offset of SKvRowIdx
      *colIdx = idx;
    }
  } else {
    ASSERT(idx == (spd->colIdxInfo + idx)->boundIdx);
    schemaIdx = (spd->colIdxInfo + idx)->schemaColIdx;
    if (TD_IS_TP_ROW_T(rowType)) {
      *toffset = (spd->cols + schemaIdx)->toffset;
      *colIdx = schemaIdx;
    } else {
      *toffset = ((spd->colIdxInfo + idx)->finalIdx) * sizeof(SKvRowIdx);
      *colIdx = (spd->colIdxInfo + idx)->finalIdx;
    }
  }
}

int32_t insSetBlockInfo(SSubmitBlk* pBlocks, STableDataBlocks* dataBuf, int32_t numOfRows) {
  pBlocks->suid = (TSDB_NORMAL_TABLE == dataBuf->pTableMeta->tableType ? 0 : dataBuf->pTableMeta->suid);
  pBlocks->uid = dataBuf->pTableMeta->uid;
  pBlocks->sversion = dataBuf->pTableMeta->sversion;
  pBlocks->schemaLen = dataBuf->createTbReqLen;

  if (pBlocks->numOfRows + numOfRows >= INT32_MAX) {
    return TSDB_CODE_TSC_INVALID_OPERATION;
  } else {
    pBlocks->numOfRows += numOfRows;
    return TSDB_CODE_SUCCESS;
  }
}

void insSetBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, col_id_t numOfCols) {
128 129 130
  pColList->numOfCols = numOfCols;
  pColList->numOfBound = numOfCols;
  pColList->orderStatus = ORDER_STATUS_ORDERED;  // default is ORDERED for non-bound mode
131
  pColList->boundColumns = taosMemoryCalloc(pColList->numOfCols, sizeof(col_id_t));
wafwerar's avatar
wafwerar 已提交
132
  pColList->cols = taosMemoryCalloc(pColList->numOfCols, sizeof(SBoundColumn));
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
  pColList->colIdxInfo = NULL;
  pColList->flen = 0;
  pColList->allNullLen = 0;

  int32_t nVar = 0;
  for (int32_t i = 0; i < pColList->numOfCols; ++i) {
    uint8_t type = pSchema[i].type;
    if (i > 0) {
      pColList->cols[i].offset = pColList->cols[i - 1].offset + pSchema[i - 1].bytes;
      pColList->cols[i].toffset = pColList->flen;
    }
    pColList->flen += TYPE_BYTES[type];
    switch (type) {
      case TSDB_DATA_TYPE_BINARY:
        pColList->allNullLen += (VARSTR_HEADER_SIZE + CHAR_BYTES);
        ++nVar;
        break;
      case TSDB_DATA_TYPE_NCHAR:
        pColList->allNullLen += (VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
        ++nVar;
        break;
      default:
        break;
    }
X
Xiaoyu Wang 已提交
157
    pColList->boundColumns[i] = i;
158 159
  }
  pColList->allNullLen += pColList->flen;
C
Cary Xu 已提交
160
  pColList->boundNullLen = pColList->allNullLen;  // default set allNullLen
161 162 163
  pColList->extendedVarLen = (uint16_t)(nVar * sizeof(VarDataOffsetT));
}

X
Xiaoyu Wang 已提交
164
int32_t insSchemaIdxCompar(const void* lhs, const void* rhs) {
X
Xiaoyu Wang 已提交
165 166
  uint16_t left = *(uint16_t*)lhs;
  uint16_t right = *(uint16_t*)rhs;
167 168 169 170 171 172 173 174

  if (left == right) {
    return 0;
  } else {
    return left > right ? 1 : -1;
  }
}

X
Xiaoyu Wang 已提交
175
int32_t insBoundIdxCompar(const void* lhs, const void* rhs) {
X
Xiaoyu Wang 已提交
176 177
  uint16_t left = *(uint16_t*)POINTER_SHIFT(lhs, sizeof(uint16_t));
  uint16_t right = *(uint16_t*)POINTER_SHIFT(rhs, sizeof(uint16_t));
178 179 180 181 182 183 184 185

  if (left == right) {
    return 0;
  } else {
    return left > right ? 1 : -1;
  }
}

D
stmt  
dapan1121 已提交
186 187
void destroyBoundColumnInfo(void* pBoundInfo) {
  if (NULL == pBoundInfo) {
D
stmt  
dapan1121 已提交
188 189
    return;
  }
D
stmt  
dapan1121 已提交
190 191

  SParsedDataColInfo* pColList = (SParsedDataColInfo*)pBoundInfo;
X
Xiaoyu Wang 已提交
192

193
  taosMemoryFreeClear(pColList->boundColumns);
wafwerar's avatar
wafwerar 已提交
194 195
  taosMemoryFreeClear(pColList->cols);
  taosMemoryFreeClear(pColList->colIdxInfo);
196 197
}

wmmhello's avatar
wmmhello 已提交
198
static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOffset, STableMeta* pTableMeta,
X
Xiaoyu Wang 已提交
199
                               STableDataBlocks** dataBlocks) {
wafwerar's avatar
wafwerar 已提交
200
  STableDataBlocks* dataBuf = (STableDataBlocks*)taosMemoryCalloc(1, sizeof(STableDataBlocks));
201 202 203 204 205 206 207 208 209 210 211 212
  if (dataBuf == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  dataBuf->nAllocSize = (uint32_t)defaultSize;
  dataBuf->headerSize = startOffset;

  // the header size will always be the startOffset value, reserved for the subumit block header
  if (dataBuf->nAllocSize <= dataBuf->headerSize) {
    dataBuf->nAllocSize = dataBuf->headerSize * 2;
  }

wafwerar's avatar
wafwerar 已提交
213
  dataBuf->pData = taosMemoryMalloc(dataBuf->nAllocSize);
214
  if (dataBuf->pData == NULL) {
wafwerar's avatar
wafwerar 已提交
215
    taosMemoryFreeClear(dataBuf);
216 217 218 219
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  memset(dataBuf->pData, 0, sizeof(SSubmitBlk));

X
Xiaoyu Wang 已提交
220
  dataBuf->pTableMeta = tableMetaDup(pTableMeta);
221 222

  SParsedDataColInfo* pColInfo = &dataBuf->boundColumnInfo;
X
Xiaoyu Wang 已提交
223
  SSchema*            pSchema = getTableColumnSchema(dataBuf->pTableMeta);
X
Xiaoyu Wang 已提交
224
  insSetBoundColumnInfo(pColInfo, pSchema, dataBuf->pTableMeta->tableInfo.numOfColumns);
225

X
Xiaoyu Wang 已提交
226 227 228 229 230
  dataBuf->ordered = true;
  dataBuf->prevTS = INT64_MIN;
  dataBuf->rowSize = rowSize;
  dataBuf->size = startOffset;
  dataBuf->vgId = dataBuf->pTableMeta->vgId;
231 232 233 234 235 236 237

  assert(defaultSize > 0 && pTableMeta != NULL && dataBuf->pTableMeta != NULL);

  *dataBlocks = dataBuf;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
238
int32_t insBuildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq) {
H
Hongze Cheng 已提交
239
  SEncoder coder = {0};
X
Xiaoyu Wang 已提交
240 241
  char*    pBuf;
  int32_t  len;
H
Hongze Cheng 已提交
242

wafwerar's avatar
wafwerar 已提交
243 244
  int32_t ret = 0;
  tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, len, ret);
X
Xiaoyu Wang 已提交
245 246 247 248 249 250 251 252 253 254 255
  if (pBlocks->nAllocSize - pBlocks->size < len) {
    pBlocks->nAllocSize += len + pBlocks->rowSize;
    char* pTmp = taosMemoryRealloc(pBlocks->pData, pBlocks->nAllocSize);
    if (pTmp != NULL) {
      pBlocks->pData = pTmp;
      memset(pBlocks->pData + pBlocks->size, 0, pBlocks->nAllocSize - pBlocks->size);
    } else {
      pBlocks->nAllocSize -= len + pBlocks->rowSize;
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
  }
H
Hongze Cheng 已提交
256

X
Xiaoyu Wang 已提交
257
  pBuf = pBlocks->pData + pBlocks->size;
H
Hongze Cheng 已提交
258

H
Hongze Cheng 已提交
259
  tEncoderInit(&coder, pBuf, len);
X
Xiaoyu Wang 已提交
260
  int32_t code = tEncodeSVCreateTbReq(&coder, pCreateTbReq);
H
Hongze Cheng 已提交
261
  tEncoderClear(&coder);
X
Xiaoyu Wang 已提交
262 263
  pBlocks->size += len;
  pBlocks->createTbReqLen = len;
X
Xiaoyu Wang 已提交
264 265

  return code;
X
Xiaoyu Wang 已提交
266 267
}

X
Xiaoyu Wang 已提交
268
void insDestroyDataBlock(STableDataBlocks* pDataBlock) {
X
Xiaoyu Wang 已提交
269 270 271 272 273 274 275 276 277 278 279 280 281 282
  if (pDataBlock == NULL) {
    return;
  }

  taosMemoryFreeClear(pDataBlock->pData);
  //  if (!pDataBlock->cloned) {
  // free the refcount for metermeta
  taosMemoryFreeClear(pDataBlock->pTableMeta);

  destroyBoundColumnInfo(&pDataBlock->boundColumnInfo);
  //  }
  taosMemoryFreeClear(pDataBlock);
}

X
Xiaoyu Wang 已提交
283 284 285
int32_t insGetDataBlockFromList(SHashObj* pHashList, void* id, int32_t idLen, int32_t size, int32_t startOffset,
                                int32_t rowSize, STableMeta* pTableMeta, STableDataBlocks** dataBlocks,
                                SArray* pBlockList, SVCreateTbReq* pCreateTbReq) {
286
  *dataBlocks = NULL;
D
dapan 已提交
287
  STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)id, idLen);
288 289 290 291 292
  if (t1 != NULL) {
    *dataBlocks = *t1;
  }

  if (*dataBlocks == NULL) {
293
    int32_t ret = createDataBlock((size_t)size, rowSize, startOffset, pTableMeta, dataBlocks);
294 295 296 297
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }

H
Hongze Cheng 已提交
298
    if (NULL != pCreateTbReq && NULL != pCreateTbReq->ctb.pTag) {
X
Xiaoyu Wang 已提交
299
      ret = insBuildCreateTbMsg(*dataBlocks, pCreateTbReq);
X
Xiaoyu Wang 已提交
300
      if (ret != TSDB_CODE_SUCCESS) {
X
Xiaoyu Wang 已提交
301
        insDestroyDataBlock(*dataBlocks);
X
Xiaoyu Wang 已提交
302 303 304 305
        return ret;
      }
    }

X
Xiaoyu Wang 已提交
306 307
    // converting to 'const char*' is to handle coverity scan errors
    taosHashPut(pHashList, (const char*)id, idLen, (const char*)dataBlocks, POINTER_BYTES);
308 309 310 311 312 313 314
    if (pBlockList) {
      taosArrayPush(pBlockList, dataBlocks);
    }
  }

  return TSDB_CODE_SUCCESS;
}
315
#if 0
316
static int32_t getRowExpandSize(STableMeta* pTableMeta) {
C
Cary Xu 已提交
317
  int32_t  result = TD_ROW_HEAD_LEN - sizeof(TSKEY);
318 319
  int32_t  columns = getNumOfColumns(pTableMeta);
  SSchema* pSchema = getTableColumnSchema(pTableMeta);
C
Cary Xu 已提交
320
  for (int32_t i = 0; i < columns; ++i) {
321 322 323 324
    if (IS_VAR_DATA_TYPE((pSchema + i)->type)) {
      result += TYPE_BYTES[TSDB_DATA_TYPE_BINARY];
    }
  }
C
Cary Xu 已提交
325
  result += (int32_t)TD_BITMAP_BYTES(columns - 1);
326 327
  return result;
}
328
#endif
329

X
Xiaoyu Wang 已提交
330
void insDestroyBlockArrayList(SArray* pDataBlockList) {
331
  if (pDataBlockList == NULL) {
332
    return;
333 334 335 336
  }

  size_t size = taosArrayGetSize(pDataBlockList);
  for (int32_t i = 0; i < size; i++) {
337
    void* p = taosArrayGetP(pDataBlockList, i);
X
Xiaoyu Wang 已提交
338
    insDestroyDataBlock(p);
339 340 341 342 343
  }

  taosArrayDestroy(pDataBlockList);
}

X
Xiaoyu Wang 已提交
344
void insDestroyBlockHashmap(SHashObj* pDataBlockHash) {
345 346 347 348 349 350 351
  if (pDataBlockHash == NULL) {
    return;
  }

  void** p1 = taosHashIterate(pDataBlockHash, NULL);
  while (p1) {
    STableDataBlocks* pBlocks = *p1;
X
Xiaoyu Wang 已提交
352
    insDestroyDataBlock(pBlocks);
353 354 355 356 357 358 359

    p1 = taosHashIterate(pDataBlockHash, p1);
  }

  taosHashCleanup(pDataBlockHash);
}

360
#if 0
361
// data block is disordered, sort it in ascending order
X
Xiaoyu Wang 已提交
362 363
void sortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf) {
  SSubmitBlk* pBlocks = (SSubmitBlk*)dataBuf->pData;
364 365 366 367 368

  // size is less than the total size, since duplicated rows may be removed yet.
  assert(pBlocks->numOfRows * dataBuf->rowSize + sizeof(SSubmitBlk) == dataBuf->size);

  if (!dataBuf->ordered) {
X
Xiaoyu Wang 已提交
369
    char* pBlockData = pBlocks->data;
370 371

    // todo. qsort is unstable, if timestamp is same, should get the last one
wafwerar's avatar
wafwerar 已提交
372
    taosSort(pBlockData, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar);
373 374 375 376

    int32_t i = 0;
    int32_t j = 1;

377
    // delete rows with timestamp conflicts
378
    while (j < pBlocks->numOfRows) {
X
Xiaoyu Wang 已提交
379 380
      TSKEY ti = *(TSKEY*)(pBlockData + dataBuf->rowSize * i);
      TSKEY tj = *(TSKEY*)(pBlockData + dataBuf->rowSize * j);
381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402

      if (ti == tj) {
        ++j;
        continue;
      }

      int32_t nextPos = (++i);
      if (nextPos != j) {
        memmove(pBlockData + dataBuf->rowSize * nextPos, pBlockData + dataBuf->rowSize * j, dataBuf->rowSize);
      }

      ++j;
    }

    dataBuf->ordered = true;

    pBlocks->numOfRows = i + 1;
    dataBuf->size = sizeof(SSubmitBlk) + dataBuf->rowSize * pBlocks->numOfRows;
  }

  dataBuf->prevTS = INT64_MIN;
}
403
#endif
404 405

// data block is disordered, sort it in ascending order
C
Cary Xu 已提交
406
static int sortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo) {
X
Xiaoyu Wang 已提交
407
  SSubmitBlk* pBlocks = (SSubmitBlk*)dataBuf->pData;
408 409 410 411 412 413 414
  int16_t     nRows = pBlocks->numOfRows;

  // size is less than the total size, since duplicated rows may be removed yet.

  // allocate memory
  size_t nAlloc = nRows * sizeof(SBlockKeyTuple);
  if (pBlkKeyInfo->pKeyTuple == NULL || pBlkKeyInfo->maxBytesAlloc < nAlloc) {
X
Xiaoyu Wang 已提交
415
    char* tmp = taosMemoryRealloc(pBlkKeyInfo->pKeyTuple, nAlloc);
416 417 418
    if (tmp == NULL) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
X
Xiaoyu Wang 已提交
419
    pBlkKeyInfo->pKeyTuple = (SBlockKeyTuple*)tmp;
420 421 422 423
    pBlkKeyInfo->maxBytesAlloc = (int32_t)nAlloc;
  }
  memset(pBlkKeyInfo->pKeyTuple, 0, nAlloc);

X
Xiaoyu Wang 已提交
424
  int32_t         extendedRowSize = insGetExtendedRowSize(dataBuf);
X
Xiaoyu Wang 已提交
425 426
  SBlockKeyTuple* pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
  char*           pBlockData = pBlocks->data + pBlocks->schemaLen;
427 428
  int             n = 0;
  while (n < nRows) {
X
Xiaoyu Wang 已提交
429
    pBlkKeyTuple->skey = TD_ROW_KEY((STSRow*)pBlockData);
430
    pBlkKeyTuple->payloadAddr = pBlockData;
wafwerar's avatar
wafwerar 已提交
431
    pBlkKeyTuple->index = n;
432 433 434 435 436 437 438 439 440

    // next loop
    pBlockData += extendedRowSize;
    ++pBlkKeyTuple;
    ++n;
  }

  if (!dataBuf->ordered) {
    pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
441 442

    // todo. qsort is unstable, if timestamp is same, should get the last one
wafwerar's avatar
wafwerar 已提交
443
    taosSort(pBlkKeyTuple, nRows, sizeof(SBlockKeyTuple), rowDataComparStable);
444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473

    pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
    int32_t i = 0;
    int32_t j = 1;
    while (j < nRows) {
      TSKEY ti = (pBlkKeyTuple + i)->skey;
      TSKEY tj = (pBlkKeyTuple + j)->skey;

      if (ti == tj) {
        ++j;
        continue;
      }

      int32_t nextPos = (++i);
      if (nextPos != j) {
        memmove(pBlkKeyTuple + nextPos, pBlkKeyTuple + j, sizeof(SBlockKeyTuple));
      }
      ++j;
    }

    dataBuf->ordered = true;
    pBlocks->numOfRows = i + 1;
  }

  dataBuf->size = sizeof(SSubmitBlk) + pBlocks->numOfRows * extendedRowSize;
  dataBuf->prevTS = INT64_MIN;

  return 0;
}

C
Cary Xu 已提交
474 475 476 477 478 479 480 481
static void* tdGetCurRowFromBlockMerger(SBlockRowMerger* pBlkRowMerger) {
  if (pBlkRowMerger && (pBlkRowMerger->index >= 0)) {
    ASSERT(pBlkRowMerger->index < taosArrayGetSize(pBlkRowMerger->rowArray));
    return *(void**)taosArrayGet(pBlkRowMerger->rowArray, pBlkRowMerger->index);
  }
  return NULL;
}

C
Cary Xu 已提交
482
static int32_t tdBlockRowMerge(STableMeta* pTableMeta, SBlockKeyTuple* pEndKeyTp, int32_t nDupRows,
C
Cary Xu 已提交
483 484 485 486 487 488 489
                               SBlockRowMerger** pBlkRowMerger, int32_t rowSize) {
  ASSERT(nDupRows > 1);
  SBlockKeyTuple* pStartKeyTp = pEndKeyTp - (nDupRows - 1);
  ASSERT(pStartKeyTp->skey == pEndKeyTp->skey);

  // TODO: optimization if end row is all normal
#if 0
C
Cary Xu 已提交
490
  STSRow* pEndRow = (STSRow*)pEndKeyTp->payloadAddr;
C
Cary Xu 已提交
491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512
  if(isNormal(pEndRow)) { // set the end row if it is normal and return directly
    pStartKeyTp->payloadAddr = pEndKeyTp->payloadAddr;
    return TSDB_CODE_SUCCESS;
  }
#endif

  if (!(*pBlkRowMerger)) {
    (*pBlkRowMerger) = taosMemoryCalloc(1, sizeof(**pBlkRowMerger));
    if (!(*pBlkRowMerger)) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return TSDB_CODE_FAILED;
    }
    (*pBlkRowMerger)->index = -1;
    if (!(*pBlkRowMerger)->rowArray) {
      (*pBlkRowMerger)->rowArray = taosArrayInit(1, sizeof(void*));
      if (!(*pBlkRowMerger)->rowArray) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return TSDB_CODE_FAILED;
      }
    }
  }

C
Cary Xu 已提交
513 514 515 516 517 518 519 520 521 522
  if ((*pBlkRowMerger)->pSchema) {
    if ((*pBlkRowMerger)->pSchema->version != pTableMeta->sversion) {
      taosMemoryFreeClear((*pBlkRowMerger)->pSchema);
    } else {
      if ((*pBlkRowMerger)->tbUid != (pTableMeta->suid > 0 ? pTableMeta->suid : pTableMeta->uid)) {
        taosMemoryFreeClear((*pBlkRowMerger)->pSchema);
      }
    }
  }

C
Cary Xu 已提交
523
  if (!(*pBlkRowMerger)->pSchema) {
C
Cary Xu 已提交
524 525
    (*pBlkRowMerger)->pSchema =
        tdGetSTSChemaFromSSChema(pTableMeta->schema, pTableMeta->tableInfo.numOfColumns, pTableMeta->sversion);
C
Cary Xu 已提交
526 527 528 529 530

    if (!(*pBlkRowMerger)->pSchema) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return TSDB_CODE_FAILED;
    }
C
Cary Xu 已提交
531
    (*pBlkRowMerger)->tbUid = pTableMeta->suid > 0 ? pTableMeta->suid : pTableMeta->uid;
C
Cary Xu 已提交
532 533 534 535 536
  }

  void* pDestRow = NULL;
  ++((*pBlkRowMerger)->index);
  if ((*pBlkRowMerger)->index < taosArrayGetSize((*pBlkRowMerger)->rowArray)) {
537 538
    void** pAlloc = (void**)taosArrayGet((*pBlkRowMerger)->rowArray, (*pBlkRowMerger)->index);
    if (tRealloc((uint8_t**)pAlloc, rowSize) != 0) {
C
Cary Xu 已提交
539 540
      return TSDB_CODE_FAILED;
    }
541
    pDestRow = *pAlloc;
C
Cary Xu 已提交
542 543 544 545 546 547 548 549 550 551 552 553
  } else {
    if (tRealloc((uint8_t**)&pDestRow, rowSize) != 0) {
      return TSDB_CODE_FAILED;
    }
    taosArrayPush((*pBlkRowMerger)->rowArray, &pDestRow);
  }

  // merge rows to pDestRow
  STSchema* pSchema = (*pBlkRowMerger)->pSchema;
  SArray*   pArray = taosArrayInit(pSchema->numOfCols, sizeof(SColVal));
  for (int32_t i = 0; i < pSchema->numOfCols; ++i) {
    SColVal colVal = {0};
C
Cary Xu 已提交
554
    for (int32_t j = 0; j < nDupRows; ++j) {
C
Cary Xu 已提交
555
      tTSRowGetVal((pEndKeyTp - j)->payloadAddr, pSchema, i, &colVal);
H
Hongze Cheng 已提交
556
      if (!COL_VAL_IS_NONE(&colVal)) {
C
Cary Xu 已提交
557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574
        break;
      }
    }
    taosArrayPush(pArray, &colVal);
  }
  if (tdSTSRowNew(pArray, pSchema, (STSRow**)&pDestRow) < 0) {
    taosArrayDestroy(pArray);
    return TSDB_CODE_FAILED;
  }

  taosArrayDestroy(pArray);
  return TSDB_CODE_SUCCESS;
}

// data block is disordered, sort it in ascending order, and merge dup rows if exists
static int sortMergeDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo,
                                     SBlockRowMerger** ppBlkRowMerger) {
  SSubmitBlk* pBlocks = (SSubmitBlk*)dataBuf->pData;
C
Cary Xu 已提交
575
  STableMeta* pTableMeta = dataBuf->pTableMeta;
576
  int32_t     nRows = pBlocks->numOfRows;
C
Cary Xu 已提交
577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593

  // size is less than the total size, since duplicated rows may be removed.

  // allocate memory
  size_t nAlloc = nRows * sizeof(SBlockKeyTuple);
  if (pBlkKeyInfo->pKeyTuple == NULL || pBlkKeyInfo->maxBytesAlloc < nAlloc) {
    char* tmp = taosMemoryRealloc(pBlkKeyInfo->pKeyTuple, nAlloc);
    if (tmp == NULL) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    pBlkKeyInfo->pKeyTuple = (SBlockKeyTuple*)tmp;
    pBlkKeyInfo->maxBytesAlloc = (int32_t)nAlloc;
  }
  memset(pBlkKeyInfo->pKeyTuple, 0, nAlloc);

  tdResetSBlockRowMerger(*ppBlkRowMerger);

X
Xiaoyu Wang 已提交
594
  int32_t         extendedRowSize = insGetExtendedRowSize(dataBuf);
C
Cary Xu 已提交
595 596
  SBlockKeyTuple* pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
  char*           pBlockData = pBlocks->data + pBlocks->schemaLen;
597
  int32_t         n = 0;
C
Cary Xu 已提交
598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629
  while (n < nRows) {
    pBlkKeyTuple->skey = TD_ROW_KEY((STSRow*)pBlockData);
    pBlkKeyTuple->payloadAddr = pBlockData;
    pBlkKeyTuple->index = n;

    // next loop
    pBlockData += extendedRowSize;
    ++pBlkKeyTuple;
    ++n;
  }

  if (!dataBuf->ordered) {
    pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;

    taosSort(pBlkKeyTuple, nRows, sizeof(SBlockKeyTuple), rowDataComparStable);

    pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
    bool    hasDup = false;
    int32_t nextPos = 0;
    int32_t i = 0;
    int32_t j = 1;

    while (j < nRows) {
      TSKEY ti = (pBlkKeyTuple + i)->skey;
      TSKEY tj = (pBlkKeyTuple + j)->skey;

      if (ti == tj) {
        ++j;
        continue;
      }

      if ((j - i) > 1) {
C
Cary Xu 已提交
630
        if (tdBlockRowMerge(pTableMeta, (pBlkKeyTuple + j - 1), j - i, ppBlkRowMerger, extendedRowSize) < 0) {
C
Cary Xu 已提交
631 632 633
          return TSDB_CODE_FAILED;
        }
        (pBlkKeyTuple + nextPos)->payloadAddr = tdGetCurRowFromBlockMerger(*ppBlkRowMerger);
C
Cary Xu 已提交
634 635 636
        if (!hasDup) {
          hasDup = true;
        }
C
Cary Xu 已提交
637 638 639 640 641 642 643 644 645 646 647 648 649 650
        i = j;
      } else {
        if (hasDup) {
          memmove(pBlkKeyTuple + nextPos, pBlkKeyTuple + i, sizeof(SBlockKeyTuple));
        }
        ++i;
      }

      ++nextPos;
      ++j;
    }

    if ((j - i) > 1) {
      ASSERT((pBlkKeyTuple + i)->skey == (pBlkKeyTuple + j - 1)->skey);
C
Cary Xu 已提交
651
      if (tdBlockRowMerge(pTableMeta, (pBlkKeyTuple + j - 1), j - i, ppBlkRowMerger, extendedRowSize) < 0) {
C
Cary Xu 已提交
652 653 654 655 656 657 658 659
        return TSDB_CODE_FAILED;
      }
      (pBlkKeyTuple + nextPos)->payloadAddr = tdGetCurRowFromBlockMerger(*ppBlkRowMerger);
    } else if (hasDup) {
      memmove(pBlkKeyTuple + nextPos, pBlkKeyTuple + i, sizeof(SBlockKeyTuple));
    }

    dataBuf->ordered = true;
C
Cary Xu 已提交
660
    pBlocks->numOfRows = nextPos + 1;
C
Cary Xu 已提交
661 662 663 664 665 666 667 668
  }

  dataBuf->size = sizeof(SSubmitBlk) + pBlocks->numOfRows * extendedRowSize;
  dataBuf->prevTS = INT64_MIN;

  return TSDB_CODE_SUCCESS;
}

669
// Erase the empty space reserved for binary data
670
static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SBlockKeyTuple* blkKeyTuple) {
671
  // TODO: optimize this function, handle the case while binary is not presented
X
Xiaoyu Wang 已提交
672
  int32_t     nonDataLen = sizeof(SSubmitBlk) + pTableDataBlock->createTbReqLen;
673
  SSubmitBlk* pBlock = pDataBlock;
X
Xiaoyu Wang 已提交
674 675
  memcpy(pDataBlock, pTableDataBlock->pData, nonDataLen);
  pDataBlock = (char*)pDataBlock + nonDataLen;
676

X
Xiaoyu Wang 已提交
677
  pBlock->schemaLen = pTableDataBlock->createTbReqLen;
678 679
  pBlock->dataLen = 0;

680 681 682 683 684 685 686
  int32_t numOfRows = pBlock->numOfRows;
  for (int32_t i = 0; i < numOfRows; ++i) {
    void*     payload = (blkKeyTuple + i)->payloadAddr;
    TDRowLenT rowTLen = TD_ROW_LEN((STSRow*)payload);
    memcpy(pDataBlock, payload, rowTLen);
    pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
    pBlock->dataLen += rowTLen;
687 688
  }

689
  return pBlock->dataLen + pBlock->schemaLen;
690 691
}

692
int32_t insMergeTableDataBlocks(SHashObj* pHashObj, SArray** pVgDataBlocks) {
S
Shengliang Guan 已提交
693
  const int INSERT_HEAD_SIZE = sizeof(SSubmitReq);
694
  int       code = 0;
D
dapan 已提交
695
  SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
696 697 698
  SArray*   pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);

  STableDataBlocks** p = taosHashIterate(pHashObj, NULL);
X
Xiaoyu Wang 已提交
699 700
  STableDataBlocks*  pOneTableBlock = *p;
  SBlockKeyInfo      blkKeyInfo = {0};  // share by pOneTableBlock
X
Xiaoyu Wang 已提交
701
  SBlockRowMerger*   pBlkRowMerger = NULL;
C
Cary Xu 已提交
702

703
  while (pOneTableBlock) {
X
Xiaoyu Wang 已提交
704
    SSubmitBlk* pBlocks = (SSubmitBlk*)pOneTableBlock->pData;
705 706
    if (pBlocks->numOfRows > 0) {
      STableDataBlocks* dataBuf = NULL;
X
Xiaoyu Wang 已提交
707
      pOneTableBlock->pTableMeta->vgId = pOneTableBlock->vgId;  // for schemaless, restore origin vgId
X
Xiaoyu Wang 已提交
708 709 710
      int32_t ret = insGetDataBlockFromList(pVnodeDataBlockHashList, &pOneTableBlock->vgId,
                                            sizeof(pOneTableBlock->vgId), TSDB_PAYLOAD_SIZE, INSERT_HEAD_SIZE, 0,
                                            pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList, NULL);
711
      if (ret != TSDB_CODE_SUCCESS) {
C
Cary Xu 已提交
712
        tdFreeSBlockRowMerger(pBlkRowMerger);
713
        taosHashCleanup(pVnodeDataBlockHashList);
X
Xiaoyu Wang 已提交
714
        insDestroyBlockArrayList(pVnodeDataBlockList);
wafwerar's avatar
wafwerar 已提交
715
        taosMemoryFreeClear(blkKeyInfo.pKeyTuple);
716 717
        return ret;
      }
X
Xiaoyu Wang 已提交
718
      ASSERT(pOneTableBlock->pTableMeta->tableInfo.rowSize > 0);
719
      // the maximum expanded size in byte when a row-wise data is converted to SDataRow format
720
      int64_t destSize = dataBuf->size + pOneTableBlock->size +
X
Xiaoyu Wang 已提交
721 722
                         sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta) +
                         pOneTableBlock->createTbReqLen;
723 724 725

      if (dataBuf->nAllocSize < destSize) {
        dataBuf->nAllocSize = (uint32_t)(destSize * 1.5);
wafwerar's avatar
wafwerar 已提交
726
        char* tmp = taosMemoryRealloc(dataBuf->pData, dataBuf->nAllocSize);
727 728 729
        if (tmp != NULL) {
          dataBuf->pData = tmp;
        } else {  // failed to allocate memory, free already allocated memory and return error code
C
Cary Xu 已提交
730
          tdFreeSBlockRowMerger(pBlkRowMerger);
731
          taosHashCleanup(pVnodeDataBlockHashList);
X
Xiaoyu Wang 已提交
732
          insDestroyBlockArrayList(pVnodeDataBlockList);
wafwerar's avatar
wafwerar 已提交
733 734
          taosMemoryFreeClear(dataBuf->pData);
          taosMemoryFreeClear(blkKeyInfo.pKeyTuple);
735 736 737 738
          return TSDB_CODE_TSC_OUT_OF_MEMORY;
        }
      }

739 740 741 742 743 744 745
      if ((code = sortMergeDataBlockDupRows(pOneTableBlock, &blkKeyInfo, &pBlkRowMerger)) != 0) {
        tdFreeSBlockRowMerger(pBlkRowMerger);
        taosHashCleanup(pVnodeDataBlockHashList);
        insDestroyBlockArrayList(pVnodeDataBlockList);
        taosMemoryFreeClear(dataBuf->pData);
        taosMemoryFreeClear(blkKeyInfo.pKeyTuple);
        return code;
746
      }
747
      ASSERT(blkKeyInfo.pKeyTuple != NULL && pBlocks->numOfRows > 0);
748 749

      // erase the empty space reserved for binary data
750
      int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, blkKeyInfo.pKeyTuple);
751 752 753 754 755 756 757 758 759 760 761 762 763 764 765

      dataBuf->size += (finalLen + sizeof(SSubmitBlk));
      assert(dataBuf->size <= dataBuf->nAllocSize);
      dataBuf->numOfTables += 1;
    }

    p = taosHashIterate(pHashObj, p);
    if (p == NULL) {
      break;
    }

    pOneTableBlock = *p;
  }

  // free the table data blocks;
C
Cary Xu 已提交
766
  tdFreeSBlockRowMerger(pBlkRowMerger);
767
  taosHashCleanup(pVnodeDataBlockHashList);
wafwerar's avatar
wafwerar 已提交
768
  taosMemoryFreeClear(blkKeyInfo.pKeyTuple);
769
  *pVgDataBlocks = pVnodeDataBlockList;
770 771 772
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
773
int32_t insAllocateMemForSize(STableDataBlocks* pDataBlock, int32_t allSize) {
X
Xiaoyu Wang 已提交
774
  size_t   remain = pDataBlock->nAllocSize - pDataBlock->size;
D
stmt  
dapan1121 已提交
775
  uint32_t nAllocSizeOld = pDataBlock->nAllocSize;
X
Xiaoyu Wang 已提交
776

D
stmt  
dapan1121 已提交
777 778 779 780
  // expand the allocated size
  if (remain < allSize) {
    pDataBlock->nAllocSize = (pDataBlock->size + allSize) * 1.5;

X
Xiaoyu Wang 已提交
781
    char* tmp = taosMemoryRealloc(pDataBlock->pData, (size_t)pDataBlock->nAllocSize);
D
stmt  
dapan1121 已提交
782 783 784 785 786 787 788 789 790 791 792 793 794
    if (tmp != NULL) {
      pDataBlock->pData = tmp;
      memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size);
    } else {
      // do nothing, if allocate more memory failed
      pDataBlock->nAllocSize = nAllocSizeOld;
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
795 796 797 798 799 800 801
int32_t insInitRowBuilder(SRowBuilder* pBuilder, int16_t schemaVer, SParsedDataColInfo* pColInfo) {
  ASSERT(pColInfo->numOfCols > 0 && (pColInfo->numOfBound <= pColInfo->numOfCols));
  tdSRowInit(pBuilder, schemaVer);
  tdSRowSetExtendedInfo(pBuilder, pColInfo->numOfCols, pColInfo->numOfBound, pColInfo->flen, pColInfo->allNullLen,
                        pColInfo->boundNullLen);
  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
802

X
Xiaoyu Wang 已提交
803 804 805 806 807 808 809 810
static char* tableNameGetPosition(SToken* pToken, char target) {
  bool inEscape = false;
  bool inQuote = false;
  char quotaStr = 0;

  for (uint32_t i = 0; i < pToken->n; ++i) {
    if (*(pToken->z + i) == target && (!inEscape) && (!inQuote)) {
      return pToken->z + i;
811 812
    }

X
Xiaoyu Wang 已提交
813 814 815 816 817 818 819 820 821 822 823 824 825 826 827
    if (*(pToken->z + i) == TS_ESCAPE_CHAR) {
      if (!inQuote) {
        inEscape = !inEscape;
      }
    }

    if (*(pToken->z + i) == '\'' || *(pToken->z + i) == '"') {
      if (!inEscape) {
        if (!inQuote) {
          quotaStr = *(pToken->z + i);
          inQuote = !inQuote;
        } else if (quotaStr == *(pToken->z + i)) {
          inQuote = !inQuote;
        }
      }
828 829 830
    }
  }

X
Xiaoyu Wang 已提交
831
  return NULL;
832 833
}

X
Xiaoyu Wang 已提交
834 835 836 837 838 839 840 841
int32_t insCreateSName(SName* pName, SToken* pTableName, int32_t acctId, const char* dbName, SMsgBuf* pMsgBuf) {
  const char* msg1 = "name too long";
  const char* msg2 = "invalid database name";
  const char* msg3 = "db is not specified";
  const char* msg4 = "invalid table name";

  int32_t code = TSDB_CODE_SUCCESS;
  char*   p = tableNameGetPosition(pTableName, TS_PATH_DELIMITER[0]);
D
stmt  
dapan1121 已提交
842

X
Xiaoyu Wang 已提交
843 844
  if (p != NULL) {  // db has been specified in sql string so we ignore current db path
    assert(*p == TS_PATH_DELIMITER[0]);
D
stmt  
dapan1121 已提交
845

X
Xiaoyu Wang 已提交
846 847 848
    int32_t dbLen = p - pTableName->z;
    if (dbLen <= 0) {
      return buildInvalidOperationMsg(pMsgBuf, msg2);
D
stmt  
dapan1121 已提交
849
    }
X
Xiaoyu Wang 已提交
850 851 852
    char name[TSDB_DB_FNAME_LEN] = {0};
    strncpy(name, pTableName->z, dbLen);
    int32_t actualDbLen = strdequote(name);
X
Xiaoyu Wang 已提交
853

X
Xiaoyu Wang 已提交
854 855 856 857
    code = tNameSetDbName(pName, acctId, name, actualDbLen);
    if (code != TSDB_CODE_SUCCESS) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }
X
Xiaoyu Wang 已提交
858

X
Xiaoyu Wang 已提交
859 860 861 862
    int32_t tbLen = pTableName->n - dbLen - 1;
    if (tbLen <= 0) {
      return buildInvalidOperationMsg(pMsgBuf, msg4);
    }
D
stmt  
dapan1121 已提交
863

X
Xiaoyu Wang 已提交
864 865 866
    char tbname[TSDB_TABLE_FNAME_LEN] = {0};
    strncpy(tbname, p + 1, tbLen);
    /*tbLen = */ strdequote(tbname);
D
stmt  
dapan1121 已提交
867

X
Xiaoyu Wang 已提交
868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885
    code = tNameFromString(pName, tbname, T_NAME_TABLE);
    if (code != 0) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }
  } else {  // get current DB name first, and then set it into path
    if (pTableName->n >= TSDB_TABLE_NAME_LEN) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }

    assert(pTableName->n < TSDB_TABLE_FNAME_LEN);

    char name[TSDB_TABLE_FNAME_LEN] = {0};
    strncpy(name, pTableName->z, pTableName->n);
    strdequote(name);

    if (dbName == NULL) {
      return buildInvalidOperationMsg(pMsgBuf, msg3);
    }
X
Xiaoyu Wang 已提交
886

X
Xiaoyu Wang 已提交
887 888 889 890 891
    code = tNameSetDbName(pName, acctId, dbName, strlen(dbName));
    if (code != TSDB_CODE_SUCCESS) {
      code = buildInvalidOperationMsg(pMsgBuf, msg2);
      return code;
    }
X
Xiaoyu Wang 已提交
892

X
Xiaoyu Wang 已提交
893 894 895
    code = tNameFromString(pName, name, T_NAME_TABLE);
    if (code != 0) {
      code = buildInvalidOperationMsg(pMsgBuf, msg1);
D
dapan1121 已提交
896 897 898
    }
  }

X
Xiaoyu Wang 已提交
899
  return code;
D
stmt  
dapan1121 已提交
900 901
}

X
Xiaoyu Wang 已提交
902 903 904 905 906 907
int32_t insFindCol(SToken* pColname, int32_t start, int32_t end, SSchema* pSchema) {
  while (start < end) {
    if (strlen(pSchema[start].name) == pColname->n && strncmp(pColname->z, pSchema[start].name, pColname->n) == 0) {
      return start;
    }
    ++start;
D
stmt  
dapan1121 已提交
908
  }
X
Xiaoyu Wang 已提交
909 910
  return -1;
}
D
stmt  
dapan1121 已提交
911

X
Xiaoyu Wang 已提交
912 913 914 915 916 917 918 919 920 921 922 923 924 925
void insBuildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname,
                         SArray* tagName, uint8_t tagNum) {
  pTbReq->type = TD_CHILD_TABLE;
  pTbReq->name = strdup(tname);
  pTbReq->ctb.suid = suid;
  pTbReq->ctb.tagNum = tagNum;
  if (sname) pTbReq->ctb.stbName = strdup(sname);
  pTbReq->ctb.pTag = (uint8_t*)pTag;
  pTbReq->ctb.tagName = taosArrayDup(tagName);
  pTbReq->ttl = TSDB_DEFAULT_TABLE_TTL;
  pTbReq->commentLen = -1;

  return;
}
D
stmt  
dapan1121 已提交
926

X
Xiaoyu Wang 已提交
927 928 929
int32_t insMemRowAppend(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param) {
  SMemParam*   pa = (SMemParam*)param;
  SRowBuilder* rb = pa->rb;
X
Xiaoyu Wang 已提交
930

X
Xiaoyu Wang 已提交
931 932 933
  if (value == NULL) {  // it is a null data
    tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NULL, value, false, pa->toffset, pa->colIdx);
    return TSDB_CODE_SUCCESS;
D
dapan 已提交
934
  }
X
Xiaoyu Wang 已提交
935

X
Xiaoyu Wang 已提交
936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956
  if (TSDB_DATA_TYPE_BINARY == pa->schema->type) {
    const char* rowEnd = tdRowEnd(rb->pBuf);
    STR_WITH_SIZE_TO_VARSTR(rowEnd, value, len);
    tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, rowEnd, false, pa->toffset, pa->colIdx);
  } else if (TSDB_DATA_TYPE_NCHAR == pa->schema->type) {
    // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
    int32_t     output = 0;
    const char* rowEnd = tdRowEnd(rb->pBuf);
    if (!taosMbsToUcs4(value, len, (TdUcs4*)varDataVal(rowEnd), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) {
      if (errno == E2BIG) {
        return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pa->schema->name);
      }
      char buf[512] = {0};
      snprintf(buf, tListLen(buf), "%s", strerror(errno));
      return buildSyntaxErrMsg(pMsgBuf, buf, value);
    }
    varDataSetLen(rowEnd, output);
    tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, rowEnd, false, pa->toffset, pa->colIdx);
  } else {
    tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, value, false, pa->toffset, pa->colIdx);
  }
D
stmt  
dapan1121 已提交
957

D
stmt  
dapan1121 已提交
958 959 960
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
961 962 963 964 965
int32_t insCheckTimestamp(STableDataBlocks* pDataBlocks, const char* start) {
  // once the data block is disordered, we do NOT keep previous timestamp any more
  if (!pDataBlocks->ordered) {
    return TSDB_CODE_SUCCESS;
  }
D
dapan 已提交
966

X
Xiaoyu Wang 已提交
967 968 969
  TSKEY k = *(TSKEY*)start;
  if (k <= pDataBlocks->prevTS) {
    pDataBlocks->ordered = false;
D
stmt  
dapan1121 已提交
970 971
  }

X
Xiaoyu Wang 已提交
972 973
  pDataBlocks->prevTS = k;
  return TSDB_CODE_SUCCESS;
D
stmt  
dapan1121 已提交
974 975
}

X
Xiaoyu Wang 已提交
976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993
static void buildMsgHeader(STableDataBlocks* src, SVgDataBlocks* blocks) {
  SSubmitReq* submit = (SSubmitReq*)blocks->pData;
  submit->header.vgId = htonl(blocks->vg.vgId);
  submit->header.contLen = htonl(blocks->size);
  submit->length = submit->header.contLen;
  submit->numOfBlocks = htonl(blocks->numOfTables);
  SSubmitBlk* blk = (SSubmitBlk*)(submit + 1);
  int32_t     numOfBlocks = blocks->numOfTables;
  while (numOfBlocks--) {
    int32_t dataLen = blk->dataLen;
    int32_t schemaLen = blk->schemaLen;
    blk->uid = htobe64(blk->uid);
    blk->suid = htobe64(blk->suid);
    blk->sversion = htonl(blk->sversion);
    blk->dataLen = htonl(blk->dataLen);
    blk->schemaLen = htonl(blk->schemaLen);
    blk->numOfRows = htonl(blk->numOfRows);
    blk = (SSubmitBlk*)(blk->data + schemaLen + dataLen);
D
stmt  
dapan1121 已提交
994
  }
X
Xiaoyu Wang 已提交
995
}
D
stmt  
dapan1121 已提交
996

X
Xiaoyu Wang 已提交
997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016
int32_t insBuildOutput(SInsertParseContext* pCxt) {
  size_t numOfVg = taosArrayGetSize(pCxt->pVgDataBlocks);
  pCxt->pOutput->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
  if (NULL == pCxt->pOutput->pDataBlocks) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  for (size_t i = 0; i < numOfVg; ++i) {
    STableDataBlocks* src = taosArrayGetP(pCxt->pVgDataBlocks, i);
    SVgDataBlocks*    dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
    if (NULL == dst) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    taosHashGetDup(pCxt->pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
    dst->numOfTables = src->numOfTables;
    dst->size = src->size;
    TSWAP(dst->pData, src->pData);
    buildMsgHeader(src, dst);
    taosArrayPush(pCxt->pOutput->pDataBlocks, &dst);
  }
  return TSDB_CODE_SUCCESS;
D
stmt  
dapan1121 已提交
1017
}