parInsertStmt.c 15.7 KB
Newer Older
X
Xiaoyu Wang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "os.h"
#include "parInsertUtil.h"
#include "parInt.h"
#include "parToken.h"
#include "query.h"
#include "tglobal.h"
#include "ttime.h"
#include "ttypes.h"

typedef struct SKvParam {
  int16_t  pos;
  SArray*  pTagVals;
  SSchema* schema;
  char     buf[TSDB_MAX_TAGS_LEN];
} SKvParam;

X
Xiaoyu Wang 已提交
32
int32_t qCloneCurrentTbData(STableDataCxt* pDataBlock, SSubmitTbData** pData) {
D
dapan1121 已提交
33 34 35 36 37
  *pData = taosMemoryCalloc(1, sizeof(SSubmitTbData));
  if (NULL == *pData) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

X
Xiaoyu Wang 已提交
38 39
  SSubmitTbData* pNew = *pData;

D
dapan1121 已提交
40
  *pNew = *pDataBlock->pData;
X
Xiaoyu Wang 已提交
41

D
dapan1121 已提交
42 43 44 45 46
  cloneSVreateTbReq(pDataBlock->pData->pCreateTbReq, &pNew->pCreateTbReq);
  pNew->aCol = taosArrayDup(pDataBlock->pData->aCol, NULL);

  int32_t colNum = taosArrayGetSize(pNew->aCol);
  for (int32_t i = 0; i < colNum; ++i) {
X
Xiaoyu Wang 已提交
47
    SColData* pCol = (SColData*)taosArrayGet(pNew->aCol, i);
D
dapan1121 已提交
48 49 50 51 52 53
    tColDataDeepClear(pCol);
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
54
int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) {
X
Xiaoyu Wang 已提交
55 56 57 58
  int32_t             code = TSDB_CODE_SUCCESS;
  SArray*             pVgDataBlocks = NULL;
  SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)pQuery->pRoot;

X
Xiaoyu Wang 已提交
59
  // merge according to vgId
X
Xiaoyu Wang 已提交
60
  if (taosHashGetSize(pBlockHash) > 0) {
D
dapan1121 已提交
61
    code = insMergeTableDataCxt(pBlockHash, &pVgDataBlocks);
X
Xiaoyu Wang 已提交
62
  }
X
Xiaoyu Wang 已提交
63
  if (TSDB_CODE_SUCCESS == code) {
D
dapan1121 已提交
64 65
    code = insBuildVgDataBlocks(pVgHash, pVgDataBlocks, &pStmt->pDataBlocks);
  }
X
Xiaoyu Wang 已提交
66

D
dapan1121 已提交
67 68
  if (pStmt->freeArrayFunc) {
    pStmt->freeArrayFunc(pVgDataBlocks);
X
Xiaoyu Wang 已提交
69 70
  }
  return code;
X
Xiaoyu Wang 已提交
71 72 73 74
}

