parInsert.c 51.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

wafwerar's avatar
wafwerar 已提交
16
#include "os.h"
X
Xiaoyu Wang 已提交
17 18 19
#include "parInsertData.h"
#include "parInt.h"
#include "parToken.h"
H
refact  
Hongze Cheng 已提交
20
#include "parUtil.h"
21 22 23 24
#include "tglobal.h"
#include "ttime.h"
#include "ttypes.h"

H
refact  
Hongze Cheng 已提交
25 26 27
#define NEXT_TOKEN(pSql, sToken)                \
  do {                                          \
    int32_t index = 0;                          \
28
    sToken = tStrGetToken(pSql, &index, false); \
H
refact  
Hongze Cheng 已提交
29
    pSql += index;                              \
30 31
  } while (0)

H
refact  
Hongze Cheng 已提交
32 33 34
#define NEXT_TOKEN_WITH_PREV(pSql, sToken)     \
  do {                                         \
    int32_t index = 0;                         \
X
Xiaoyu Wang 已提交
35
    sToken = tStrGetToken(pSql, &index, true); \
H
refact  
Hongze Cheng 已提交
36
    pSql += index;                             \
X
Xiaoyu Wang 已提交
37 38
  } while (0)

39
#define NEXT_TOKEN_KEEP_SQL(pSql, sToken, index) \
H
refact  
Hongze Cheng 已提交
40 41
  do {                                           \
    sToken = tStrGetToken(pSql, &index, false);  \
42 43 44
  } while (0)

typedef struct SInsertParseContext {
X
Xiaoyu Wang 已提交
45
  SParseContext* pComCxt;       // input
46 47
  char          *pSql;          // input
  SMsgBuf        msg;           // input
X
Xiaoyu Wang 已提交
48 49 50
  STableMeta* pTableMeta;       // each table
  SParsedDataColInfo tags;      // each table
  SKVRowBuilder tagsBuilder;    // each table
X
Xiaoyu Wang 已提交
51
  SVCreateTbReq createTblReq;   // each table
X
Xiaoyu Wang 已提交
52 53
  SHashObj* pVgroupsHashObj;    // global
  SHashObj* pTableBlockHashObj; // global
X
Xiaoyu Wang 已提交
54
  SHashObj* pSubTableHashObj;   // global
X
Xiaoyu Wang 已提交
55
  SArray* pVgDataBlocks;        // global
56
  int32_t totalNum;
X
Xiaoyu Wang 已提交
57
  SVnodeModifOpStmt* pOutput;
D
stmt  
dapan1121 已提交
58
  SStmtCallback* pStmtCb;
59 60
} SInsertParseContext;

H
refact  
Hongze Cheng 已提交
61
typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param);
X
Xiaoyu Wang 已提交
62 63 64 65

static uint8_t TRUE_VALUE = (uint8_t)TSDB_TRUE;
static uint8_t FALSE_VALUE = (uint8_t)TSDB_FALSE;

D
stmt  
dapan1121 已提交
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
typedef struct SKvParam {
  SKVRowBuilder *builder;
  SSchema       *schema;
  char           buf[TSDB_MAX_TAGS_LEN];
} SKvParam;

typedef struct SMemParam {
  SRowBuilder* rb;
  SSchema*     schema;
  int32_t      toffset;
  col_id_t     colIdx;
} SMemParam;


#define CHECK_CODE(expr) \
  do { \
    int32_t code = expr; \
    if (TSDB_CODE_SUCCESS != code) { \
      return code; \
    } \
  } while (0)


89 90 91 92 93 94 95 96 97 98 99 100 101
static int32_t skipInsertInto(SInsertParseContext* pCxt) {
  SToken sToken;
  NEXT_TOKEN(pCxt->pSql, sToken);
  if (TK_INSERT != sToken.type) {
    return buildSyntaxErrMsg(&pCxt->msg, "keyword INSERT is expected", sToken.z);
  }
  NEXT_TOKEN(pCxt->pSql, sToken);
  if (TK_INTO != sToken.type) {
    return buildSyntaxErrMsg(&pCxt->msg, "keyword INTO is expected", sToken.z);
  }
  return TSDB_CODE_SUCCESS;
}

102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
static int32_t parserValidateIdToken(SToken* pToken) {
  if (pToken == NULL || pToken->z == NULL || pToken->type != TK_NK_ID) {
    return TSDB_CODE_TSC_INVALID_OPERATION;
  }

  // it is a token quoted with escape char '`'
  if (pToken->z[0] == TS_ESCAPE_CHAR && pToken->z[pToken->n - 1] == TS_ESCAPE_CHAR) {
    return TSDB_CODE_SUCCESS;
  }

  char* sep = strnchr(pToken->z, TS_PATH_DELIMITER[0], pToken->n, true);
  if (sep == NULL) {  // It is a single part token, not a complex type
    if (isNumber(pToken)) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }

    strntolower(pToken->z, pToken->z, pToken->n);
  } else {  // two part
    int32_t oldLen = pToken->n;
    char*   pStr = pToken->z;

    if (pToken->type == TK_NK_SPACE) {
      pToken->n = (uint32_t)strtrim(pToken->z);
    }

    pToken->n = tGetToken(pToken->z, &pToken->type);
    if (pToken->z[pToken->n] != TS_PATH_DELIMITER[0]) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }

    if (pToken->type != TK_NK_ID) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }

    int32_t firstPartLen = pToken->n;

    pToken->z = sep + 1;
    pToken->n = (uint32_t)(oldLen - (sep - pStr) - 1);
    int32_t len = tGetToken(pToken->z, &pToken->type);
    if (len != pToken->n || pToken->type != TK_NK_ID) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }

    // re-build the whole name string
    if (pStr[firstPartLen] == TS_PATH_DELIMITER[0]) {
      // first part do not have quote do nothing
    } else {
      pStr[firstPartLen] = TS_PATH_DELIMITER[0];
      memmove(&pStr[firstPartLen + 1], pToken->z, pToken->n);
      uint32_t offset = (uint32_t)(pToken->z - (pStr + firstPartLen + 1));
      memset(pToken->z + pToken->n - offset, ' ', offset);
    }

    pToken->n += (firstPartLen + sizeof(TS_PATH_DELIMITER[0]));
    pToken->z = pStr;

    strntolower(pToken->z, pToken->z, pToken->n);
  }

  return TSDB_CODE_SUCCESS;
}

164
static int32_t buildName(SInsertParseContext* pCxt, SToken* pStname, char* fullDbName, char* tableName) {
165
  if (parserValidateIdToken(pStname) != TSDB_CODE_SUCCESS) {
166
    return buildSyntaxErrMsg(&pCxt->msg, "invalid table name", pStname->z);
167 168
  }

169
  char* p = strnchr(pStname->z, TS_PATH_DELIMITER[0], pStname->n, false);
H
refact  
Hongze Cheng 已提交
170
  if (NULL != p) {  // db.table
H
Haojun Liao 已提交
171
    int32_t n = sprintf(fullDbName, "%d.", pCxt->pComCxt->acctId);
H
Haojun Liao 已提交
172
    strncpy(fullDbName + n, pStname->z, p - pStname->z);
173 174
    strncpy(tableName, p + 1, pStname->n - (p - pStname->z) - 1);
  } else {
H
Haojun Liao 已提交
175
    snprintf(fullDbName, TSDB_DB_FNAME_LEN, "%d.%s", pCxt->pComCxt->acctId, pCxt->pComCxt->db);
176 177
    strncpy(tableName, pStname->z, pStname->n);
  }
H
Haojun Liao 已提交
178

179 180 181
  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
182

D
stmt  
dapan1121 已提交
183
static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, const char* dbName, SMsgBuf* pMsgBuf) {
X
Xiaoyu Wang 已提交
184 185 186 187
  const char* msg1 = "name too long";
  const char* msg2 = "invalid database name";
  const char* msg3 = "db is not specified";

H
refact  
Hongze Cheng 已提交
188 189
  int32_t code = TSDB_CODE_SUCCESS;
  char*   p = strnchr(pTableName->z, TS_PATH_DELIMITER[0], pTableName->n, true);
X
Xiaoyu Wang 已提交
190

H
refact  
Hongze Cheng 已提交
191
  if (p != NULL) {  // db has been specified in sql string so we ignore current db path
X
Xiaoyu Wang 已提交
192 193 194
    assert(*p == TS_PATH_DELIMITER[0]);

    int32_t dbLen = p - pTableName->z;
H
refact  
Hongze Cheng 已提交
195
    char    name[TSDB_DB_FNAME_LEN] = {0};
X
Xiaoyu Wang 已提交
196 197 198
    strncpy(name, pTableName->z, dbLen);
    dbLen = strdequote(name);

D
stmt  
dapan1121 已提交
199
    code = tNameSetDbName(pName, acctId, name, dbLen);
X
Xiaoyu Wang 已提交
200 201 202 203 204
    if (code != TSDB_CODE_SUCCESS) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }

    int32_t tbLen = pTableName->n - dbLen - 1;
H
refact  
Hongze Cheng 已提交
205
    char    tbname[TSDB_TABLE_FNAME_LEN] = {0};
X
Xiaoyu Wang 已提交
206
    strncpy(tbname, p + 1, tbLen);
H
refact  
Hongze Cheng 已提交
207
    /*tbLen = */ strdequote(tbname);
X
Xiaoyu Wang 已提交
208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223

    code = tNameFromString(pName, tbname, T_NAME_TABLE);
    if (code != 0) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }
  } else {  // get current DB name first, and then set it into path
    if (pTableName->n >= TSDB_TABLE_NAME_LEN) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }

    assert(pTableName->n < TSDB_TABLE_FNAME_LEN);

    char name[TSDB_TABLE_FNAME_LEN] = {0};
    strncpy(name, pTableName->z, pTableName->n);
    strdequote(name);