int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const char* sTableName, char* tName,
                           TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) {
X
Xiaoyu Wang 已提交
75 76 77
  STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
  SMsgBuf        pBuf = {.buf = msgBuf, .len = msgBufLen};
  int32_t        code = TSDB_CODE_SUCCESS;
78
  SBoundColInfo* tags = (SBoundColInfo*)boundTags;
X
Xiaoyu Wang 已提交
79
  if (NULL == tags) {
S
Shengliang Guan 已提交
80
    return TSDB_CODE_APP_ERROR;
X
Xiaoyu Wang 已提交
81 82 83 84 85 86 87 88 89
  }

  SArray* pTagArray = taosArrayInit(tags->numOfBound, sizeof(STagVal));
  if (!pTagArray) {
    return buildInvalidOperationMsg(&pBuf, "out of memory");
  }

  SArray* tagName = taosArrayInit(8, TSDB_COL_NAME_LEN);
  if (!tagName) {
D
dapan1121 已提交
90 91
    code = buildInvalidOperationMsg(&pBuf, "out of memory");
    goto end;
X
Xiaoyu Wang 已提交
92 93
  }

D
dapan1121 已提交
94
  SSchema* pSchema = getTableTagSchema(pDataBlock->pMeta);
X
Xiaoyu Wang 已提交
95 96 97 98 99 100 101 102 103

  bool  isJson = false;
  STag* pTag = NULL;

  for (int c = 0; c < tags->numOfBound; ++c) {
    if (bind[c].is_null && bind[c].is_null[0]) {
      continue;
    }

104
    SSchema* pTagSchema = &pSchema[tags->pColIndex[c]];
X
Xiaoyu Wang 已提交
105 106 107
    int32_t  colLen = pTagSchema->bytes;
    if (IS_VAR_DATA_TYPE(pTagSchema->type)) {
      colLen = bind[c].length[0];
D
dapan1121 已提交
108 109 110 111
      if ((colLen + VARSTR_HEADER_SIZE) > pTagSchema->bytes) {
        code = buildInvalidOperationMsg(&pBuf, "tag length is too big");
        goto end;
      }
X
Xiaoyu Wang 已提交
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
    }
    taosArrayPush(tagName, pTagSchema->name);
    if (pTagSchema->type == TSDB_DATA_TYPE_JSON) {
      if (colLen > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
        code = buildSyntaxErrMsg(&pBuf, "json string too long than 4095", bind[c].buffer);
        goto end;
      }

      isJson = true;
      char* tmp = taosMemoryCalloc(1, colLen + 1);
      memcpy(tmp, bind[c].buffer, colLen);
      code = parseJsontoTagData(tmp, pTagArray, &pTag, &pBuf);
      taosMemoryFree(tmp);
      if (code != TSDB_CODE_SUCCESS) {
        goto end;
      }
    } else {
      STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type};
      //      strcpy(val.colName, pTagSchema->name);
D
Dingle Zhang 已提交
131 132
      if (pTagSchema->type == TSDB_DATA_TYPE_BINARY ||
          pTagSchema->type == TSDB_DATA_TYPE_GEOMETRY) {
X
Xiaoyu Wang 已提交
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
        val.pData = (uint8_t*)bind[c].buffer;
        val.nData = colLen;
      } else if (pTagSchema->type == TSDB_DATA_TYPE_NCHAR) {
        int32_t output = 0;
        void*   p = taosMemoryCalloc(1, colLen * TSDB_NCHAR_SIZE);
        if (p == NULL) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          goto end;
        }
        if (!taosMbsToUcs4(bind[c].buffer, colLen, (TdUcs4*)(p), colLen * TSDB_NCHAR_SIZE, &output)) {
          if (errno == E2BIG) {
            taosMemoryFree(p);
            code = generateSyntaxErrMsg(&pBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pTagSchema->name);
            goto end;
          }
          char buf[512] = {0};
          snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(errno));
          taosMemoryFree(p);
          code = buildSyntaxErrMsg(&pBuf, buf, bind[c].buffer);
          goto end;
        }
        val.pData = p;
        val.nData = output;
      } else {
        memcpy(&val.i64, bind[c].buffer, colLen);
      }
      taosArrayPush(pTagArray, &val);
    }
  }

  if (!isJson && (code = tTagNew(pTagArray, 1, false, &pTag)) != TSDB_CODE_SUCCESS) {
    goto end;
  }

D
dapan1121 已提交
167 168 169 170 171 172 173 174
  if (NULL == pDataBlock->pData->pCreateTbReq) {
    pDataBlock->pData->pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
    if (NULL == pDataBlock->pData->pCreateTbReq) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto end;
    }
  }

X
Xiaoyu Wang 已提交
175 176
  insBuildCreateTbReq(pDataBlock->pData->pCreateTbReq, tName, pTag, suid, sTableName, tagName,
                      pDataBlock->pMeta->tableInfo.numOfTags, TSDB_DEFAULT_TABLE_TTL);
X
Xiaoyu Wang 已提交
177 178 179 180 181 182 183 184 185 186 187 188 189 190

end:
  for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) {
    STagVal* p = (STagVal*)taosArrayGet(pTagArray, i);
    if (p->type == TSDB_DATA_TYPE_NCHAR) {
      taosMemoryFreeClear(p->pData);
    }
  }
  taosArrayDestroy(pTagArray);
  taosArrayDestroy(tagName);

  return code;
}