D
stmt  
dapan1121 已提交
224
    if (dbName == NULL) {
X
Xiaoyu Wang 已提交
225 226 227
      return buildInvalidOperationMsg(pMsgBuf, msg3);
    }

D
stmt  
dapan1121 已提交
228
    code = tNameSetDbName(pName, acctId, dbName, strlen(dbName));
X
Xiaoyu Wang 已提交
229 230 231 232 233 234 235 236 237 238 239 240 241 242
    if (code != TSDB_CODE_SUCCESS) {
      code = buildInvalidOperationMsg(pMsgBuf, msg2);
      return code;
    }

    code = tNameFromString(pName, name, T_NAME_TABLE);
    if (code != 0) {
      code = buildInvalidOperationMsg(pMsgBuf, msg1);
    }
  }

  return code;
}

D
stmt  
dapan1121 已提交
243
static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SToken* pTname, bool isStb) {
H
Haojun Liao 已提交
244
  SParseContext* pBasicCtx = pCxt->pComCxt;
245
  SName name = {0};
D
stmt  
dapan1121 已提交
246
  createSName(&name, pTname, pBasicCtx->acctId, pBasicCtx->db, &pCxt->msg);  
D
stmt  
dapan1121 已提交
247
  if (isStb) {
H
refact  
Hongze Cheng 已提交
248 249
    CHECK_CODE(catalogGetSTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name,
                                    &pCxt->pTableMeta));
D
stmt  
dapan1121 已提交
250
  } else {
H
refact  
Hongze Cheng 已提交
251 252
    CHECK_CODE(catalogGetTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name,
                                   &pCxt->pTableMeta));
D
stmt  
dapan1121 已提交
253
  }
X
Xiaoyu Wang 已提交
254
  SVgroupInfo vg;
H
refact  
Hongze Cheng 已提交
255 256
  CHECK_CODE(
      catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &vg));
X
Xiaoyu Wang 已提交
257
  CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
258

H
refact  
Hongze Cheng 已提交
259
  return TSDB_CODE_SUCCESS;
D
stmt  
dapan1121 已提交
260 261
}

H
refact  
Hongze Cheng 已提交
262
static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) { return getTableMetaImpl(pCxt, pTname, false); }
D
stmt  
dapan1121 已提交
263

H
refact  
Hongze Cheng 已提交
264
static int32_t getSTableMeta(SInsertParseContext* pCxt, SToken* pTname) { return getTableMetaImpl(pCxt, pTname, true); }
D
stmt  
dapan1121 已提交
265

266 267 268 269 270 271 272 273 274 275
static int32_t findCol(SToken* pColname, int32_t start, int32_t end, SSchema* pSchema) {
  while (start < end) {
    if (strlen(pSchema[start].name) == pColname->n && strncmp(pColname->z, pSchema[start].name, pColname->n) == 0) {
      return start;
    }
    ++start;
  }
  return -1;
}

D
dapan1121 已提交
276
static void buildMsgHeader(STableDataBlocks* src, SVgDataBlocks* blocks) {
H
refact  
Hongze Cheng 已提交
277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294
  SSubmitReq* submit = (SSubmitReq*)blocks->pData;
  submit->header.vgId = htonl(blocks->vg.vgId);
  submit->header.contLen = htonl(blocks->size);
  submit->length = submit->header.contLen;
  submit->numOfBlocks = htonl(blocks->numOfTables);
  SSubmitBlk* blk = (SSubmitBlk*)(submit + 1);
  int32_t     numOfBlocks = blocks->numOfTables;
  while (numOfBlocks--) {
    int32_t dataLen = blk->dataLen;
    blk->uid = htobe64(blk->uid);
    blk->suid = htobe64(blk->suid);
    blk->padding = htonl(blk->padding);
    blk->sversion = htonl(blk->sversion);
    blk->dataLen = htonl(blk->dataLen);
    blk->schemaLen = htonl(blk->schemaLen);
    blk->numOfRows = htons(blk->numOfRows);
    blk = (SSubmitBlk*)(blk->data + dataLen);
  }
295 296 297 298 299 300 301 302 303 304
}

static int32_t buildOutput(SInsertParseContext* pCxt) {
  size_t numOfVg = taosArrayGetSize(pCxt->pVgDataBlocks);
  pCxt->pOutput->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
  if (NULL == pCxt->pOutput->pDataBlocks) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  for (size_t i = 0; i < numOfVg; ++i) {
    STableDataBlocks* src = taosArrayGetP(pCxt->pVgDataBlocks, i);
H
refact  
Hongze Cheng 已提交
305
    SVgDataBlocks*    dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
306 307 308
    if (NULL == dst) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
H
Haojun Liao 已提交
309
    taosHashGetDup(pCxt->pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
310 311
    dst->numOfTables = src->numOfTables;
    dst->size = src->size;
dengyihao's avatar
dengyihao 已提交
312
    TSWAP(dst->pData, src->pData, char*);
D
dapan1121 已提交
313
    buildMsgHeader(src, dst);
314 315 316 317 318
    taosArrayPush(pCxt->pOutput->pDataBlocks, &dst);
  }
  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
319
int32_t checkTimestamp(STableDataBlocks *pDataBlocks, const char *start) {
320 321 322 323 324
  // once the data block is disordered, we do NOT keep previous timestamp any more
  if (!pDataBlocks->ordered) {
    return TSDB_CODE_SUCCESS;
  }

H
refact  
Hongze Cheng 已提交
325
  TSKEY k = *(TSKEY*)start;
326
  if (k <= pDataBlocks->prevTS) {
327 328 329 330 331 332 333
    pDataBlocks->ordered = false;
  }

  pDataBlocks->prevTS = k;
  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
334 335 336 337 338 339
static int parseTime(char** end, SToken* pToken, int16_t timePrec, int64_t* time, SMsgBuf* pMsgBuf) {
  int32_t index = 0;
  SToken  sToken;
  int64_t interval;
  int64_t ts = 0;
  char*   pTokenEnd = *end;
340 341

  if (pToken->type == TK_NOW) {
342
    ts = taosGetTimestamp(timePrec);
343 344
  } else if (pToken->type == TK_TODAY) {
    ts = taosGetTimestampToday(timePrec);
345
  } else if (pToken->type == TK_NK_INTEGER) {
346 347
    bool isSigned = false;
    toInteger(pToken->z, pToken->n, 10, &ts, &isSigned);
H
refact  
Hongze Cheng 已提交
348
  } else {  // parse the RFC-3339/ISO-8601 timestamp format string
S
os env  
Shengliang Guan 已提交
349
    if (taosParseTime(pToken->z, time, pToken->n, timePrec, tsDaylight) != TSDB_CODE_SUCCESS) {
350
      return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
351 352 353 354 355 356 357
    }

    return TSDB_CODE_SUCCESS;
  }

  for (int k = pToken->n; pToken->z[k] != '\0'; k++) {
    if (pToken->z[k] == ' ' || pToken->z[k] == '\t') continue;
H
refact  
Hongze Cheng 已提交
358
    if (pToken->z[k] == '(' && pToken->z[k + 1] == ')') {  // for insert NOW()/TODAY()
359 360 361 362
      *end = pTokenEnd = &pToken->z[k + 2];
      k++;
      continue;
    }
363
    if (pToken->z[k] == ',') {
364 365
      *end = pTokenEnd;
      *time = ts;
366 367 368 369 370 371 372 373 374 375 376 377 378 379 380
      return 0;
    }

    break;
  }

  /*
   * time expression:
   * e.g., now+12a, now-5h
   */
  SToken valueToken;
  index = 0;
  sToken = tStrGetToken(pTokenEnd, &index, false);
  pTokenEnd += index;

X
Xiaoyu Wang 已提交
381
  if (sToken.type == TK_NK_MINUS || sToken.type == TK_NK_PLUS) {
382 383 384 385 386
    index = 0;
    valueToken = tStrGetToken(pTokenEnd, &index, false);
    pTokenEnd += index;

    if (valueToken.n < 2) {
387
      return buildSyntaxErrMsg(pMsgBuf, "value expected in timestamp", sToken.z);
388 389 390 391 392 393 394
    }

    char unit = 0;
    if (parseAbsoluteDuration(valueToken.z, valueToken.n, &interval, &unit, timePrec) != TSDB_CODE_SUCCESS) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }

395
    if (sToken.type == TK_NK_PLUS) {
396
      ts += interval;
397
    } else {
398
      ts = ts - interval;
399 400
    }

401
    *end = pTokenEnd;
402 403
  }

404
  *time = ts;
405 406
  return TSDB_CODE_SUCCESS;
}
407

X
Xiaoyu Wang 已提交
408
static FORCE_INLINE int32_t checkAndTrimValue(SToken* pToken, uint32_t type, char* tmpTokenBuf, SMsgBuf* pMsgBuf) {
H
refact  
Hongze Cheng 已提交
409 410 411 412
  if ((pToken->type != TK_NOW && pToken->type != TK_TODAY && pToken->type != TK_NK_INTEGER &&
       pToken->type != TK_NK_STRING && pToken->type != TK_NK_FLOAT && pToken->type != TK_NK_BOOL &&
       pToken->type != TK_NULL && pToken->type != TK_NK_HEX && pToken->type != TK_NK_OCT &&
       pToken->type != TK_NK_BIN) ||
413
      (pToken->n == 0) || (pToken->type == TK_NK_RP)) {
X
Xiaoyu Wang 已提交
414 415 416 417
    return buildSyntaxErrMsg(pMsgBuf, "invalid data or symbol", pToken->z);
  }

  // Remove quotation marks
X
Xiaoyu Wang 已提交
418
  if (TK_NK_STRING == pToken->type) {
X
Xiaoyu Wang 已提交
419 420 421 422
    if (pToken->n >= TSDB_MAX_BYTES_PER_ROW) {
      return buildSyntaxErrMsg(pMsgBuf, "too long string", pToken->z);
    }

423
    int32_t len = trimString(pToken->z, pToken->n, tmpTokenBuf, TSDB_MAX_BYTES_PER_ROW);
X
Xiaoyu Wang 已提交
424
    pToken->z = tmpTokenBuf;
425
    pToken->n = len;
X
Xiaoyu Wang 已提交
426 427 428 429 430
  }

  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
431
static bool isNullStr(SToken* pToken) {
432
  return (pToken->type == TK_NULL) || ((pToken->type == TK_NK_STRING) && (pToken->n != 0) &&
X
Xiaoyu Wang 已提交
433 434 435
                                       (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0));
}

H
refact  
Hongze Cheng 已提交
436
static FORCE_INLINE int32_t toDouble(SToken* pToken, double* value, char** endPtr) {
X
Xiaoyu Wang 已提交
437 438 439 440 441
  errno = 0;
  *value = strtold(pToken->z, endPtr);

  // not a valid integer number, return error
  if ((*endPtr - pToken->z) != pToken->n) {
442
    return TK_NK_ILLEGAL;
X
Xiaoyu Wang 已提交
443 444 445 446 447
  }

  return pToken->type;
}

H
refact  
Hongze Cheng 已提交
448 449
static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t timePrec, char* tmpTokenBuf,
                               _row_append_fn_t func, void* param, SMsgBuf* pMsgBuf) {
X
Xiaoyu Wang 已提交
450
  int64_t iv;
H
refact  
Hongze Cheng 已提交
451
  char*   endptr = NULL;
X
Xiaoyu Wang 已提交
452 453 454 455 456 457 458 459 460
  bool    isSigned = false;

  int32_t code = checkAndTrimValue(pToken, pSchema->type, tmpTokenBuf, pMsgBuf);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  if (isNullStr(pToken)) {
    if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
D
stmt  
dapan1121 已提交
461
      return buildSyntaxErrMsg(pMsgBuf, "primary timestamp should not be null", pToken->z);
X
Xiaoyu Wang 已提交
462 463
    }

X
Xiaoyu Wang 已提交
464
    return func(pMsgBuf, NULL, 0, param);
X
Xiaoyu Wang 已提交
465 466 467 468
  }

  switch (pSchema->type) {
    case TSDB_DATA_TYPE_BOOL: {
469
      if ((pToken->type == TK_NK_BOOL || pToken->type == TK_NK_STRING) && (pToken->n != 0)) {
X
Xiaoyu Wang 已提交
470
        if (strncmp(pToken->z, "true", pToken->n) == 0) {
X
Xiaoyu Wang 已提交
471
          return func(pMsgBuf, &TRUE_VALUE, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
472
        } else if (strncmp(pToken->z, "false", pToken->n) == 0) {
X
Xiaoyu Wang 已提交
473
          return func(pMsgBuf, &FALSE_VALUE, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
474 475 476
        } else {
          return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
        }
477
      } else if (pToken->type == TK_NK_INTEGER) {
X
Xiaoyu Wang 已提交
478
        return func(pMsgBuf, ((strtoll(pToken->z, NULL, 10) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param);
479
      } else if (pToken->type == TK_NK_FLOAT) {
X
Xiaoyu Wang 已提交
480
        return func(pMsgBuf, ((strtod(pToken->z, NULL) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param);
X
Xiaoyu Wang 已提交
481 482 483 484 485 486 487 488 489 490 491 492 493
      } else {
        return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
      }
    }

    case TSDB_DATA_TYPE_TINYINT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid tinyint data", pToken->z);
      } else if (!IS_VALID_TINYINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "tinyint data overflow", pToken->z);
      }

      uint8_t tmpVal = (uint8_t)iv;
X
Xiaoyu Wang 已提交
494
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
495 496
    }

H
refact  
Hongze Cheng 已提交
497
    case TSDB_DATA_TYPE_UTINYINT: {
X
Xiaoyu Wang 已提交
498 499 500 501 502 503
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned tinyint data", pToken->z);
      } else if (!IS_VALID_UTINYINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "unsigned tinyint data overflow", pToken->z);
      }
      uint8_t tmpVal = (uint8_t)iv;
X
Xiaoyu Wang 已提交
504
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
505 506 507 508 509 510 511 512 513
    }

    case TSDB_DATA_TYPE_SMALLINT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid smallint data", pToken->z);
      } else if (!IS_VALID_SMALLINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "smallint data overflow", pToken->z);
      }
      int16_t tmpVal = (int16_t)iv;
X
Xiaoyu Wang 已提交
514
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
515 516 517 518 519 520 521 522 523
    }

    case TSDB_DATA_TYPE_USMALLINT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned smallint data", pToken->z);
      } else if (!IS_VALID_USMALLINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "unsigned smallint data overflow", pToken->z);
      }
      uint16_t tmpVal = (uint16_t)iv;
X
Xiaoyu Wang 已提交
524
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
525 526 527 528 529 530 531 532 533
    }

    case TSDB_DATA_TYPE_INT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid int data", pToken->z);
      } else if (!IS_VALID_INT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "int data overflow", pToken->z);
      }
      int32_t tmpVal = (int32_t)iv;
X
Xiaoyu Wang 已提交
534
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
535 536 537 538 539 540 541 542 543
    }

    case TSDB_DATA_TYPE_UINT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned int data", pToken->z);
      } else if (!IS_VALID_UINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "unsigned int data overflow", pToken->z);
      }
      uint32_t tmpVal = (uint32_t)iv;
X
Xiaoyu Wang 已提交
544
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
545 546 547 548 549 550 551 552
    }

    case TSDB_DATA_TYPE_BIGINT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid bigint data", pToken->z);
      } else if (!IS_VALID_BIGINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "bigint data overflow", pToken->z);
      }
X
Xiaoyu Wang 已提交
553
      return func(pMsgBuf, &iv, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
554 555 556 557 558 559 560 561 562
    }

    case TSDB_DATA_TYPE_UBIGINT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned bigint data", pToken->z);
      } else if (!IS_VALID_UBIGINT((uint64_t)iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "unsigned bigint data overflow", pToken->z);
      }
      uint64_t tmpVal = (uint64_t)iv;
X
Xiaoyu Wang 已提交
563
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
564 565 566 567
    }

    case TSDB_DATA_TYPE_FLOAT: {
      double dv;
568
      if (TK_NK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
X
Xiaoyu Wang 已提交
569 570
        return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
      }
H
refact  
Hongze Cheng 已提交
571 572
      if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) ||
          isnan(dv)) {
X
Xiaoyu Wang 已提交
573 574 575
        return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
      }
      float tmpVal = (float)dv;
X
Xiaoyu Wang 已提交
576
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
577 578 579 580
    }

    case TSDB_DATA_TYPE_DOUBLE: {
      double dv;
581
      if (TK_NK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
X
Xiaoyu Wang 已提交
582 583 584 585 586
        return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
      }
      if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) {
        return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
      }