D
dapan1121 已提交
191
int32_t convertStmtNcharCol(SMsgBuf* pMsgBuf, SSchema* pSchema, TAOS_MULTI_BIND* src, TAOS_MULTI_BIND* dst) {
D
dapan1121 已提交
192
  int32_t output = 0;
D
dapan1121 已提交
193
  int32_t newBuflen = (pSchema->bytes - VARSTR_HEADER_SIZE) * src->num;
D
dapan1121 已提交
194 195 196 197 198 199 200 201 202 203 204 205 206 207
  if (dst->buffer_length < newBuflen) {
    dst->buffer = taosMemoryRealloc(dst->buffer, newBuflen);
    if (NULL == dst->buffer) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  if (NULL == dst->length) {
    dst->length = taosMemoryRealloc(dst->length, sizeof(int32_t) * src->num);
    if (NULL == dst->buffer) {
      taosMemoryFreeClear(dst->buffer);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }
D
dapan1121 已提交
208 209 210 211 212 213 214

  dst->buffer_length = pSchema->bytes - VARSTR_HEADER_SIZE;

  for (int32_t i = 0; i < src->num; ++i) {
    if (src->is_null && src->is_null[i]) {
      continue;
    }
X
Xiaoyu Wang 已提交
215

X
Xiaoyu Wang 已提交
216 217
    if (!taosMbsToUcs4(((char*)src->buffer) + src->buffer_length * i, src->length[i],
                       (TdUcs4*)(((char*)dst->buffer) + dst->buffer_length * i), dst->buffer_length, &output)) {
D
dapan1121 已提交
218 219 220 221 222 223
      if (errno == E2BIG) {
        return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
      }
      char buf[512] = {0};
      snprintf(buf, tListLen(buf), "%s", strerror(errno));
      return buildSyntaxErrMsg(pMsgBuf, buf, NULL);
D
dapan1121 已提交
224
    }
D
dapan1121 已提交
225 226

    dst->length[i] = output;
D
dapan1121 已提交
227 228
  }

D
dapan1121 已提交
229 230 231 232 233
  dst->buffer_type = src->buffer_type;
  dst->is_null = src->is_null;
  dst->num = src->num;

  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
234 235
}

X
Xiaoyu Wang 已提交
236
int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) {
X
Xiaoyu Wang 已提交
237 238 239 240 241 242 243 244
  STableDataCxt*   pDataBlock = (STableDataCxt*)pBlock;
  SSchema*         pSchema = getTableColumnSchema(pDataBlock->pMeta);
  SBoundColInfo*   boundInfo = &pDataBlock->boundColsInfo;
  SMsgBuf          pBuf = {.buf = msgBuf, .len = msgBufLen};
  int32_t          rowNum = bind->num;
  TAOS_MULTI_BIND  ncharBind = {0};
  TAOS_MULTI_BIND* pBind = NULL;
  int32_t          code = 0;
X
Xiaoyu Wang 已提交
245

D
dapan1121 已提交
246
  for (int c = 0; c < boundInfo->numOfBound; ++c) {
X
Xiaoyu Wang 已提交
247
    SSchema*  pColSchema = &pSchema[boundInfo->pColIndex[c]];
D
dapan1121 已提交
248
    SColData* pCol = taosArrayGet(pDataBlock->pData->aCol, c);
X
Xiaoyu Wang 已提交
249

D
dapan1121 已提交
250
    if (bind[c].num != rowNum) {
D
dapan1121 已提交
251 252
      code = buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
      goto _return;
X
Xiaoyu Wang 已提交
253
    }
D
dapan1121 已提交
254

255
    if ((!(rowNum == 1 && bind[c].is_null && *bind[c].is_null)) && bind[c].buffer_type != pColSchema->type) { // for rowNum ==1 , connector may not set buffer_type
D
dapan1121 已提交
256 257
      code = buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
      goto _return;
X
Xiaoyu Wang 已提交
258
    }
D
dapan1121 已提交
259

D
dapan1121 已提交
260
    if (TSDB_DATA_TYPE_NCHAR == pColSchema->type) {
D
dapan1121 已提交
261
      code = convertStmtNcharCol(&pBuf, pColSchema, bind + c, &ncharBind);
D
dapan1121 已提交
262 263 264 265 266 267 268 269
      if (code) {
        goto _return;
      }
      pBind = &ncharBind;
    } else {
      pBind = bind + c;
    }

D
dapan1121 已提交
270 271 272 273
    code = tColDataAddValueByBind(pCol, pBind, IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE: -1);
    if (code) {
      goto _return;
    }
X
Xiaoyu Wang 已提交
274 275
  }

D
dapan1121 已提交
276 277
  qDebug("stmt all %d columns bind %d rows data", boundInfo->numOfBound, rowNum);

D
dapan1121 已提交
278 279 280 281 282 283
_return:

  taosMemoryFree(ncharBind.buffer);
  taosMemoryFree(ncharBind.length);

  return code;
X
Xiaoyu Wang 已提交
284 285 286 287
}

int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx,
                                int32_t rowNum) {
X
Xiaoyu Wang 已提交
288 289 290 291 292 293 294 295 296 297
  STableDataCxt*   pDataBlock = (STableDataCxt*)pBlock;
  SSchema*         pSchema = getTableColumnSchema(pDataBlock->pMeta);
  SBoundColInfo*   boundInfo = &pDataBlock->boundColsInfo;
  SMsgBuf          pBuf = {.buf = msgBuf, .len = msgBufLen};
  SSchema*         pColSchema = &pSchema[boundInfo->pColIndex[colIdx]];
  SColData*        pCol = taosArrayGet(pDataBlock->pData->aCol, colIdx);
  TAOS_MULTI_BIND  ncharBind = {0};
  TAOS_MULTI_BIND* pBind = NULL;
  int32_t          code = 0;

D
dapan1121 已提交
298
  if (bind->num != rowNum) {
D
dapan1121 已提交
299
    return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
X
Xiaoyu Wang 已提交
300 301
  }

D
dapan1121 已提交
302
  if (bind->buffer_type != pColSchema->type) {
D
dapan1121 已提交
303
    return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
X
Xiaoyu Wang 已提交
304
  }
D
dapan1121 已提交
305 306 307 308 309 310 311 312 313 314

  if (TSDB_DATA_TYPE_NCHAR == pColSchema->type) {
    code = convertStmtNcharCol(&pBuf, pColSchema, bind, &ncharBind);
    if (code) {
      goto _return;
    }
    pBind = &ncharBind;
  } else {
    pBind = bind;
  }
X
Xiaoyu Wang 已提交
315

D
dapan1121 已提交
316
  tColDataAddValueByBind(pCol, pBind, IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE: -1);
X
Xiaoyu Wang 已提交
317

D
dapan1121 已提交
318 319
  qDebug("stmt col %d bind %d rows data", colIdx, rowNum);

D
dapan1121 已提交
320 321 322 323 324 325
_return:

  taosMemoryFree(ncharBind.buffer);
  taosMemoryFree(ncharBind.length);

  return code;
X
Xiaoyu Wang 已提交
326 327
}