X
Xiaoyu Wang 已提交
587
      return func(pMsgBuf, &dv, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
588 589 590 591 592 593 594 595
    }

    case TSDB_DATA_TYPE_BINARY: {
      // Too long values will raise the invalid sql error message
      if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) {
        return buildSyntaxErrMsg(pMsgBuf, "string data overflow", pToken->z);
      }

X
Xiaoyu Wang 已提交
596
      return func(pMsgBuf, pToken->z, pToken->n, param);
X
Xiaoyu Wang 已提交
597 598 599
    }

    case TSDB_DATA_TYPE_NCHAR: {
X
Xiaoyu Wang 已提交
600
      return func(pMsgBuf, pToken->z, pToken->n, param);
X
Xiaoyu Wang 已提交
601 602
    }

603 604 605 606 607 608 609
    case TSDB_DATA_TYPE_JSON: {
      if(pToken->n > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE){
        return buildSyntaxErrMsg(pMsgBuf, "json string too long than 4095", pToken->z);
      }
      return func(pMsgBuf, pToken->z, pToken->n, param);
    }

X
Xiaoyu Wang 已提交
610 611 612 613 614 615
    case TSDB_DATA_TYPE_TIMESTAMP: {
      int64_t tmpVal;
      if (parseTime(end, pToken, timePrec, &tmpVal, pMsgBuf) != TSDB_CODE_SUCCESS) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp", pToken->z);
      }

X
Xiaoyu Wang 已提交
616
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
617 618 619 620 621 622
    }
  }

  return TSDB_CODE_FAILED;
}

X
Xiaoyu Wang 已提交
623
static FORCE_INLINE int32_t MemRowAppend(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param) {
C
Cary Xu 已提交
624 625
  SMemParam*   pa = (SMemParam*)param;
  SRowBuilder* rb = pa->rb;
626 627 628 629 630 631

  if (value == NULL) {  // it is a null data
    tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NULL, value, false, pa->toffset, pa->colIdx);
    return TSDB_CODE_SUCCESS;
  }

632
  if (TSDB_DATA_TYPE_BINARY == pa->schema->type) {
C
Cary Xu 已提交
633
    const char* rowEnd = tdRowEnd(rb->pBuf);
634
    STR_WITH_SIZE_TO_VARSTR(rowEnd, value, len);
C
Cary Xu 已提交
635
    tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, rowEnd, true, pa->toffset, pa->colIdx);
636 637
  } else if (TSDB_DATA_TYPE_NCHAR == pa->schema->type) {
    // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
C
Cary Xu 已提交
638 639
    int32_t     output = 0;
    const char* rowEnd = tdRowEnd(rb->pBuf);
wafwerar's avatar
wafwerar 已提交
640
    if (!taosMbsToUcs4(value, len, (TdUcs4*)varDataVal(rowEnd), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) {
X
Xiaoyu Wang 已提交
641 642 643
      char buf[512] = {0};
      snprintf(buf, tListLen(buf), "%s", strerror(errno));
      return buildSyntaxErrMsg(pMsgBuf, buf, value);
644
    }
645
    varDataSetLen(rowEnd, output);
C
Cary Xu 已提交
646
    tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, rowEnd, false, pa->toffset, pa->colIdx);
647
  } else {
648
    tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, value, false, pa->toffset, pa->colIdx);
649
  }
650

651 652
  return TSDB_CODE_SUCCESS;
}
653 654 655

// pSql -> tag1_name, ...)
static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo* pColList, SSchema* pSchema) {
C
Cary Xu 已提交
656
  col_id_t nCols = pColList->numOfCols;
657

H
refact  
Hongze Cheng 已提交
658
  pColList->numOfBound = 0;
C
Cary Xu 已提交
659
  pColList->boundNullLen = 0;
C
Cary Xu 已提交
660
  memset(pColList->boundColumns, 0, sizeof(col_id_t) * nCols);
C
Cary Xu 已提交
661
  for (col_id_t i = 0; i < nCols; ++i) {
662 663 664
    pColList->cols[i].valStat = VAL_STAT_NONE;
  }

H
refact  
Hongze Cheng 已提交
665 666
  SToken   sToken;
  bool     isOrdered = true;
C
Cary Xu 已提交
667
  col_id_t lastColIdx = -1;  // last column found
668 669 670
  while (1) {
    NEXT_TOKEN(pCxt->pSql, sToken);

671
    if (TK_NK_RP == sToken.type) {
672 673 674
      break;
    }

C
Cary Xu 已提交
675 676
    col_id_t t = lastColIdx + 1;
    col_id_t index = findCol(&sToken, t, nCols, pSchema);
677 678 679 680 681 682 683 684 685 686 687 688
    if (index < 0 && t > 0) {
      index = findCol(&sToken, 0, t, pSchema);
      isOrdered = false;
    }
    if (index < 0) {
      return buildSyntaxErrMsg(&pCxt->msg, "invalid column/tag name", sToken.z);
    }
    if (pColList->cols[index].valStat == VAL_STAT_HAS) {
      return buildSyntaxErrMsg(&pCxt->msg, "duplicated column name", sToken.z);
    }
    lastColIdx = index;
    pColList->cols[index].valStat = VAL_STAT_HAS;
689
    pColList->boundColumns[pColList->numOfBound] = index + PRIMARYKEY_TIMESTAMP_COL_ID;
690
    ++pColList->numOfBound;
C
Cary Xu 已提交
691 692 693 694 695 696 697 698 699 700 701
    switch (pSchema[t].type) {
      case TSDB_DATA_TYPE_BINARY:
        pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + CHAR_BYTES);
        break;
      case TSDB_DATA_TYPE_NCHAR:
        pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
        break;
      default:
        pColList->boundNullLen += TYPE_BYTES[pSchema[t].type];
        break;
    }
702 703 704 705 706
  }

  pColList->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED;

  if (!isOrdered) {
wafwerar's avatar
wafwerar 已提交
707
    pColList->colIdxInfo = taosMemoryCalloc(pColList->numOfBound, sizeof(SBoundIdxInfo));
708 709 710 711
    if (NULL == pColList->colIdxInfo) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    SBoundIdxInfo* pColIdx = pColList->colIdxInfo;
C
Cary Xu 已提交
712
    for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
713
      pColIdx[i].schemaColIdx = pColList->boundColumns[i];
714 715 716
      pColIdx[i].boundIdx = i;
    }
    qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), schemaIdxCompar);
C
Cary Xu 已提交
717
    for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
718 719 720 721 722
      pColIdx[i].finalIdx = i;
    }
    qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), boundIdxCompar);
  }

723 724 725 726
  if(pColList->numOfCols > pColList->numOfBound){
    memset(&pColList->boundColumns[pColList->numOfBound], 0,
           sizeof(col_id_t) * (pColList->numOfCols - pColList->numOfBound));
  }
727 728 729 730

  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
731
static int32_t KvRowAppend(SMsgBuf* pMsgBuf, const void *value, int32_t len, void *param) {
732 733
  SKvParam* pa = (SKvParam*) param;

734 735
  int8_t  type = pa->schema->type;
  int16_t colId = pa->schema->colId;
736

737 738 739 740 741 742 743 744 745
  if(TSDB_DATA_TYPE_JSON == type){
    return parseJsontoTagData(value, pa->builder, pMsgBuf, colId);
  }

  if (value == NULL) {  // it is a null data
    // tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NULL, value, false, pa->toffset, pa->colIdx);
    return TSDB_CODE_SUCCESS;
  }

746 747
  if (TSDB_DATA_TYPE_BINARY == type) {
    STR_WITH_SIZE_TO_VARSTR(pa->buf, value, len);
748
    tdAddColToKVRow(pa->builder, colId, pa->buf, varDataTLen(pa->buf));
749 750 751
  } else if (TSDB_DATA_TYPE_NCHAR == type) {
    // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
    int32_t output = 0;
wafwerar's avatar
wafwerar 已提交
752
    if (!taosMbsToUcs4(value, len, (TdUcs4*)varDataVal(pa->buf), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) {
X
Xiaoyu Wang 已提交
753 754
      char buf[512] = {0};
      snprintf(buf, tListLen(buf), "%s", strerror(errno));
755
      return buildSyntaxErrMsg(pMsgBuf, buf, value);
756 757 758
    }

    varDataSetLen(pa->buf, output);
759
    tdAddColToKVRow(pa->builder, colId, pa->buf, varDataTLen(pa->buf));
760
  } else {
761
    tdAddColToKVRow(pa->builder, colId, value, TYPE_BYTES[type]);
762 763 764 765 766
  }

  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
767
static int32_t buildCreateTbReq(SVCreateTbReq *pTbReq, const SName* pName, SKVRow row, int64_t suid) {
X
Xiaoyu Wang 已提交
768 769
  char dbFName[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pName, dbFName);
D
stmt  
dapan1121 已提交
770 771 772 773
  pTbReq->type = TD_CHILD_TABLE;
  pTbReq->name = strdup(pName->tname);
  pTbReq->ctbCfg.suid = suid;
  pTbReq->ctbCfg.pTag = row;
X
Xiaoyu Wang 已提交
774 775 776 777

  return TSDB_CODE_SUCCESS;
}

778
// pSql -> tag1_value, ...)
X
Xiaoyu Wang 已提交
779
static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint8_t precision, const SName* pName) {
780
  if (tdInitKVRowBuilder(&pCxt->tagsBuilder) < 0) {
781 782 783
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

784
  SKvParam param = {.builder = &pCxt->tagsBuilder};
785
  SToken sToken;
D
stmt  
dapan1121 已提交
786
  bool isParseBindParam = false;  
787
  char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0};  // used for deleting Escape character: \\, \', \"
788
  for (int i = 0; i < pCxt->tags.numOfBound; ++i) {
X
Xiaoyu Wang 已提交
789
    NEXT_TOKEN_WITH_PREV(pCxt->pSql, sToken);
D
stmt  
dapan1121 已提交
790 791 792 793 794 795 796 797 798 799 800 801 802 803

    if (sToken.type == TK_NK_QUESTION) {
      isParseBindParam = true;
      if (NULL == pCxt->pStmtCb) {
        return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", sToken.z);
      }

      continue;
    }

    if (isParseBindParam) {
      return buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and tag values");
    }
    
X
Xiaoyu Wang 已提交
804 805
    SSchema* pTagSchema = &pSchema[pCxt->tags.boundColumns[i] - 1]; // colId starts with 1
    param.schema = pTagSchema;
H
refact  
Hongze Cheng 已提交
806 807
    CHECK_CODE(
        parseValueToken(&pCxt->pSql, &sToken, pTagSchema, precision, tmpTokenBuf, KvRowAppend, &param, &pCxt->msg));
808 809
  }

D
stmt  
dapan1121 已提交
810 811 812 813
  if (isParseBindParam) {
    return TSDB_CODE_SUCCESS;
  }

814
  SKVRow row = tdGetKVRowFromBuilder(&pCxt->tagsBuilder);
815 816 817 818 819
  if (NULL == row) {
    return buildInvalidOperationMsg(&pCxt->msg, "tag value expected");
  }
  tdSortKVRowByColIdx(row);

D
stmt  
dapan1121 已提交
820
  return buildCreateTbReq(&pCxt->createTblReq, pName, row, pCxt->pTableMeta->suid);
X
Xiaoyu Wang 已提交
821
}
822

X
Xiaoyu Wang 已提交
823 824 825 826 827 828 829 830
static int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
  *pDst = taosMemoryMalloc(TABLE_META_SIZE(pSrc));
  if (NULL == *pDst) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  memcpy(*pDst, pSrc, TABLE_META_SIZE(pSrc));
  return TSDB_CODE_SUCCESS;
}
831

X
Xiaoyu Wang 已提交
832 833 834 835 836
static int32_t storeTableMeta(SHashObj* pHash, const char* pName, int32_t len, STableMeta* pMeta) {
  STableMeta* pBackup = NULL;
  if (TSDB_CODE_SUCCESS != cloneTableMeta(pMeta, &pBackup)) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
837
  pBackup->uid = tGenIdPI64();
X
Xiaoyu Wang 已提交
838
  return taosHashPut(pHash, pName, len, &pBackup, POINTER_BYTES);
839 840 841 842
}

// pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken) {
X
Xiaoyu Wang 已提交
843
  SName name;
D
stmt  
dapan1121 已提交
844
  createSName(&name, pTbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg);
X
Xiaoyu Wang 已提交
845 846
  char tbFName[TSDB_TABLE_FNAME_LEN];
  tNameExtractFullName(&name, tbFName);
H
refact  
Hongze Cheng 已提交
847
  int32_t      len = strlen(tbFName);
X
Xiaoyu Wang 已提交
848 849 850 851
  STableMeta** pMeta = taosHashGet(pCxt->pSubTableHashObj, tbFName, len);
  if (NULL != pMeta) {
    return cloneTableMeta(*pMeta, &pCxt->pTableMeta);
  }
852

X
Xiaoyu Wang 已提交
853
  SToken sToken;
854 855
  // pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
  NEXT_TOKEN(pCxt->pSql, sToken);
D
stmt  
dapan1121 已提交
856
  CHECK_CODE(getSTableMeta(pCxt, &sToken));
857 858 859
  if (TSDB_SUPER_TABLE != pCxt->pTableMeta->tableType) {
    return buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed");
  }
X
Xiaoyu Wang 已提交
860
  CHECK_CODE(storeTableMeta(pCxt->pSubTableHashObj, tbFName, len, pCxt->pTableMeta));
861 862

  SSchema* pTagsSchema = getTableTagSchema(pCxt->pTableMeta);
863
  setBoundColumnInfo(&pCxt->tags, pTagsSchema, getNumOfTags(pCxt->pTableMeta));
864 865 866

  // pSql -> [(tag1_name, ...)] TAGS (tag1_value, ...)
  NEXT_TOKEN(pCxt->pSql, sToken);
867
  if (TK_NK_LP == sToken.type) {
868
    CHECK_CODE(parseBoundColumns(pCxt, &pCxt->tags, pTagsSchema));
869 870 871 872 873 874 875 876
    NEXT_TOKEN(pCxt->pSql, sToken);
  }

  if (TK_TAGS != sToken.type) {
    return buildSyntaxErrMsg(&pCxt->msg, "TAGS is expected", sToken.z);
  }
  // pSql -> (tag1_value, ...)
  NEXT_TOKEN(pCxt->pSql, sToken);
877
  if (TK_NK_LP != sToken.type) {
878 879
    return buildSyntaxErrMsg(&pCxt->msg, "( is expected", sToken.z);
  }
X
Xiaoyu Wang 已提交
880 881 882 883 884
  CHECK_CODE(parseTagsClause(pCxt, pCxt->pTableMeta->schema, getTableInfo(pCxt->pTableMeta).precision, &name));
  NEXT_TOKEN(pCxt->pSql, sToken);
  if (TK_NK_RP != sToken.type) {
    return buildSyntaxErrMsg(&pCxt->msg, ") is expected", sToken.z);
  }
885 886 887 888

  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
889
static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, int16_t timePrec, bool* gotRow, char* tmpTokenBuf) {
890
  SParsedDataColInfo* spd = &pDataBlocks->boundColumnInfo;
C
Cary Xu 已提交
891 892 893 894
  SRowBuilder*        pBuilder = &pDataBlocks->rowBuilder;
  STSRow*             row = (STSRow*)(pDataBlocks->pData + pDataBlocks->size);  // skip the SSubmitBlk header

  tdSRowResetBuf(pBuilder, row);
895

H
refact  
Hongze Cheng 已提交
896 897
  bool      isParseBindParam = false;
  SSchema*  schema = getTableColumnSchema(pDataBlocks->pTableMeta);
C
Cary Xu 已提交
898
  SMemParam param = {.rb = pBuilder};
H
refact  
Hongze Cheng 已提交
899
  SToken    sToken = {0};
900 901
  // 1. set the parsed value from sql string
  for (int i = 0; i < spd->numOfBound; ++i) {
X
Xiaoyu Wang 已提交
902
    NEXT_TOKEN_WITH_PREV(pCxt->pSql, sToken);
903
    SSchema* pSchema = &schema[spd->boundColumns[i] - 1];
D
stmt  
dapan1121 已提交
904 905 906 907 908 909 910 911 912 913 914 915 916 917

    if (sToken.type == TK_NK_QUESTION) {
      isParseBindParam = true;
      if (NULL == pCxt->pStmtCb) {
        return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", sToken.z);
      }

      continue;
    }

    if (isParseBindParam) {
      return buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and values");
    }
    
918
    param.schema = pSchema;
D
stmt  
dapan1121 已提交
919
    getSTSRowAppendInfo(pBuilder->rowType, spd, i, &param.toffset, &param.colIdx);
920
    CHECK_CODE(parseValueToken(&pCxt->pSql, &sToken, pSchema, timePrec, tmpTokenBuf, MemRowAppend, &param, &pCxt->msg));
921 922

    if (PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
C
Cary Xu 已提交
923
      TSKEY tsKey = TD_ROW_KEY(row);
924
      checkTimestamp(pDataBlocks, (const char*)&tsKey);
925 926 927 928
    }
  }

  if (!isParseBindParam) {
C
Cary Xu 已提交
929
    // set the null value for the columns that do not assign values
C
Cary Xu 已提交
930
    if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
931
      for (int32_t i = 0; i < spd->numOfCols; ++i) {
C
Cary Xu 已提交
932 933 934
        if (spd->cols[i].valStat == VAL_STAT_NONE) {  // the primary TS key is not VAL_STAT_NONE
          tdAppendColValToTpRow(pBuilder, TD_VTYPE_NONE, getNullValue(schema[i].type), true, schema[i].type, i,
                                spd->cols[i].toffset);
935 936 937
        }
      }
    }
D
stmt  
dapan1121 已提交
938 939

    *gotRow = true;
940 941
  }