X
Xiaoyu Wang 已提交
328 329
int32_t buildBoundFields(int32_t numOfBound, int16_t* boundColumns, SSchema* pSchema, int32_t* fieldNum,
                         TAOS_FIELD_E** fields, uint8_t timePrec) {
X
Xiaoyu Wang 已提交
330
  if (fields) {
D
dapan1121 已提交
331
    *fields = taosMemoryCalloc(numOfBound, sizeof(TAOS_FIELD_E));
X
Xiaoyu Wang 已提交
332 333 334 335
    if (NULL == *fields) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

D
dapan1121 已提交
336
    SSchema* schema = &pSchema[boundColumns[0]];
X
Xiaoyu Wang 已提交
337 338 339 340
    if (TSDB_DATA_TYPE_TIMESTAMP == schema->type) {
      (*fields)[0].precision = timePrec;
    }

D
dapan1121 已提交
341 342
    for (int32_t i = 0; i < numOfBound; ++i) {
      schema = &pSchema[boundColumns[i]];
X
Xiaoyu Wang 已提交
343 344 345 346 347 348
      strcpy((*fields)[i].name, schema->name);
      (*fields)[i].type = schema->type;
      (*fields)[i].bytes = schema->bytes;
    }
  }

D
dapan1121 已提交
349
  *fieldNum = numOfBound;
X
Xiaoyu Wang 已提交
350 351 352 353 354

  return TSDB_CODE_SUCCESS;
}