C
Cary Xu 已提交
942
  // *len = pBuilder->extendedRowSize;
943 944 945 946
  return TSDB_CODE_SUCCESS;
}

// pSql -> (field1_value, ...) [(field1_value2, ...) ...]
947
static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlock, int maxRows, int32_t* numOfRows) {
948
  STableComInfo tinfo = getTableInfo(pDataBlock->pTableMeta);
H
refact  
Hongze Cheng 已提交
949
  int32_t       extendedRowSize = getExtendedRowSize(pDataBlock);
C
Cary Xu 已提交
950
  CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));
951 952

  (*numOfRows) = 0;
H
refact  
Hongze Cheng 已提交
953
  char   tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0};  // used for deleting Escape character: \\, \', \"
954 955
  SToken sToken;
  while (1) {
956 957
    int32_t index = 0;
    NEXT_TOKEN_KEEP_SQL(pCxt->pSql, sToken, index);
958
    if (TK_NK_LP != sToken.type) {
959 960
      break;
    }
961
    pCxt->pSql += index;
962 963 964 965 966 967 968 969

    if ((*numOfRows) >= maxRows || pDataBlock->size + extendedRowSize >= pDataBlock->nAllocSize) {
      int32_t tSize;
      CHECK_CODE(allocateMemIfNeed(pDataBlock, extendedRowSize, &tSize));
      ASSERT(tSize >= maxRows);
      maxRows = tSize;
    }

D
stmt  
dapan1121 已提交
970 971 972 973 974
    bool gotRow = false;
    CHECK_CODE(parseOneRow(pCxt, pDataBlock, tinfo.precision, &gotRow, tmpTokenBuf));
    if (gotRow) {
      pDataBlock->size += extendedRowSize; //len;
    }
975 976

    NEXT_TOKEN(pCxt->pSql, sToken);
977
    if (TK_NK_RP != sToken.type) {
978 979 980
      return buildSyntaxErrMsg(&pCxt->msg, ") expected", sToken.z);
    }

D
stmt  
dapan1121 已提交
981 982 983
    if (gotRow) {
      (*numOfRows)++;
    }
984 985
  }

D
stmt  
dapan1121 已提交
986
  if (0 == (*numOfRows) && (!TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT))) {
987 988 989 990 991
    return  buildSyntaxErrMsg(&pCxt->msg, "no any data points", NULL);
  }
  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
992
static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* dataBuf) {
993 994 995 996
  int32_t maxNumOfRows;
  CHECK_CODE(allocateMemIfNeed(dataBuf, getExtendedRowSize(dataBuf), &maxNumOfRows));

  int32_t numOfRows = 0;
997
  CHECK_CODE(parseValues(pCxt, dataBuf, maxNumOfRows, &numOfRows));
998

H
refact  
Hongze Cheng 已提交
999
  SSubmitBlk* pBlocks = (SSubmitBlk*)(dataBuf->pData);
D
dapan1121 已提交
1000
  if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, dataBuf, numOfRows)) {
1001 1002 1003 1004 1005 1006 1007 1008
    return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than 32767");
  }

  dataBuf->numOfTables = 1;
  pCxt->totalNum += numOfRows;
  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
1009
void destroyCreateSubTbReq(SVCreateTbReq* pReq) {
X
Xiaoyu Wang 已提交
1010 1011 1012 1013
  taosMemoryFreeClear(pReq->name);
  taosMemoryFreeClear(pReq->ctbCfg.pTag);
}

X
Xiaoyu Wang 已提交
1014
static void destroyInsertParseContextForTable(SInsertParseContext* pCxt) {
wafwerar's avatar
wafwerar 已提交
1015
  taosMemoryFreeClear(pCxt->pTableMeta);
X
Xiaoyu Wang 已提交
1016 1017
  destroyBoundColumnInfo(&pCxt->tags);
  tdDestroyKVRowBuilder(&pCxt->tagsBuilder);
X
Xiaoyu Wang 已提交
1018
  destroyCreateSubTbReq(&pCxt->createTblReq);
X
Xiaoyu Wang 已提交
1019 1020
}

1021 1022 1023 1024 1025
static void destroyDataBlock(STableDataBlocks* pDataBlock) {
  if (pDataBlock == NULL) {
    return;
  }

wafwerar's avatar
wafwerar 已提交
1026
  taosMemoryFreeClear(pDataBlock->pData);
1027 1028 1029
  if (!pDataBlock->cloned) {
    // free the refcount for metermeta
    if (pDataBlock->pTableMeta != NULL) {
wafwerar's avatar
wafwerar 已提交
1030
      taosMemoryFreeClear(pDataBlock->pTableMeta);
1031 1032 1033 1034
    }

    destroyBoundColumnInfo(&pDataBlock->boundColumnInfo);
  }
wafwerar's avatar
wafwerar 已提交
1035
  taosMemoryFreeClear(pDataBlock);
1036 1037
}

X
Xiaoyu Wang 已提交
1038 1039 1040
static void destroyInsertParseContext(SInsertParseContext* pCxt) {
  destroyInsertParseContextForTable(pCxt);
  taosHashCleanup(pCxt->pVgroupsHashObj);
1041 1042

  destroyBlockHashmap(pCxt->pTableBlockHashObj);
X
Xiaoyu Wang 已提交
1043 1044 1045
  destroyBlockArrayList(pCxt->pVgDataBlocks);
}

1046 1047 1048 1049 1050 1051
//   tb_name
//       [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
//       [(field1_name, ...)]
//       VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
//   [...];
static int32_t parseInsertBody(SInsertParseContext* pCxt) {
D
stmt  
dapan1121 已提交
1052 1053
  int32_t tbNum = 0;
  
X
Xiaoyu Wang 已提交
1054
  // for each table
1055 1056
  while (1) {
    SToken sToken;
D
stmt  
dapan1121 已提交
1057 1058
    char *tbName = NULL;

1059 1060 1061 1062 1063
    // pSql -> tb_name ...
    NEXT_TOKEN(pCxt->pSql, sToken);

    // no data in the sql string anymore.
    if (sToken.n == 0) {
D
stmt  
dapan1121 已提交
1064
      if (0 == pCxt->totalNum && (!TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT))) {
H
refact  
Hongze Cheng 已提交
1065
        return buildInvalidOperationMsg(&pCxt->msg, "no data in sql");
1066 1067 1068 1069
      }
      break;
    }

D
stmt  
dapan1121 已提交
1070 1071 1072 1073
    if (TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT) && tbNum > 0) {
      return buildInvalidOperationMsg(&pCxt->msg, "single table allowed in one stmt");;
    }

D
stmt  
dapan1121 已提交
1074 1075
    destroyInsertParseContextForTable(pCxt);

D
stmt  
dapan1121 已提交
1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086
    if (TK_NK_QUESTION == sToken.type) {
      if (pCxt->pStmtCb) {
        CHECK_CODE((*pCxt->pStmtCb->getTbNameFn)(pCxt->pStmtCb->pStmt, &tbName));
        
        sToken.z = tbName;
        sToken.n = strlen(tbName);
      } else {
        return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", sToken.z);
      }
    }
    
1087 1088 1089
    SToken tbnameToken = sToken;
    NEXT_TOKEN(pCxt->pSql, sToken);

H
refact  
Hongze Cheng 已提交
1090
    // USING cluase
1091 1092 1093 1094
    if (TK_USING == sToken.type) {
      CHECK_CODE(parseUsingClause(pCxt, &tbnameToken));
      NEXT_TOKEN(pCxt->pSql, sToken);
    } else {
1095
      CHECK_CODE(getTableMeta(pCxt, &tbnameToken));
1096 1097
    }

H
refact  
Hongze Cheng 已提交
1098
    STableDataBlocks* dataBuf = NULL;
1099
    CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, pCxt->pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE,
H
refact  
Hongze Cheng 已提交
1100 1101 1102
                                    sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta,
                                    &dataBuf, NULL, &pCxt->createTblReq));

1103
    if (TK_NK_LP == sToken.type) {
1104
      // pSql -> field1_name, ...)
1105
      CHECK_CODE(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(pCxt->pTableMeta)));
1106 1107 1108 1109 1110 1111
      NEXT_TOKEN(pCxt->pSql, sToken);
    }

    if (TK_VALUES == sToken.type) {
      // pSql -> (field1_value, ...) [(field1_value2, ...) ...]
      CHECK_CODE(parseValuesClause(pCxt, dataBuf));
D
stmt  
dapan1121 已提交
1112
      TSDB_QUERY_SET_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_INSERT);
D
stmt  
dapan1121 已提交
1113 1114

      tbNum++;
1115 1116 1117 1118
      continue;
    }

    // FILE csv_file_path