int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD_E** fields) {
X
Xiaoyu Wang 已提交
355
  STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
356
  SBoundColInfo* tags = (SBoundColInfo*)boundTags;
X
Xiaoyu Wang 已提交
357
  if (NULL == tags) {
S
Shengliang Guan 已提交
358
    return TSDB_CODE_APP_ERROR;
X
Xiaoyu Wang 已提交
359 360
  }

D
dapan1121 已提交
361
  if (pDataBlock->pMeta->tableType != TSDB_SUPER_TABLE && pDataBlock->pMeta->tableType != TSDB_CHILD_TABLE) {
X
Xiaoyu Wang 已提交
362 363 364
    return TSDB_CODE_TSC_STMT_API_ERROR;
  }

D
dapan1121 已提交
365
  SSchema* pSchema = getTableTagSchema(pDataBlock->pMeta);
X
Xiaoyu Wang 已提交
366 367 368 369 370 371 372
  if (tags->numOfBound <= 0) {
    *fieldNum = 0;
    *fields = NULL;

    return TSDB_CODE_SUCCESS;
  }

373
  CHECK_CODE(buildBoundFields(tags->numOfBound, tags->pColIndex, pSchema, fieldNum, fields, 0));
X
Xiaoyu Wang 已提交
374 375 376 377 378

  return TSDB_CODE_SUCCESS;
}