1119
    if (TK_NK_FILE == sToken.type) {
1120 1121
      // pSql -> csv_file_path
      NEXT_TOKEN(pCxt->pSql, sToken);
1122
      if (0 == sToken.n || (TK_NK_STRING != sToken.type && TK_NK_ID != sToken.type)) {
1123 1124 1125
        return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", sToken.z);
      }
      // todo
1126
      pCxt->pOutput->insertType = TSDB_QUERY_TYPE_FILE_INSERT;
D
stmt  
dapan1121 已提交
1127 1128

      tbNum++;
1129 1130 1131 1132 1133
      continue;
    }

    return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", sToken.z);
  }
D
stmt  
dapan1121 已提交
1134 1135
  
  if (TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT)) {
D
stmt  
dapan1121 已提交
1136 1137 1138 1139 1140
    SParsedDataColInfo *tags = taosMemoryMalloc(sizeof(pCxt->tags));
    if (NULL == tags) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    memcpy(tags, &pCxt->tags, sizeof(pCxt->tags));
D
stmt  
dapan1121 已提交
1141
    (*pCxt->pStmtCb->setBindInfoFn)(pCxt->pStmtCb->pStmt, pCxt->pTableMeta, tags);
D
stmt  
dapan1121 已提交
1142
    memset(&pCxt->tags, 0, sizeof(pCxt->tags));
D
stmt  
dapan1121 已提交
1143

D
stmt  
dapan1121 已提交
1144
    (*pCxt->pStmtCb->setExecInfoFn)(pCxt->pStmtCb->pStmt, pCxt->pVgroupsHashObj, pCxt->pTableBlockHashObj);
D
stmt  
dapan1121 已提交
1145 1146 1147 1148 1149 1150
    pCxt->pVgroupsHashObj = NULL;
    pCxt->pTableBlockHashObj = NULL;
    
    return TSDB_CODE_SUCCESS;
  }
  
1151
  // merge according to vgId
D
stmt  
dapan1121 已提交
1152
  if (taosHashGetSize(pCxt->pTableBlockHashObj) > 0) {
X
Xiaoyu Wang 已提交
1153
    CHECK_CODE(mergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->payloadType, &pCxt->pVgDataBlocks));
1154
  }
1155
  return buildOutput(pCxt);
1156 1157 1158 1159 1160 1161 1162 1163
}

// INSERT INTO
//   tb_name
//       [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
//       [(field1_name, ...)]
//       VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
//   [...];
X
Xiaoyu Wang 已提交
1164
int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
1165 1166
  SInsertParseContext context = {
    .pComCxt = pContext,
1167
    .pSql = (char*) pContext->pSql,
1168 1169
    .msg = {.buf = pContext->pMsg, .len = pContext->msgLen},
    .pTableMeta = NULL,
X
Xiaoyu Wang 已提交
1170
    .pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, false),
1171
    .totalNum = 0,
D
stmt  
dapan1121 已提交
1172 1173
    .pOutput = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT),
    .pStmtCb = pContext->pStmtCb
1174 1175
  };

D
stmt  
dapan1121 已提交
1176 1177 1178
  if (pContext->pStmtCb && *pQuery) {
    (*pContext->pStmtCb->getExecInfoFn)(pContext->pStmtCb->pStmt, &context.pVgroupsHashObj, &context.pTableBlockHashObj);
  } else {
D
stmt  
dapan1121 已提交
1179 1180
    context.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
    context.pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
D
stmt  
dapan1121 已提交
1181 1182
  }
  
X
Xiaoyu Wang 已提交
1183 1184
  if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj ||
      NULL == context.pSubTableHashObj || NULL == context.pOutput) {
X
Xiaoyu Wang 已提交
1185
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
1186 1187
  }

D
stmt  
dapan1121 已提交
1188 1189 1190 1191
  if (pContext->pStmtCb) {
    TSDB_QUERY_SET_TYPE(context.pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT);
  }

1192
  if (NULL == *pQuery) {
D
stmt  
dapan1121 已提交
1193 1194 1195 1196 1197 1198 1199 1200
    *pQuery = taosMemoryCalloc(1, sizeof(SQuery));
    if (NULL == *pQuery) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    (*pQuery)->execMode = QUERY_EXEC_MODE_SCHEDULE;
    (*pQuery)->haveResultSet = false;
    (*pQuery)->msgType = TDMT_VND_SUBMIT;
    (*pQuery)->pRoot = (SNode*)context.pOutput;
1201
  }
D
stmt  
dapan1121 已提交
1202
  
1203
  context.pOutput->payloadType = PAYLOAD_TYPE_KV;
1204

1205 1206 1207 1208 1209
  int32_t code = skipInsertInto(&context);
  if (TSDB_CODE_SUCCESS == code) {
    code = parseInsertBody(&context);
  }
  destroyInsertParseContext(&context);
X
Xiaoyu Wang 已提交
1210
  return code;
1211
}
D
stmt  
dapan1121 已提交
1212 1213


D
stmt  
dapan1121 已提交
1214
int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char *msgBuf, int32_t msgBufLen) {
D
stmt  
dapan1121 已提交
1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232
  SMsgBuf msg = {.buf = msgBuf, .len =msgBufLen};
  SToken sToken;
  int32_t code = 0;
  char *tbName = NULL;
  
  NEXT_TOKEN(pTableName, sToken);
  
  if (sToken.n == 0) {
    return buildInvalidOperationMsg(&msg, "empty table name");
  }

  code = createSName(pName, &sToken, acctId, dbName, &msg);
  if (code) {
    return code;
  }

  NEXT_TOKEN(pTableName, sToken);

D
stmt  
dapan1121 已提交
1233
  if (sToken.n > 0) {
D
stmt  
dapan1121 已提交
1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246
    return buildInvalidOperationMsg(&msg, "table name format is wrong");
  }

  return TSDB_CODE_SUCCESS;
}


int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) {
  SVnodeModifOpStmt *modifyNode = (SVnodeModifOpStmt *)pQuery->pRoot;
  int32_t code = 0;
  SInsertParseContext insertCtx = {
    .pVgroupsHashObj = pVgHash,
    .pTableBlockHashObj = pBlockHash,
D
stmt  
dapan1121 已提交
1247
    .pOutput = (SVnodeModifOpStmt*)pQuery->pRoot,
D
stmt  
dapan1121 已提交
1248 1249 1250 1251
  };
  
  // merge according to vgId
  if (taosHashGetSize(insertCtx.pTableBlockHashObj) > 0) {
D
stmt  
dapan1121 已提交
1252
    CHECK_CODE(mergeTableDataBlocks(insertCtx.pTableBlockHashObj, modifyNode->payloadType, &insertCtx.pVgDataBlocks));
D
stmt  
dapan1121 已提交
1253 1254 1255 1256 1257 1258 1259
  }

  CHECK_CODE(buildOutput(&insertCtx));

  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318
int32_t qBindStmtTagsValue(void *pBlock, void *boundTags, int64_t suid, SName *pName, TAOS_BIND_v2 *bind, char *msgBuf, int32_t msgBufLen){
  STableDataBlocks *pDataBlock = (STableDataBlocks *)pBlock;
  SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; 
  SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
  if (NULL == tags) {
    return TSDB_CODE_QRY_APP_ERROR;
  }

  SKVRowBuilder tagBuilder;
  if (tdInitKVRowBuilder(&tagBuilder) < 0) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta);
  SKvParam param = {.builder = &tagBuilder};

  for (int c = 0; c < tags->numOfBound; ++c) {
    if (bind[c].is_null && bind[c].is_null[0]) {
      KvRowAppend(&pBuf, NULL, 0, &param);
      continue;
    }
    
    SSchema* pTagSchema = &pSchema[tags->boundColumns[c] - 1]; // colId starts with 1
    param.schema = pTagSchema;

    int32_t colLen = pTagSchema->bytes;
    if (IS_VAR_DATA_TYPE(pTagSchema->type)) {
      colLen = bind[c].length[0];
    }
    
    CHECK_CODE(KvRowAppend(&pBuf, (char *)bind[c].buffer, colLen, &param));
  }

  SKVRow row = tdGetKVRowFromBuilder(&tagBuilder);
  if (NULL == row) {
    tdDestroyKVRowBuilder(&tagBuilder);
    return buildInvalidOperationMsg(&pBuf, "tag value expected");
  }
  tdSortKVRowByColIdx(row);

  SVCreateTbReq tbReq = {0};
  CHECK_CODE(buildCreateTbReq(&tbReq, pName, row, suid));
  CHECK_CODE(buildCreateTbMsg(pDataBlock, &tbReq));

  destroyCreateSubTbReq(&tbReq);
  tdDestroyKVRowBuilder(&tagBuilder);

  return TSDB_CODE_SUCCESS;
}