int32_t qBuildStmtColFields(void* pBlock, int32_t* fieldNum, TAOS_FIELD_E** fields) {
D
dapan1121 已提交
379 380 381
  STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
  SSchema*       pSchema = getTableColumnSchema(pDataBlock->pMeta);
  if (pDataBlock->boundColsInfo.numOfBound <= 0) {
X
Xiaoyu Wang 已提交
382 383 384 385 386 387 388 389
    *fieldNum = 0;
    if (fields) {
      *fields = NULL;
    }

    return TSDB_CODE_SUCCESS;
  }

X
Xiaoyu Wang 已提交
390 391
  CHECK_CODE(buildBoundFields(pDataBlock->boundColsInfo.numOfBound, pDataBlock->boundColsInfo.pColIndex, pSchema,
                              fieldNum, fields, pDataBlock->pMeta->tableInfo.precision));
X
Xiaoyu Wang 已提交
392 393 394 395

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
396
int32_t qResetStmtDataBlock(STableDataCxt* block, bool deepClear) {
D
dapan1121 已提交
397
  STableDataCxt* pBlock = (STableDataCxt*)block;
X
Xiaoyu Wang 已提交
398
  int32_t        colNum = taosArrayGetSize(pBlock->pData->aCol);
X
Xiaoyu Wang 已提交
399

D
dapan1121 已提交
400
  for (int32_t i = 0; i < colNum; ++i) {
X
Xiaoyu Wang 已提交
401
    SColData* pCol = (SColData*)taosArrayGet(pBlock->pData->aCol, i);
D
dapan1121 已提交
402 403 404 405
    if (deepClear) {
      tColDataDeepClear(pCol);
    } else {
      tColDataClear(pCol);
X
Xiaoyu Wang 已提交
406 407 408 409 410 411
    }
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
412
int32_t qCloneStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, bool reset) {
D
dapan1121 已提交
413
  int32_t code = 0;
X
Xiaoyu Wang 已提交
414

D
dapan1121 已提交
415
  *pDst = taosMemoryCalloc(1, sizeof(STableDataCxt));
X
Xiaoyu Wang 已提交
416 417 418 419
  if (NULL == *pDst) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

D
dapan1121 已提交
420 421 422 423
  STableDataCxt* pNewCxt = (STableDataCxt*)*pDst;
  STableDataCxt* pCxt = (STableDataCxt*)pSrc;
  pNewCxt->pSchema = NULL;
  pNewCxt->pValues = NULL;
X
Xiaoyu Wang 已提交
424

D
dapan1121 已提交
425 426
  if (pCxt->pMeta) {
    void* pNewMeta = taosMemoryMalloc(TABLE_META_SIZE(pCxt->pMeta));
X
Xiaoyu Wang 已提交
427
    if (NULL == pNewMeta) {
D
dapan1121 已提交
428
      insDestroyTableDataCxt(*pDst);
X
Xiaoyu Wang 已提交
429 430
      return TSDB_CODE_OUT_OF_MEMORY;
    }
D
dapan1121 已提交
431 432
    memcpy(pNewMeta, pCxt->pMeta, TABLE_META_SIZE(pCxt->pMeta));
    pNewCxt->pMeta = pNewMeta;
X
Xiaoyu Wang 已提交
433 434
  }

D
dapan1121 已提交
435 436
  memcpy(&pNewCxt->boundColsInfo, &pCxt->boundColsInfo, sizeof(pCxt->boundColsInfo));
  pNewCxt->boundColsInfo.pColIndex = NULL;
X
Xiaoyu Wang 已提交
437

D
dapan1121 已提交
438 439 440 441 442 443
  if (pCxt->boundColsInfo.pColIndex) {
    void* pNewColIdx = taosMemoryMalloc(pCxt->boundColsInfo.numOfBound * sizeof(*pCxt->boundColsInfo.pColIndex));
    if (NULL == pNewColIdx) {
      insDestroyTableDataCxt(*pDst);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
X
Xiaoyu Wang 已提交
444 445
    memcpy(pNewColIdx, pCxt->boundColsInfo.pColIndex,
           pCxt->boundColsInfo.numOfBound * sizeof(*pCxt->boundColsInfo.pColIndex));
D
dapan1121 已提交
446
    pNewCxt->boundColsInfo.pColIndex = pNewColIdx;
X
Xiaoyu Wang 已提交
447 448
  }

D
dapan1121 已提交
449
  if (pCxt->pData) {
X
Xiaoyu Wang 已提交
450
    SSubmitTbData* pNewTb = (SSubmitTbData*)taosMemoryMalloc(sizeof(SSubmitTbData));
D
dapan1121 已提交
451 452 453 454
    if (NULL == pNewTb) {
      insDestroyTableDataCxt(*pDst);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
X
Xiaoyu Wang 已提交
455

D
dapan1121 已提交
456 457
    memcpy(pNewTb, pCxt->pData, sizeof(*pCxt->pData));
    pNewTb->pCreateTbReq = NULL;
X
Xiaoyu Wang 已提交
458

D
dapan1121 已提交
459 460 461 462 463
    pNewTb->aCol = taosArrayDup(pCxt->pData->aCol, NULL);
    if (NULL == pNewTb) {
      insDestroyTableDataCxt(*pDst);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
D
dapan1121 已提交
464 465

    pNewCxt->pData = pNewTb;
X
Xiaoyu Wang 已提交
466

D
dapan1121 已提交
467 468 469
    if (reset) {
      code = qResetStmtDataBlock(*pDst, true);
    }
X
Xiaoyu Wang 已提交
470 471
  }

D
dapan1121 已提交
472
  return code;
X
Xiaoyu Wang 已提交
473 474
}

X
Xiaoyu Wang 已提交
475 476
int32_t qRebuildStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, uint64_t uid, uint64_t suid, int32_t vgId,
                              bool rebuildCreateTb) {
D
dapan1121 已提交
477 478 479 480
  int32_t code = qCloneStmtDataBlock(pDst, pSrc, false);
  if (code) {
    return code;
  }
X
Xiaoyu Wang 已提交
481

D
dapan1121 已提交
482 483 484 485
  STableDataCxt* pBlock = (STableDataCxt*)*pDst;
  if (pBlock->pMeta) {
    pBlock->pMeta->uid = uid;
    pBlock->pMeta->vgId = vgId;
D
dapan1121 已提交
486
    pBlock->pMeta->suid = suid;
X
Xiaoyu Wang 已提交
487 488
  }

D
dapan1121 已提交
489 490 491
  pBlock->pData->suid = suid;
  pBlock->pData->uid = uid;

D
dapan1121 已提交
492
  if (rebuildCreateTb && NULL == pBlock->pData->pCreateTbReq) {
D
dapan1121 已提交
493 494 495 496 497 498
    pBlock->pData->pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
    if (NULL == pBlock->pData->pCreateTbReq) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

D
dapan1121 已提交
499
  return TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
500 501
}

D
dapan1121 已提交
502
STableMeta* qGetTableMetaInDataBlock(STableDataCxt* pDataBlock) { return ((STableDataCxt*)pDataBlock)->pMeta; }
D
dapan1121 已提交
503

D
dapan1121 已提交
504
void qDestroyStmtDataBlock(STableDataCxt* pBlock) {
X
Xiaoyu Wang 已提交
505 506 507 508
  if (pBlock == NULL) {
    return;
  }

D
dapan1121 已提交
509 510
  STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
  insDestroyTableDataCxt(pDataBlock);
X
Xiaoyu Wang 已提交
511
}