int32_t qBindStmtColsValue(void *pBlock, TAOS_BIND_v2 *bind, char *msgBuf, int32_t msgBufLen) {
  STableDataBlocks *pDataBlock = (STableDataBlocks *)pBlock;
  SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
  int32_t extendedRowSize = getExtendedRowSize(pDataBlock);
  SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
  SRowBuilder*        pBuilder = &pDataBlock->rowBuilder;
  SMemParam param = {.rb = pBuilder};
  SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; 
D
stmt  
dapan1121 已提交
1319 1320
  int32_t rowNum = bind->num;
  
D
stmt  
dapan1121 已提交
1321 1322
  CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));

D
stmt  
dapan1121 已提交
1323 1324 1325 1326 1327 1328 1329 1330
  CHECK_CODE(allocateMemForSize(pDataBlock, extendedRowSize * bind->num));
  
  for (int32_t r = 0; r < bind->num; ++r) {
    STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size);  // skip the SSubmitBlk header
    tdSRowResetBuf(pBuilder, row);
    
    for (int c = 0; c < spd->numOfBound; ++c) {
      SSchema* pColSchema = &pSchema[spd->boundColumns[c] - 1];
D
stmt  
dapan1121 已提交
1331 1332 1333 1334 1335 1336 1337 1338

      if (bind[c].buffer_type != pColSchema->type) {
        return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
      }

      if (bind[c].num != rowNum) {
        return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
      }
D
stmt  
dapan1121 已提交
1339 1340 1341 1342 1343
      
      param.schema = pColSchema;
      getSTSRowAppendInfo(pBuilder->rowType, spd, c, &param.toffset, &param.colIdx);

      if (bind[c].is_null && bind[c].is_null[r]) {
D
stmt  
dapan1121 已提交
1344 1345 1346 1347
        if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
          return buildInvalidOperationMsg(&pBuf, "primary timestamp should not be NULL");
        }
        
D
stmt  
dapan1121 已提交
1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375
        CHECK_CODE(MemRowAppend(&pBuf, NULL, 0, &param));
      } else {
        int32_t colLen = pColSchema->bytes;
        if (IS_VAR_DATA_TYPE(pColSchema->type)) {
          colLen = bind[c].length[r];
        }
        
        CHECK_CODE(MemRowAppend(&pBuf, (char *)bind[c].buffer + bind[c].buffer_length * r, colLen, &param));
      }
    
      if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
        TSKEY tsKey = TD_ROW_KEY(row);
        checkTimestamp(pDataBlock, (const char *)&tsKey);
      }
    }
    
    // set the null value for the columns that do not assign values
    if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
      for (int32_t i = 0; i < spd->numOfCols; ++i) {
        if (spd->cols[i].valStat == VAL_STAT_NONE) {  // the primary TS key is not VAL_STAT_NONE
          tdAppendColValToTpRow(pBuilder, TD_VTYPE_NONE, getNullValue(pSchema[i].type), true, pSchema[i].type, i,
                                spd->cols[i].toffset);
        }
      }
    }
    
    pDataBlock->size += extendedRowSize;
  }
D
stmt  
dapan1121 已提交
1376

D
stmt  
dapan1121 已提交
1377 1378 1379 1380 1381 1382 1383 1384
  SSubmitBlk *pBlocks = (SSubmitBlk *)(pDataBlock->pData);
  if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, bind->num)) {
    return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767");
  }

  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409
int32_t qBindStmtSingleColValue(void *pBlock, TAOS_BIND_v2 *bind, char *msgBuf, int32_t msgBufLen, int32_t colIdx, int32_t rowNum) {
  STableDataBlocks *pDataBlock = (STableDataBlocks *)pBlock;
  SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
  int32_t extendedRowSize = getExtendedRowSize(pDataBlock);
  SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
  SRowBuilder*        pBuilder = &pDataBlock->rowBuilder;
  SMemParam param = {.rb = pBuilder};
  SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; 
  bool rowStart = (0 == colIdx);
  bool rowEnd = ((colIdx + 1) == spd->numOfBound);

  if (rowStart) {
    CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));
    CHECK_CODE(allocateMemForSize(pDataBlock, extendedRowSize * bind->num));
  }
  
  for (int32_t r = 0; r < bind->num; ++r) {
    STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size + extendedRowSize * r);  // skip the SSubmitBlk header
    if (rowStart) {
      tdSRowResetBuf(pBuilder, row);
    } else {
      tdSRowGetBuf(pBuilder, row);
    }
    
    SSchema* pColSchema = &pSchema[spd->boundColumns[colIdx] - 1];
D
dapan1121 已提交
1410
    
D
stmt  
dapan1121 已提交
1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424
    if (bind->num != rowNum) {
      return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
    }
    
    param.schema = pColSchema;
    getSTSRowAppendInfo(pBuilder->rowType, spd, colIdx, &param.toffset, &param.colIdx);

    if (bind->is_null && bind->is_null[r]) {
      if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
        return buildInvalidOperationMsg(&pBuf, "primary timestamp should not be NULL");
      }
      
      CHECK_CODE(MemRowAppend(&pBuf, NULL, 0, &param));
    } else {
D
dapan1121 已提交
1425 1426 1427 1428
      if (bind->buffer_type != pColSchema->type) {
        return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
      }

D
stmt  
dapan1121 已提交
1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450
      int32_t colLen = pColSchema->bytes;
      if (IS_VAR_DATA_TYPE(pColSchema->type)) {
        colLen = bind->length[r];
      }
      
      CHECK_CODE(MemRowAppend(&pBuf, (char *)bind->buffer + bind->buffer_length * r, colLen, &param));
    }
  
    if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
      TSKEY tsKey = TD_ROW_KEY(row);
      checkTimestamp(pDataBlock, (const char *)&tsKey);
    }
    
    // set the null value for the columns that do not assign values
    if (rowEnd && (spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
      for (int32_t i = 0; i < spd->numOfCols; ++i) {
        if (spd->cols[i].valStat == VAL_STAT_NONE) {  // the primary TS key is not VAL_STAT_NONE
          tdAppendColValToTpRow(pBuilder, TD_VTYPE_NONE, getNullValue(pSchema[i].type), true, pSchema[i].type, i,
                                spd->cols[i].toffset);
        }
      }
    }    
D
stmt  
dapan1121 已提交
1451 1452
  }

D
stmt  
dapan1121 已提交
1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478
  if (rowEnd) {
    pDataBlock->size += extendedRowSize * bind->num;

    SSubmitBlk *pBlocks = (SSubmitBlk *)(pDataBlock->pData);
    if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, bind->num)) {
      return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767");
    }
  }

  return TSDB_CODE_SUCCESS;
}


int32_t buildBoundFields(SParsedDataColInfo *boundInfo, SSchema *pSchema, int32_t *fieldNum, TAOS_FIELD** fields) {
  if (fields) {
    *fields = taosMemoryCalloc(boundInfo->numOfBound, sizeof(TAOS_FIELD));
    if (NULL == *fields) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    for (int32_t i = 0; i < boundInfo->numOfBound; ++i) {
      SSchema* pTagSchema = &pSchema[boundInfo->boundColumns[i] - 1];
      strcpy((*fields)[i].name, pTagSchema->name);
      (*fields)[i].type = pTagSchema->type;
      (*fields)[i].bytes = pTagSchema->bytes;
    }
D
stmt  
dapan1121 已提交
1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511
  }

  *fieldNum = boundInfo->numOfBound;

  return TSDB_CODE_SUCCESS;
}


int32_t qBuildStmtTagFields(void *pBlock, void *boundTags, int32_t *fieldNum, TAOS_FIELD** fields) {
  STableDataBlocks *pDataBlock = (STableDataBlocks *)pBlock;
  SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
  if (NULL == tags) {
    return TSDB_CODE_QRY_APP_ERROR;
  }
  
  SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta);  
  if (tags->numOfBound <= 0) {
    *fieldNum = 0;
    *fields = NULL;

    return TSDB_CODE_SUCCESS;
  }

  CHECK_CODE(buildBoundFields(tags, pSchema, fieldNum, fields));
  
  return TSDB_CODE_SUCCESS;
}

int32_t qBuildStmtColFields(void *pBlock, int32_t *fieldNum, TAOS_FIELD** fields) {
  STableDataBlocks *pDataBlock = (STableDataBlocks *)pBlock;
  SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta);  
  if (pDataBlock->boundColumnInfo.numOfBound <= 0) {
    *fieldNum = 0;
D
stmt  
dapan1121 已提交
1512 1513 1514
    if (fields) {
      *fields = NULL;
    }
D
stmt  
dapan1121 已提交
1515 1516 1517 1518 1519 1520 1521 1522 1523 1524

    return TSDB_CODE_SUCCESS;
  }

  CHECK_CODE(buildBoundFields(&pDataBlock->boundColumnInfo, pSchema, fieldNum, fields));
  
  return TSDB_CODE_SUCCESS;
}


D
stmt  
dapan1121 已提交
1525