parInsert.c 38.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

X
Xiaoyu Wang 已提交
16 17 18 19
#include "parInsertData.h"
#include "parInt.h"
#include "parUtil.h"
#include "parToken.h"
20 21 22 23 24 25 26 27 28 29 30
#include "tglobal.h"
#include "ttime.h"
#include "ttypes.h"

/* Read the next token from pSql into sToken (tStrGetToken flag = false) and advance pSql past it. */
#define NEXT_TOKEN(pSql, sToken)                            \
  do {                                                      \
    int32_t nextTokLen_ = 0;                                \
    (sToken) = tStrGetToken((pSql), &nextTokLen_, false);   \
    (pSql) += nextTokLen_;                                  \
  } while (0)

X
Xiaoyu Wang 已提交
31 32 33 34 35 36 37
/* Same as NEXT_TOKEN, but calls tStrGetToken with its boolean flag set to true. */
#define NEXT_TOKEN_WITH_PREV(pSql, sToken)                  \
  do {                                                      \
    int32_t nextTokLen_ = 0;                                \
    (sToken) = tStrGetToken((pSql), &nextTokLen_, true);    \
    (pSql) += nextTokLen_;                                  \
  } while (0)

38
/*
 * Read a token without moving pSql. `index` is passed to tStrGetToken by address, so the
 * caller's variable is updated by it (presumably the offset consumed so far — confirm in tStrGetToken).
 */
#define NEXT_TOKEN_KEEP_SQL(pSql, sToken, index)            \
  do {                                                      \
    (sToken) = tStrGetToken((pSql), &(index), false);       \
  } while (0)

43
/*
 * Evaluate `expr` exactly once; if the result is not TSDB_CODE_SUCCESS, return it from the
 * enclosing function.
 * The temporary is deliberately NOT called `code`: with that name, CHECK_CODE(code) (or any
 * expression mentioning a caller variable `code`) would read the macro's own uninitialized local.
 */
#define CHECK_CODE(expr)                       \
  do {                                         \
    int32_t checkCodeRet_ = (expr);            \
    if (TSDB_CODE_SUCCESS != checkCodeRet_) {  \
      return checkCodeRet_;                    \
    }                                          \
  } while (0)

typedef struct SInsertParseContext {
X
Xiaoyu Wang 已提交
52
  SParseContext* pComCxt;       // input
53 54
  char          *pSql;          // input
  SMsgBuf        msg;           // input
X
Xiaoyu Wang 已提交
55 56 57
  STableMeta* pTableMeta;       // each table
  SParsedDataColInfo tags;      // each table
  SKVRowBuilder tagsBuilder;    // each table
X
Xiaoyu Wang 已提交
58
  SVCreateTbReq createTblReq;   // each table
X
Xiaoyu Wang 已提交
59 60
  SHashObj* pVgroupsHashObj;    // global
  SHashObj* pTableBlockHashObj; // global
X
Xiaoyu Wang 已提交
61
  SHashObj* pSubTableHashObj;   // global
X
Xiaoyu Wang 已提交
62 63
  SArray* pTableDataBlocks;     // global
  SArray* pVgDataBlocks;        // global
64
  int32_t totalNum;
X
Xiaoyu Wang 已提交
65
  SVnodeModifOpStmt* pOutput;
66 67
} SInsertParseContext;

X
Xiaoyu Wang 已提交
68
/*
 * Sink for one parsed value: `value` points at the converted data (NULL for SQL NULL), `len` is its
 * length, `param` carries callback state (SMemParam for MemRowAppend, SKvParam for KvRowAppend).
 * NOTE(review): names beginning with '_' are reserved at file scope; kept for compatibility.
 */
typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param);
X
Xiaoyu Wang 已提交
69 70 71 72

/* Addressable boolean constants handed to row-append callbacks by parseValueToken. */
static uint8_t TRUE_VALUE  = (uint8_t)TSDB_TRUE;
static uint8_t FALSE_VALUE = (uint8_t)TSDB_FALSE;

73 74 75 76 77 78 79 80 81 82 83 84 85
/*
 * Consume the "INSERT INTO" prefix from pCxt->pSql.
 * Returns TSDB_CODE_SUCCESS, or a syntax error (recorded in pCxt->msg) naming the missing keyword.
 */
static int32_t skipInsertInto(SInsertParseContext* pCxt) {
  SToken token;

  NEXT_TOKEN(pCxt->pSql, token);
  if (token.type != TK_INSERT) {
    return buildSyntaxErrMsg(&pCxt->msg, "keyword INSERT is expected", token.z);
  }

  NEXT_TOKEN(pCxt->pSql, token);
  return (token.type == TK_INTO) ? TSDB_CODE_SUCCESS
                                 : buildSyntaxErrMsg(&pCxt->msg, "keyword INTO is expected", token.z);
}

86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
/*
 * Validate an identifier token and normalise it in place.
 *
 * - A token wrapped in the escape char '`' is accepted unchanged.
 * - A single-part name must not be a number; it is lower-cased in the SQL text.
 * - A two-part name "<first>.<second>" must consist of exactly two TK_NK_ID tokens separated by the
 *   path delimiter; the token is widened to cover both parts and lower-cased.
 *
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_TSC_INVALID_OPERATION.
 * NOTE: modifies both *pToken and the SQL text it points into.
 */
static int32_t parserValidateIdToken(SToken* pToken) {
  // n == 0 is rejected up front: the escape-char test below reads z[n - 1].
  if (pToken == NULL || pToken->z == NULL || pToken->n == 0 || pToken->type != TK_NK_ID) {
    return TSDB_CODE_TSC_INVALID_OPERATION;
  }

  // it is a token quoted with escape char '`'
  if (pToken->z[0] == TS_ESCAPE_CHAR && pToken->z[pToken->n - 1] == TS_ESCAPE_CHAR) {
    return TSDB_CODE_SUCCESS;
  }

  char* sep = strnchr(pToken->z, TS_PATH_DELIMITER[0], pToken->n, true);
  if (sep == NULL) {  // single-part name
    if (isNumber(pToken)) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }
    strntolower(pToken->z, pToken->z, pToken->n);
    return TSDB_CODE_SUCCESS;
  }

  // Two-part name. pToken->type is TK_NK_ID at this point (checked above).
  int32_t oldLen = pToken->n;
  char*   pStr = pToken->z;

  // First part: one identifier token immediately followed by the delimiter.
  pToken->n = tGetToken(pToken->z, &pToken->type);
  if (pToken->z[pToken->n] != TS_PATH_DELIMITER[0] || pToken->type != TK_NK_ID) {
    return TSDB_CODE_TSC_INVALID_OPERATION;
  }
  int32_t firstPartLen = pToken->n;

  // Second part: exactly one identifier token spanning the rest of the original token.
  pToken->z = sep + 1;
  pToken->n = (uint32_t)(oldLen - (sep - pStr) - 1);
  int32_t len = tGetToken(pToken->z, &pToken->type);
  if (len != pToken->n || pToken->type != TK_NK_ID) {
    return TSDB_CODE_TSC_INVALID_OPERATION;
  }

  // pStr[firstPartLen] is known to be the delimiter (checked above), so no text is moved;
  // just widen the token to cover "<first>.<second>".
  pToken->n += (firstPartLen + sizeof(TS_PATH_DELIMITER[0]));
  pToken->z = pStr;

  strntolower(pToken->z, pToken->z, pToken->n);
  return TSDB_CODE_SUCCESS;
}

148
/*
 * Split a table-name token into "<acctId>.<db>" (fullDbName) and the bare table name (tableName).
 * When the token has no "db." prefix, the current database pCxt->pComCxt->db is used.
 * Both outputs are NUL-terminated. Buffer sizes are not checked against the token length here —
 * NOTE(review): callers are assumed to pass buffers large enough for the name.
 * Returns TSDB_CODE_SUCCESS or an error recorded in pCxt->msg.
 */
static int32_t buildName(SInsertParseContext* pCxt, SToken* pStname, char* fullDbName, char* tableName) {
  if (parserValidateIdToken(pStname) != TSDB_CODE_SUCCESS) {
    return buildSyntaxErrMsg(&pCxt->msg, "invalid table name", pStname->z);
  }

  char* p = strnchr(pStname->z, TS_PATH_DELIMITER[0], pStname->n, false);
  if (NULL != p) {  // db.table
    int32_t dbLen = (int32_t)(p - pStname->z);
    int32_t tbLen = (int32_t)(pStname->n - dbLen - 1);
    int32_t n = sprintf(fullDbName, "%d.", pCxt->pComCxt->acctId);
    // strncpy never NUL-terminated these copies; terminate explicitly.
    memcpy(fullDbName + n, pStname->z, dbLen);
    fullDbName[n + dbLen] = '\0';
    memcpy(tableName, p + 1, tbLen);
    tableName[tbLen] = '\0';
  } else {
    // Passing a NULL db to "%s" is undefined behaviour; report it like createSName does.
    if (NULL == pCxt->pComCxt->db) {
      return buildInvalidOperationMsg(&pCxt->msg, "db is not specified");
    }
    snprintf(fullDbName, TSDB_DB_FNAME_LEN, "%d.%s", pCxt->pComCxt->acctId, pCxt->pComCxt->db);
    memcpy(tableName, pStname->z, pStname->n);
    tableName[pStname->n] = '\0';
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225
/*
 * Fill *pName from a table-name token, which is either "db.table" or a bare "table" resolved
 * against pParseCtx->db. Quotes are removed from each part with strdequote.
 * Returns TSDB_CODE_SUCCESS, or an error built into pMsgBuf.
 */
static int32_t createSName(SName* pName, SToken* pTableName, SParseContext* pParseCtx, SMsgBuf* pMsgBuf) {
  const char* msg1 = "name too long";
  const char* msg2 = "invalid database name";
  const char* msg3 = "db is not specified";

  int32_t code = TSDB_CODE_SUCCESS;
  char*   p = strnchr(pTableName->z, TS_PATH_DELIMITER[0], pTableName->n, true);

  if (p != NULL) {  // db has been specified in sql string so we ignore current db path
    assert(*p == TS_PATH_DELIMITER[0]);

    // Raw lengths of both parts in the token text. They are computed before dequoting: the
    // dequoted db length is shorter for a quoted db name and must not be used to locate the
    // table part (doing so over-copied past the end of the token).
    int32_t rawDbLen = (int32_t)(p - pTableName->z);
    int32_t rawTbLen = (int32_t)pTableName->n - rawDbLen - 1;

    char name[TSDB_DB_FNAME_LEN] = {0};
    if (rawDbLen >= (int32_t)sizeof(name)) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }
    memcpy(name, pTableName->z, rawDbLen);
    int32_t dbLen = strdequote(name);

    code = tNameSetDbName(pName, pParseCtx->acctId, name, dbLen);
    if (code != TSDB_CODE_SUCCESS) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }

    char tbname[TSDB_TABLE_FNAME_LEN] = {0};
    if (rawTbLen >= (int32_t)sizeof(tbname)) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }
    memcpy(tbname, p + 1, rawTbLen);
    strdequote(tbname);

    code = tNameFromString(pName, tbname, T_NAME_TABLE);
    if (code != 0) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }
  } else {  // get current DB name first, and then set it into path
    if (pTableName->n >= TSDB_TABLE_NAME_LEN) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }

    assert(pTableName->n < TSDB_TABLE_FNAME_LEN);

    char name[TSDB_TABLE_FNAME_LEN] = {0};
    memcpy(name, pTableName->z, pTableName->n);
    strdequote(name);

    if (pParseCtx->db == NULL) {
      return buildInvalidOperationMsg(pMsgBuf, msg3);
    }

    code = tNameSetDbName(pName, pParseCtx->acctId, pParseCtx->db, strlen(pParseCtx->db));
    if (code != TSDB_CODE_SUCCESS) {
      return buildInvalidOperationMsg(pMsgBuf, msg2);
    }

    code = tNameFromString(pName, name, T_NAME_TABLE);
    if (code != 0) {
      code = buildInvalidOperationMsg(pMsgBuf, msg1);
    }
  }

  return code;
}

D
stmt  
dapan1121 已提交
226
/*
 * Resolve pTname and load its metadata into pCxt->pTableMeta via catalogGetSTableMeta (isStb) or
 * catalogGetTableMeta. Also look up its vgroup and record it in pCxt->pVgroupsHashObj keyed by vgId.
 * Returns the first non-success code from any step.
 */
static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SToken* pTname, bool isStb) {
  SParseContext* pBasicCtx = pCxt->pComCxt;
  SName          name = {0};

  // A failed name build has already written its message into pCxt->msg; stop here instead of
  // querying the catalog with a partially filled SName.
  CHECK_CODE(createSName(&name, pTname, pBasicCtx, &pCxt->msg));

  if (isStb) {
    CHECK_CODE(catalogGetSTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &pCxt->pTableMeta));
  } else {
    CHECK_CODE(catalogGetTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &pCxt->pTableMeta));
  }

  SVgroupInfo vg;
  CHECK_CODE(catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &vg));
  CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));

  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
242 243 244 245 246 247 248 249 250
/* Load metadata for pTname through the catalogGetTableMeta path of getTableMetaImpl. */
static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) {
  const bool isSuperTable = false;
  return getTableMetaImpl(pCxt, pTname, isSuperTable);
}

/* Load metadata for pTname through the catalogGetSTableMeta path of getTableMetaImpl. */
static int32_t getSTableMeta(SInsertParseContext* pCxt, SToken* pTname) {
  const bool isSuperTable = true;
  return getTableMetaImpl(pCxt, pTname, isSuperTable);
}


251 252 253 254 255 256 257 258 259 260
/* Index in [start, end) of the schema column whose name equals the token text exactly, or -1. */
static int32_t findCol(SToken* pColname, int32_t start, int32_t end, SSchema* pSchema) {
  for (int32_t col = start; col < end; ++col) {
    const char* colName = pSchema[col].name;
    if (strlen(colName) == pColname->n && 0 == strncmp(pColname->z, colName, pColname->n)) {
      return col;
    }
  }
  return -1;
}

D
dapan1121 已提交
261
/*
 * Convert the submit message stored in blocks->pData to network byte order: fill the SSubmitReq
 * header from `blocks`, then byte-swap each of the blocks->numOfTables SSubmitBlk records that
 * follow it. `src` is not used.
 */
static void buildMsgHeader(STableDataBlocks* src, SVgDataBlocks* blocks) {
  SSubmitReq* pReq = (SSubmitReq*)blocks->pData;
  pReq->header.vgId = htonl(blocks->vg.vgId);
  pReq->header.contLen = htonl(blocks->size);
  pReq->length = pReq->header.contLen;  // copies the already converted value
  pReq->numOfBlocks = htonl(blocks->numOfTables);

  SSubmitBlk* pBlk = (SSubmitBlk*)(pReq + 1);
  for (int32_t remaining = blocks->numOfTables; remaining != 0; --remaining) {
    // Host-order length is needed to find the next block, so read it before converting.
    int32_t hostDataLen = pBlk->dataLen;

    pBlk->uid = htobe64(pBlk->uid);
    pBlk->suid = htobe64(pBlk->suid);
    pBlk->padding = htonl(pBlk->padding);
    pBlk->sversion = htonl(pBlk->sversion);
    pBlk->dataLen = htonl(pBlk->dataLen);
    pBlk->schemaLen = htonl(pBlk->schemaLen);
    pBlk->numOfRows = htons(pBlk->numOfRows);

    pBlk = (SSubmitBlk*)(pBlk->data + hostDataLen);
  }
}

/*
 * Build pCxt->pOutput->pDataBlocks: one SVgDataBlocks per entry of pCxt->pVgDataBlocks. Each takes
 * over the source block's pData buffer (swapped out of `src`), gets its vgroup info from
 * pCxt->pVgroupsHashObj, and has its message header converted by buildMsgHeader.
 * Returns TSDB_CODE_TSC_OUT_OF_MEMORY on allocation failure.
 */
static int32_t buildOutput(SInsertParseContext* pCxt) {
  size_t numOfVg = taosArrayGetSize(pCxt->pVgDataBlocks);
  pCxt->pOutput->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
  if (NULL == pCxt->pOutput->pDataBlocks) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  for (size_t i = 0; i < numOfVg; ++i) {
    STableDataBlocks* src = taosArrayGetP(pCxt->pVgDataBlocks, i);
    SVgDataBlocks*    dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
    if (NULL == dst) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    // Hand dst to the output array before it takes ownership of src->pData, so a failed push
    // only has to release dst itself instead of leaking it.
    if (NULL == taosArrayPush(pCxt->pOutput->pDataBlocks, &dst)) {
      taosMemoryFree(dst);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    taosHashGetDup(pCxt->pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
    dst->numOfTables = src->numOfTables;
    dst->size = src->size;
    TSWAP(dst->pData, src->pData, char*);
    buildMsgHeader(src, dst);
  }
  return TSDB_CODE_SUCCESS;
}

304 305 306 307 308 309 310
/*
 * Track whether rows of a data block arrive with strictly increasing timestamps. `start` points at
 * the row's leading TSKEY. The first non-increasing key clears `ordered`; after that prevTS is no
 * longer updated. Always returns TSDB_CODE_SUCCESS.
 */
static int32_t checkTimestamp(STableDataBlocks *pDataBlocks, const char *start) {
  if (pDataBlocks->ordered) {
    TSKEY key = *(const TSKEY *)start;
    if (key <= pDataBlocks->prevTS) {
      pDataBlocks->ordered = false;
    }
    pDataBlocks->prevTS = key;
  }
  return TSDB_CODE_SUCCESS;
}

319
/*
 * Parse a timestamp value token into *time at precision timePrec.
 *
 * Accepted forms: NOW / TODAY (optionally written with "()"), an integer literal, or an
 * RFC-3339/ISO-8601 string. NOW/TODAY/integer may be followed by "+<duration>" or "-<duration>"
 * (e.g. now+12a, now-5h). *end is advanced past a consumed "()" and/or duration suffix.
 * Returns TSDB_CODE_SUCCESS (0), a syntax error built in pMsgBuf, or
 * TSDB_CODE_TSC_INVALID_OPERATION for an unparsable duration.
 */
static int parseTime(char **end, SToken *pToken, int16_t timePrec, int64_t *time, SMsgBuf* pMsgBuf) {
  int32_t index = 0;
  SToken  sToken;
  int64_t interval;
  int64_t ts = 0;
  char*   pTokenEnd = *end;

  if (pToken->type == TK_NOW) {
    ts = taosGetTimestamp(timePrec);
  } else if (pToken->type == TK_TODAY) {
    ts = taosGetTimestampToday(timePrec);
  } else if (pToken->type == TK_NK_INTEGER) {
    bool isSigned = false;
    // The lexer only guarantees digits; the literal can still overflow int64.
    if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &ts, &isSigned)) {
      return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
    }
  } else {  // parse the RFC-3339/ISO-8601 timestamp format string
    if (taosParseTime(pToken->z, time, pToken->n, timePrec, tsDaylight) != TSDB_CODE_SUCCESS) {
      return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
    }

    return TSDB_CODE_SUCCESS;
  }

  // Look past blanks: consume an optional "()" and stop immediately at a ','.
  for (int k = pToken->n; pToken->z[k] != '\0'; k++) {
    if (pToken->z[k] == ' ' || pToken->z[k] == '\t') continue;
    if (pToken->z[k] == '(' && pToken->z[k + 1] == ')') {  // for insert NOW()/TODAY()
      *end = pTokenEnd = &pToken->z[k + 2];
      k++;
      continue;
    }
    if (pToken->z[k] == ',') {
      *end = pTokenEnd;
      *time = ts;
      return 0;
    }

    break;
  }

  /*
   * time expression:
   * e.g., now+12a, now-5h
   */
  SToken valueToken;
  index = 0;
  sToken = tStrGetToken(pTokenEnd, &index, false);
  pTokenEnd += index;

  if (sToken.type == TK_NK_MINUS || sToken.type == TK_NK_PLUS) {
    index = 0;
    valueToken = tStrGetToken(pTokenEnd, &index, false);
    pTokenEnd += index;

    if (valueToken.n < 2) {
      return buildSyntaxErrMsg(pMsgBuf, "value expected in timestamp", sToken.z);
    }

    char unit = 0;
    if (parseAbsoluteDuration(valueToken.z, valueToken.n, &interval, &unit, timePrec) != TSDB_CODE_SUCCESS) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }

    if (sToken.type == TK_NK_PLUS) {
      ts += interval;
    } else {
      ts = ts - interval;
    }

    *end = pTokenEnd;
  }

  *time = ts;
  return TSDB_CODE_SUCCESS;
}
392

X
Xiaoyu Wang 已提交
393
/*
 * Reject tokens that cannot be a value, then, for string tokens, strip the surrounding quotes and
 * resolve escapes into tmpTokenBuf ("\x" -> "x", a doubled quote -> one quote). On success a string
 * token is re-pointed at tmpTokenBuf (NUL-terminated) with its length reduced accordingly.
 * `type` is the target column type. Returns TSDB_CODE_SUCCESS or a syntax error built in pMsgBuf.
 */
static FORCE_INLINE int32_t checkAndTrimValue(SToken* pToken, uint32_t type, char* tmpTokenBuf, SMsgBuf* pMsgBuf) {
  const uint32_t tokType = pToken->type;
  const bool     isValueToken = tokType == TK_NOW || tokType == TK_TODAY || tokType == TK_NK_INTEGER ||
                            tokType == TK_NK_STRING || tokType == TK_NK_FLOAT || tokType == TK_NK_BOOL ||
                            tokType == TK_NULL || tokType == TK_NK_HEX || tokType == TK_NK_OCT ||
                            tokType == TK_NK_BIN;
  if (!isValueToken || pToken->n == 0 || tokType == TK_NK_RP) {
    return buildSyntaxErrMsg(pMsgBuf, "invalid data or symbol", pToken->z);
  }

  if (IS_NUMERIC_TYPE(type) && pToken->n == 0) {
    return buildSyntaxErrMsg(pMsgBuf, "invalid numeric data", pToken->z);
  }

  if (TK_NK_STRING != tokType) {
    return TSDB_CODE_SUCCESS;
  }

  // tmpTokenBuf is TSDB_MAX_BYTES_PER_ROW bytes in the visible caller, hence this limit.
  if (pToken->n >= TSDB_MAX_BYTES_PER_ROW) {
    return buildSyntaxErrMsg(pMsgBuf, "too long string", pToken->z);
  }

  const char quote = pToken->z[0];
  int32_t    escapes = 0;
  int32_t    out = 0;
  uint32_t   pos = 1;
  while (pos < pToken->n - 1) {
    const char ch = pToken->z[pos];
    if (ch == '\\' || (ch == quote && pToken->z[pos + 1] == quote)) {
      tmpTokenBuf[out++] = pToken->z[pos + 1];
      ++escapes;
      pos += 2;
    } else {
      tmpTokenBuf[out++] = ch;
      pos += 1;
    }
  }

  tmpTokenBuf[out] = 0;
  pToken->z = tmpTokenBuf;
  pToken->n -= 2 + escapes;
  return TSDB_CODE_SUCCESS;
}

/*
 * True when the token denotes SQL NULL: the NULL keyword, or a string token whose text equals
 * TSDB_DATA_NULL_STR_L case-insensitively. In parseValueToken this runs after checkAndTrimValue,
 * so the string text has already been un-quoted.
 */
static bool isNullStr(SToken *pToken) {
  if (pToken->type == TK_NULL) {
    return true;
  }
  // Exact length is required: a bounded prefix compare alone would also treat strings such as
  // 'n', 'nu' or 'nul' as NULL.
  const size_t nullLen = strlen(TSDB_DATA_NULL_STR_L);
  return (pToken->type == TK_NK_STRING) && (pToken->n == nullLen) &&
         (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, nullLen) == 0);
}

/*
 * Parse the token text as a floating-point number into *value; *endPtr receives the parse end.
 * errno is cleared first so callers can test for ERANGE.
 * Returns TK_NK_ILLEGAL unless exactly pToken->n characters were consumed, otherwise pToken->type.
 */
static FORCE_INLINE int32_t toDouble(SToken *pToken, double *value, char **endPtr) {
  errno = 0;
  // Parse directly as double: strtold followed by an implicit long double -> double conversion
  // rounds twice and can differ from the correctly rounded strtod result.
  *value = strtod(pToken->z, endPtr);

  // not a complete floating-point literal, return error
  if ((*endPtr - pToken->z) != pToken->n) {
    return TK_NK_ILLEGAL;
  }

  return pToken->type;
}

/*
 * Convert one value token to the column type described by pSchema and pass the binary value to
 * `func` together with `param`.
 *
 * - SQL NULL is delivered as func(pMsgBuf, NULL, 0, param), except for the primary timestamp
 *   column, which receives 0.
 * - String tokens are un-quoted into tmpTokenBuf by checkAndTrimValue first.
 * - For TIMESTAMP columns *end may be advanced past a "()" or "+/-duration" suffix (see parseTime).
 *
 * Returns func's result, a syntax error built in pMsgBuf, or TSDB_CODE_FAILED for a column type
 * not handled below.
 */
static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t timePrec, char* tmpTokenBuf, _row_append_fn_t func, void* param, SMsgBuf* pMsgBuf) {
  int64_t iv;
  char   *endptr = NULL;
  bool    isSigned = false;

  int32_t code = checkAndTrimValue(pToken, pSchema->type, tmpTokenBuf, pMsgBuf);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  if (isNullStr(pToken)) {
    if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
      int64_t tmpVal = 0;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    return func(pMsgBuf, NULL, 0, param);
  }

  switch (pSchema->type) {
    case TSDB_DATA_TYPE_BOOL: {
      if ((pToken->type == TK_NK_BOOL || pToken->type == TK_NK_STRING) && (pToken->n != 0)) {
        // Whole-token, case-insensitive match: the keyword may be written TRUE/False, and a
        // prefix such as 't' or 'fa' must not be accepted.
        if (pToken->n == 4 && strncasecmp(pToken->z, "true", 4) == 0) {
          return func(pMsgBuf, &TRUE_VALUE, pSchema->bytes, param);
        } else if (pToken->n == 5 && strncasecmp(pToken->z, "false", 5) == 0) {
          return func(pMsgBuf, &FALSE_VALUE, pSchema->bytes, param);
        } else {
          return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
        }
      } else if (pToken->type == TK_NK_INTEGER) {
        return func(pMsgBuf, ((strtoll(pToken->z, NULL, 10) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param);
      } else if (pToken->type == TK_NK_FLOAT) {
        return func(pMsgBuf, ((strtod(pToken->z, NULL) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param);
      } else {
        return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
      }
    }

    case TSDB_DATA_TYPE_TINYINT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid tinyint data", pToken->z);
      } else if (!IS_VALID_TINYINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "tinyint data overflow", pToken->z);
      }

      int8_t tmpVal = (int8_t)iv;  // signed column type
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_UTINYINT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned tinyint data", pToken->z);
      } else if (!IS_VALID_UTINYINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "unsigned tinyint data overflow", pToken->z);
      }
      uint8_t tmpVal = (uint8_t)iv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_SMALLINT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid smallint data", pToken->z);
      } else if (!IS_VALID_SMALLINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "smallint data overflow", pToken->z);
      }
      int16_t tmpVal = (int16_t)iv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_USMALLINT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned smallint data", pToken->z);
      } else if (!IS_VALID_USMALLINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "unsigned smallint data overflow", pToken->z);
      }
      uint16_t tmpVal = (uint16_t)iv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_INT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid int data", pToken->z);
      } else if (!IS_VALID_INT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "int data overflow", pToken->z);
      }
      int32_t tmpVal = (int32_t)iv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_UINT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned int data", pToken->z);
      } else if (!IS_VALID_UINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "unsigned int data overflow", pToken->z);
      }
      uint32_t tmpVal = (uint32_t)iv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_BIGINT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid bigint data", pToken->z);
      } else if (!IS_VALID_BIGINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "bigint data overflow", pToken->z);
      }
      return func(pMsgBuf, &iv, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_UBIGINT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned bigint data", pToken->z);
      } else if (!IS_VALID_UBIGINT((uint64_t)iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "unsigned bigint data overflow", pToken->z);
      }
      uint64_t tmpVal = (uint64_t)iv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_FLOAT: {
      double dv;
      if (TK_NK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
        return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
      }
      if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) || isnan(dv)) {
        return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
      }
      float tmpVal = (float)dv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_DOUBLE: {
      double dv;
      if (TK_NK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
        return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
      }
      if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) {
        return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
      }
      return func(pMsgBuf, &dv, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_BINARY: {
      // Too long values will raise the invalid sql error message
      if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) {
        return buildSyntaxErrMsg(pMsgBuf, "string data overflow", pToken->z);
      }
      return func(pMsgBuf, pToken->z, pToken->n, param);
    }

    case TSDB_DATA_TYPE_NCHAR: {
      // Length is bounded by the append callback's UCS-4 conversion.
      return func(pMsgBuf, pToken->z, pToken->n, param);
    }

    case TSDB_DATA_TYPE_TIMESTAMP: {
      int64_t tmpVal;
      if (parseTime(end, pToken, timePrec, &tmpVal, pMsgBuf) != TSDB_CODE_SUCCESS) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp", pToken->z);
      }
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }
  }

  return TSDB_CODE_FAILED;
}

618
typedef struct SMemParam {
C
Cary Xu 已提交
619
  SRowBuilder* rb;
620 621 622
  SSchema*     schema;
  int32_t      toffset;
  col_id_t     colIdx;
623 624
} SMemParam;

X
Xiaoyu Wang 已提交
625
/*
 * _row_append_fn_t implementation that appends one column value to the SRowBuilder in
 * ((SMemParam*)param)->rb. NULL `value` appends a TD_VTYPE_NULL cell. BINARY/NCHAR values are first
 * encoded as var-strings at the current row end (NCHAR converted to UCS-4, bounded by the column
 * width). Returns TSDB_CODE_SUCCESS, or a syntax error when the NCHAR conversion fails.
 */
static FORCE_INLINE int32_t MemRowAppend(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param) {
  SMemParam*   pa = (SMemParam*)param;
  SRowBuilder* rb = pa->rb;
  SSchema*     pCol = pa->schema;

  if (value == NULL) {  // NULL cell
    tdAppendColValToRow(rb, pCol->colId, pCol->type, TD_VTYPE_NULL, value, false, pa->toffset, pa->colIdx);
    return TSDB_CODE_SUCCESS;
  }

  switch (pCol->type) {
    case TSDB_DATA_TYPE_BINARY: {
      const char* rowEnd = tdRowEnd(rb->pBuf);
      STR_WITH_SIZE_TO_VARSTR(rowEnd, value, len);
      tdAppendColValToRow(rb, pCol->colId, pCol->type, TD_VTYPE_NORM, rowEnd, true, pa->toffset, pa->colIdx);
      break;
    }
    case TSDB_DATA_TYPE_NCHAR: {
      // The conversion is limited to the column width; on failure report strerror(errno).
      int32_t     output = 0;
      const char* rowEnd = tdRowEnd(rb->pBuf);
      if (!taosMbsToUcs4(value, len, (TdUcs4*)varDataVal(rowEnd), pCol->bytes - VARSTR_HEADER_SIZE, &output)) {
        char buf[512] = {0};
        snprintf(buf, tListLen(buf), "%s", strerror(errno));
        return buildSyntaxErrMsg(pMsgBuf, buf, value);
      }
      varDataSetLen(rowEnd, output);
      tdAppendColValToRow(rb, pCol->colId, pCol->type, TD_VTYPE_NORM, rowEnd, false, pa->toffset, pa->colIdx);
      break;
    }
    default:
      tdAppendColValToRow(rb, pCol->colId, pCol->type, TD_VTYPE_NORM, value, false, pa->toffset, pa->colIdx);
      break;
  }

  return TSDB_CODE_SUCCESS;
}
655 656 657

// pSql -> tag1_name, ...)
/*
 * Parse a column/tag name list up to the closing ')' and record which schema columns are bound.
 *
 * Fills pColList->boundColumns with (schema index + PRIMARYKEY_TIMESTAMP_COL_ID) in list order,
 * marks each bound column VAL_STAT_HAS, accumulates boundNullLen, and sets orderStatus. When the
 * list is not in schema order, colIdxInfo is allocated and filled with the mapping between list
 * position and schema order.
 * Returns TSDB_CODE_SUCCESS, a syntax error for unknown/duplicated names, or OOM.
 */
static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo* pColList, SSchema* pSchema) {
  col_id_t nCols = pColList->numOfCols;

  pColList->numOfBound = 0;
  pColList->boundNullLen = 0;
  memset(pColList->boundColumns, 0, sizeof(col_id_t) * nCols);
  for (col_id_t i = 0; i < nCols; ++i) {
    pColList->cols[i].valStat = VAL_STAT_NONE;
  }

  SToken   sToken;
  bool     isOrdered = true;
  col_id_t lastColIdx = -1;  // last column found
  while (1) {
    NEXT_TOKEN(pCxt->pSql, sToken);

    if (TK_NK_RP == sToken.type) {
      break;
    }

    // Search forward from the previous match first; fall back to the columns before it.
    col_id_t t = lastColIdx + 1;
    col_id_t index = findCol(&sToken, t, nCols, pSchema);
    if (index < 0 && t > 0) {
      index = findCol(&sToken, 0, t, pSchema);
      isOrdered = false;
    }
    if (index < 0) {
      return buildSyntaxErrMsg(&pCxt->msg, "invalid column/tag name", sToken.z);
    }
    if (pColList->cols[index].valStat == VAL_STAT_HAS) {
      return buildSyntaxErrMsg(&pCxt->msg, "duplicated column name", sToken.z);
    }
    lastColIdx = index;
    pColList->cols[index].valStat = VAL_STAT_HAS;
    pColList->boundColumns[pColList->numOfBound] = index + PRIMARYKEY_TIMESTAMP_COL_ID;
    ++pColList->numOfBound;

    // Size the NULL placeholder by the column that was actually bound (index). Using the search
    // start position t picked the wrong column whenever columns were skipped or reordered.
    switch (pSchema[index].type) {
      case TSDB_DATA_TYPE_BINARY:
        pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + CHAR_BYTES);
        break;
      case TSDB_DATA_TYPE_NCHAR:
        pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
        break;
      default:
        pColList->boundNullLen += TYPE_BYTES[pSchema[index].type];
        break;
    }
  }

  pColList->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED;

  if (!isOrdered) {
    pColList->colIdxInfo = taosMemoryCalloc(pColList->numOfBound, sizeof(SBoundIdxInfo));
    if (NULL == pColList->colIdxInfo) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    SBoundIdxInfo* pColIdx = pColList->colIdxInfo;
    for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
      pColIdx[i].schemaColIdx = pColList->boundColumns[i];
      pColIdx[i].boundIdx = i;
    }
    // Sort by schema position to assign finalIdx, then restore list order.
    qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), schemaIdxCompar);
    for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
      pColIdx[i].finalIdx = i;
    }
    qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), boundIdxCompar);
  }

  memset(&pColList->boundColumns[pColList->numOfBound], 0,
         sizeof(col_id_t) * (pColList->numOfCols - pColList->numOfBound));

  return TSDB_CODE_SUCCESS;
}

731 732 733 734 735 736
/* Callback state for KvRowAppend, passed through the `param` argument of _row_append_fn_t. */
typedef struct SKvParam {
  SKVRowBuilder* builder;                 // KV row receiving the tag values
  SSchema*       schema;                  // schema of the tag being appended
  char           buf[TSDB_MAX_TAGS_LEN];  // scratch space for var-string (BINARY/NCHAR) encoding
} SKvParam;

X
Xiaoyu Wang 已提交
737
/*
 * _row_append_fn_t implementation that adds one tag value to ((SKvParam*)param)->builder.
 * BINARY/NCHAR values are encoded as var-strings in pa->buf (NCHAR converted to UCS-4, bounded by
 * the tag width). A NULL value adds nothing to the row.
 * NOTE(review): an absent tag is presumably read back as NULL — confirm against the KV-row reader.
 * Returns TSDB_CODE_SUCCESS, or a syntax error when the NCHAR conversion fails.
 */
static int32_t KvRowAppend(SMsgBuf* pMsgBuf, const void *value, int32_t len, void *param) {
  SKvParam* pa = (SKvParam*) param;

  int8_t  type = pa->schema->type;
  int16_t colId = pa->schema->colId;

  // parseValueToken reports SQL NULL as value == NULL. Passing that on would make
  // tdAddColToKVRow read through a NULL pointer (or store '' for BINARY), so skip the tag instead.
  if (value == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  if (TSDB_DATA_TYPE_BINARY == type) {
    STR_WITH_SIZE_TO_VARSTR(pa->buf, value, len);
    tdAddColToKVRow(pa->builder, colId, type, pa->buf);
  } else if (TSDB_DATA_TYPE_NCHAR == type) {
    // The conversion is limited to the tag width; on failure report strerror(errno).
    int32_t output = 0;
    if (!taosMbsToUcs4(value, len, (TdUcs4*)varDataVal(pa->buf), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) {
      char buf[512] = {0};
      snprintf(buf, tListLen(buf), "%s", strerror(errno));
      return buildSyntaxErrMsg(pMsgBuf, buf, value);
    }

    varDataSetLen(pa->buf, output);
    tdAddColToKVRow(pa->builder, colId, type, pa->buf);
  } else {
    tdAddColToKVRow(pa->builder, colId, type, value);
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
764 765 766 767 768 769 770 771 772 773 774 775
/*
 * Fill pCxt->createTblReq as a TD_CHILD_TABLE request for pName under the super table in
 * pCxt->pTableMeta, taking `row` as the tag row. dbFName and name are heap copies (strdup).
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_TSC_OUT_OF_MEMORY if a copy could not be allocated.
 */
static int32_t buildCreateTbReq(SInsertParseContext* pCxt, const SName* pName, SKVRow row) {
  char dbFName[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pName, dbFName);
  pCxt->createTblReq.type = TD_CHILD_TABLE;
  pCxt->createTblReq.dbFName = strdup(dbFName);
  pCxt->createTblReq.name = strdup(pName->tname);
  pCxt->createTblReq.ctbCfg.suid = pCxt->pTableMeta->suid;
  // Store the row before checking the copies so it stays reachable from the request either way.
  pCxt->createTblReq.ctbCfg.pTag = row;

  if (NULL == pCxt->createTblReq.dbFName || NULL == pCxt->createTblReq.name) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

776
// pSql -> tag1_value, ...)
X
Xiaoyu Wang 已提交
777
// Parses the tag value list of a USING clause and turns it into the auto-create request.
// pSql -> tag1_value, ...)   (the closing ')' is left for the caller)
//
// pSchema   : full schema of the super table, indexed by colId - 1.
// precision : timestamp precision used for timestamp-typed tags.
// pName     : name of the child table to be created.
static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint8_t precision, const SName* pName) {
  if (tdInitKVRowBuilder(&pCxt->tagsBuilder) < 0) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SKvParam param = {.builder = &pCxt->tagsBuilder};
  SToken   sToken;
  // scratch space where escape characters (\\, \', \") are removed from a token
  char     tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0};

  int tagIdx = 0;
  while (tagIdx < pCxt->tags.numOfBound) {
    NEXT_TOKEN_WITH_PREV(pCxt->pSql, sToken);
    // bound column ids are 1-based
    param.schema = &pSchema[pCxt->tags.boundColumns[tagIdx] - 1];
    CHECK_CODE(parseValueToken(&pCxt->pSql, &sToken, param.schema, precision, tmpTokenBuf, KvRowAppend, &param, &pCxt->msg));
    ++tagIdx;
  }

  SKVRow tagRow = tdGetKVRowFromBuilder(&pCxt->tagsBuilder);
  if (tagRow == NULL) {
    return buildInvalidOperationMsg(&pCxt->msg, "tag value expected");
  }
  tdSortKVRowByColIdx(tagRow);

  return buildCreateTbReq(pCxt, pName, tagRow);
}
800

X
Xiaoyu Wang 已提交
801 802 803 804 805 806 807 808
// Allocates *pDst and fills it with a byte copy of pSrc (TABLE_META_SIZE(pSrc) bytes).
// On allocation failure *pDst is set to NULL and TSDB_CODE_TSC_OUT_OF_MEMORY is returned.
static int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
  size_t      metaBytes = TABLE_META_SIZE(pSrc);
  STableMeta* pCopy = taosMemoryMalloc(metaBytes);
  *pDst = pCopy;
  if (pCopy == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  memcpy(pCopy, pSrc, metaBytes);
  return TSDB_CODE_SUCCESS;
}
809

X
Xiaoyu Wang 已提交
810 811 812 813 814
// Stores a private copy of pMeta in pHash under key pName (len bytes), giving the copy a
// freshly generated uid. The hash keeps the STableMeta* and owns the copy from then on.
// Returns TSDB_CODE_TSC_OUT_OF_MEMORY if the copy cannot be made, otherwise the result of
// taosHashPut.
static int32_t storeTableMeta(SHashObj* pHash, const char* pName, int32_t len, STableMeta* pMeta) {
  STableMeta* pBackup = NULL;
  if (TSDB_CODE_SUCCESS != cloneTableMeta(pMeta, &pBackup)) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pBackup->uid = tGenIdPI64();

  int32_t code = taosHashPut(pHash, pName, len, &pBackup, POINTER_BYTES);
  if (TSDB_CODE_SUCCESS != code) {
    // the hash did not take ownership; do not leak the copy
    taosMemoryFree(pBackup);
  }
  return code;
}

// Consumes tokens up to and including the next ')'. Fails if the statement ends first.
static int32_t skipTokensThroughRightParen(SInsertParseContext* pCxt) {
  SToken sToken;
  while (1) {
    NEXT_TOKEN_WITH_PREV(pCxt->pSql, sToken);
    if (0 == sToken.n) {
      return buildSyntaxErrMsg(&pCxt->msg, ") is expected", NULL);
    }
    if (TK_NK_RP == sToken.type) {
      return TSDB_CODE_SUCCESS;
    }
  }
}

// pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
// Steps over the rest of a USING clause without evaluating it. Used when the child table was
// already resolved earlier in the same statement, so parsing resumes at "(cols)", VALUES or FILE.
static int32_t skipDuplicateUsingClause(SInsertParseContext* pCxt) {
  SToken sToken;
  NEXT_TOKEN(pCxt->pSql, sToken);  // stb_name

  NEXT_TOKEN(pCxt->pSql, sToken);
  if (TK_NK_LP == sToken.type) {
    CHECK_CODE(skipTokensThroughRightParen(pCxt));  // optional tag name list
    NEXT_TOKEN(pCxt->pSql, sToken);
  }

  if (TK_TAGS != sToken.type) {
    return buildSyntaxErrMsg(&pCxt->msg, "TAGS is expected", sToken.z);
  }
  NEXT_TOKEN(pCxt->pSql, sToken);
  if (TK_NK_LP != sToken.type) {
    return buildSyntaxErrMsg(&pCxt->msg, "( is expected", sToken.z);
  }
  return skipTokensThroughRightParen(pCxt);  // tag values
}

// Parses "USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)" for child table
// pTbnameToken; the USING keyword has already been consumed.
// pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
//
// On success pCxt->pTableMeta holds a copy of the (cached) super table meta and, for the first
// occurrence of the child table, pCxt->createTblReq describes the auto-create request.
static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken) {
  SName name;
  CHECK_CODE(createSName(&name, pTbnameToken, pCxt->pComCxt, &pCxt->msg));
  char tbFName[TSDB_TABLE_FNAME_LEN];
  tNameExtractFullName(&name, tbFName);
  int32_t len = strlen(tbFName);

  STableMeta** pMeta = taosHashGet(pCxt->pSubTableHashObj, tbFName, len);
  if (NULL != pMeta) {
    // Seen before in this statement: the create request was produced at the first occurrence.
    // The clause still has to be consumed, otherwise the caller sees stb_name where it
    // expects "(", VALUES or FILE.
    CHECK_CODE(skipDuplicateUsingClause(pCxt));
    return cloneTableMeta(*pMeta, &pCxt->pTableMeta);
  }

  SToken sToken;
  // pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
  NEXT_TOKEN(pCxt->pSql, sToken);
  CHECK_CODE(getSTableMeta(pCxt, &sToken));
  if (TSDB_SUPER_TABLE != pCxt->pTableMeta->tableType) {
    return buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed");
  }
  CHECK_CODE(storeTableMeta(pCxt->pSubTableHashObj, tbFName, len, pCxt->pTableMeta));

  SSchema* pTagsSchema = getTableTagSchema(pCxt->pTableMeta);
  setBoundColumnInfo(&pCxt->tags, pTagsSchema, getNumOfTags(pCxt->pTableMeta));

  // pSql -> [(tag1_name, ...)] TAGS (tag1_value, ...)
  NEXT_TOKEN(pCxt->pSql, sToken);
  if (TK_NK_LP == sToken.type) {
    CHECK_CODE(parseBoundColumns(pCxt, &pCxt->tags, pTagsSchema));
    NEXT_TOKEN(pCxt->pSql, sToken);
  }

  if (TK_TAGS != sToken.type) {
    return buildSyntaxErrMsg(&pCxt->msg, "TAGS is expected", sToken.z);
  }
  // pSql -> (tag1_value, ...)
  NEXT_TOKEN(pCxt->pSql, sToken);
  if (TK_NK_LP != sToken.type) {
    return buildSyntaxErrMsg(&pCxt->msg, "( is expected", sToken.z);
  }
  CHECK_CODE(parseTagsClause(pCxt, pCxt->pTableMeta->schema, getTableInfo(pCxt->pTableMeta).precision, &name));
  NEXT_TOKEN(pCxt->pSql, sToken);
  if (TK_NK_RP != sToken.type) {
    return buildSyntaxErrMsg(&pCxt->msg, ") is expected", sToken.z);
  }

  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
867
// Parses one "(v1, v2, ...)" row body (the '(' is already consumed, the ')' is left for the
// caller) and writes it as an STSRow at the current end of pDataBlocks->pData.
//
// timePrec    : timestamp precision of the table.
// len         : currently not written.
// tmpTokenBuf : scratch buffer used by parseValueToken to strip escape characters.
static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, int16_t timePrec, int32_t* len, char* tmpTokenBuf) {
  SParsedDataColInfo* pColInfo = &pDataBlocks->boundColumnInfo;
  SRowBuilder*        pBuilder = &pDataBlocks->rowBuilder;
  // the new row is placed right after the bytes already in the block
  STSRow*             pRow = (STSRow*)(pDataBlocks->pData + pDataBlocks->size);

  tdSRowResetBuf(pBuilder, pRow);

  SSchema*  pColSchemas = getTableColumnSchema(pDataBlocks->pTableMeta);
  SMemParam param = {.rb = pBuilder};
  SToken    token = {0};

  // values appear in the order of the bound column list
  for (int idx = 0; idx < pColInfo->numOfBound; ++idx) {
    NEXT_TOKEN_WITH_PREV(pCxt->pSql, token);
    SSchema* pColSchema = &pColSchemas[pColInfo->boundColumns[idx] - 1];
    param.schema = pColSchema;
    getSTSRowAppendInfo(pColSchemas, pBuilder->rowType, pColInfo, idx, &param.toffset, &param.colIdx);
    CHECK_CODE(parseValueToken(&pCxt->pSql, &token, pColSchema, timePrec, tmpTokenBuf, MemRowAppend, &param, &pCxt->msg));

    if (PRIMARYKEY_TIMESTAMP_COL_ID != pColSchema->colId) {
      continue;
    }
    TSKEY tsKey = TD_ROW_KEY(pRow);
    if (TSDB_CODE_SUCCESS != checkTimestamp(pDataBlocks, (const char *)&tsKey)) {
      buildSyntaxErrMsg(&pCxt->msg, "client time/server time can not be mixed up", token.z);
      return TSDB_CODE_TSC_INVALID_TIME_STAMP;
    }
  }

  // Tuple rows need an explicit NONE entry for every column the statement did not bind.
  // (The primary timestamp column is never VAL_STAT_NONE.)
  if (pColInfo->numOfBound < pColInfo->numOfCols && TD_IS_TP_ROW(pRow)) {
    for (int32_t col = 0; col < pColInfo->numOfCols; ++col) {
      if (VAL_STAT_NONE != pColInfo->cols[col].valStat) {
        continue;
      }
      tdAppendColValToTpRow(pBuilder, TD_VTYPE_NONE, getNullValue(pColSchemas[col].type), true, pColSchemas[col].type,
                            col, pColInfo->cols[col].toffset);
    }
  }

  (void)len;  // row length is not reported back yet
  return TSDB_CODE_SUCCESS;
}

// pSql -> (field1_value, ...) [(field1_value2, ...) ...]
912
// Parses the row list following VALUES into pDataBlock.
// pSql -> (field1_value, ...) [(field1_value2, ...) ...]
//
// maxRows   : number of rows the block can currently hold; grown on demand.
// numOfRows : receives the number of rows parsed.
// Returns a syntax error if no row at all was found.
static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlock, int maxRows, int32_t* numOfRows) {
  STableComInfo tinfo = getTableInfo(pDataBlock->pTableMeta);
  int32_t       rowBytes = getExtendedRowSize(pDataBlock);
  CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));

  *numOfRows = 0;
  // scratch space where escape characters (\\, \', \") are removed from a token
  char   tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0};
  SToken sToken;

  for (;;) {
    // peek: a row starts with '(' — anything else ends the list and is left unconsumed
    int32_t peekLen = 0;
    NEXT_TOKEN_KEEP_SQL(pCxt->pSql, sToken, peekLen);
    if (sToken.type != TK_NK_LP) {
      break;
    }
    pCxt->pSql += peekLen;

    // make room for one more row when either the row count or the byte budget is exhausted
    if ((*numOfRows) >= maxRows || pDataBlock->size + rowBytes >= pDataBlock->nAllocSize) {
      int32_t newCapacity;
      CHECK_CODE(allocateMemIfNeed(pDataBlock, rowBytes, &newCapacity));
      ASSERT(newCapacity >= maxRows);
      maxRows = newCapacity;
    }

    int32_t rowLen = 0;
    CHECK_CODE(parseOneRow(pCxt, pDataBlock, tinfo.precision, &rowLen, tmpTokenBuf));
    pDataBlock->size += rowBytes;

    NEXT_TOKEN(pCxt->pSql, sToken);
    if (sToken.type != TK_NK_RP) {
      return buildSyntaxErrMsg(&pCxt->msg, ") expected", sToken.z);
    }
    ++(*numOfRows);
  }

  return (0 == (*numOfRows)) ? buildSyntaxErrMsg(&pCxt->msg, "no any data points", NULL) : TSDB_CODE_SUCCESS;
}

// Parses a VALUES row list into dataBuf and finalises its SSubmitBlk header.
// Adds the number of parsed rows to pCxt->totalNum.
static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* dataBuf) {
  int32_t rowCapacity = 0;
  CHECK_CODE(allocateMemIfNeed(dataBuf, getExtendedRowSize(dataBuf), &rowCapacity));

  int32_t parsedRows = 0;
  CHECK_CODE(parseValues(pCxt, dataBuf, rowCapacity, &parsedRows));

  // the data buffer starts with the SSubmitBlk header describing the rows that follow
  if (setBlockInfo((SSubmitBlk *)dataBuf->pData, dataBuf, parsedRows) != TSDB_CODE_SUCCESS) {
    return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than 32767");
  }

  dataBuf->numOfTables = 1;
  pCxt->totalNum += parsedRows;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
970 971 972 973 974 975
// Releases the heap members of an auto-create request and NULLs them; the struct itself is
// not freed.
static void destroyCreateSubTbReq(SVCreateTbReq* pReq) {
  // tag row stored by buildCreateTbReq
  taosMemoryFreeClear(pReq->ctbCfg.pTag);
  // name strings duplicated by buildCreateTbReq
  taosMemoryFreeClear(pReq->name);
  taosMemoryFreeClear(pReq->dbFName);
}

X
Xiaoyu Wang 已提交
976
// Releases everything that was built for the table just parsed (table meta copy, tag bound
// column info, tag row builder, auto-create request) so the context can be reused for the
// next table of the statement.
static void destroyInsertParseContextForTable(SInsertParseContext* pCxt) {
  taosMemoryFreeClear(pCxt->pTableMeta);
  destroyBoundColumnInfo(&pCxt->tags);
  tdDestroyKVRowBuilder(&pCxt->tagsBuilder);
  destroyCreateSubTbReq(&pCxt->createTblReq);
  // destroyCreateSubTbReq only releases the pointer members. Reset the scalar ones (type,
  // suid, ...) as well: the request is passed to getDataBlockFromList for every table, so a
  // following table without a USING clause must not inherit this table's metadata.
  memset(&pCxt->createTblReq, 0, sizeof(pCxt->createTblReq));
}

983 984 985 986 987
// Frees a table data block and its payload. Only blocks that are not clones also release
// their table meta and bound column info. NULL is accepted.
static void destroyDataBlock(STableDataBlocks* pDataBlock) {
  if (NULL == pDataBlock) {
    return;
  }

  taosMemoryFreeClear(pDataBlock->pData);

  if (!pDataBlock->cloned) {
    // taosMemoryFreeClear is a no-op on NULL, so no separate guard is needed
    taosMemoryFreeClear(pDataBlock->pTableMeta);
    destroyBoundColumnInfo(&pDataBlock->boundColumnInfo);
  }

  taosMemoryFreeClear(pDataBlock);
}

X
Xiaoyu Wang 已提交
1000 1001 1002
// Releases every resource owned by the insert parse context: the per-table state, the
// vgroup / sub-table / table-block hashes and the data block arrays.
static void destroyInsertParseContext(SInsertParseContext* pCxt) {
  destroyInsertParseContextForTable(pCxt);
  taosHashCleanup(pCxt->pVgroupsHashObj);

  // pSubTableHashObj stores STableMeta* copies made by storeTableMeta(); the hash only holds
  // the pointers, so free the pointees before cleaning the hash up.
  if (NULL != pCxt->pSubTableHashObj) {
    void* pIter = taosHashIterate(pCxt->pSubTableHashObj, NULL);
    while (NULL != pIter) {
      taosMemoryFree(*(STableMeta**)pIter);
      pIter = taosHashIterate(pCxt->pSubTableHashObj, pIter);
    }
    taosHashCleanup(pCxt->pSubTableHashObj);
  }

  destroyBlockHashmap(pCxt->pTableBlockHashObj);
  destroyBlockArrayList(pCxt->pTableDataBlocks);
  destroyBlockArrayList(pCxt->pVgDataBlocks);
}

1009 1010 1011 1012 1013 1014
//   tb_name
//       [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
//       [(field1_name, ...)]
//       VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
//   [...];
static int32_t parseInsertBody(SInsertParseContext* pCxt) {
  SToken sToken;

  // one iteration per target table
  for (;;) {
    // drop whatever was built for the previous table
    destroyInsertParseContextForTable(pCxt);

    // pSql -> tb_name ...
    NEXT_TOKEN(pCxt->pSql, sToken);
    if (0 == sToken.n) {
      // end of the statement: valid only if at least one row was parsed
      if (0 == pCxt->totalNum) {
        return buildInvalidOperationMsg(&pCxt->msg, "no data in sql");
      }
      break;
    }

    SToken tbnameToken = sToken;
    NEXT_TOKEN(pCxt->pSql, sToken);

    // optional USING clause (auto create of a child table)
    if (TK_USING != sToken.type) {
      CHECK_CODE(getTableMeta(pCxt, &tbnameToken));
    } else {
      CHECK_CODE(parseUsingClause(pCxt, &tbnameToken));
      NEXT_TOKEN(pCxt->pSql, sToken);
    }

    STableDataBlocks* dataBuf = NULL;
    CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, pCxt->pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE,
                                    sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta,
                                    &dataBuf, NULL, &pCxt->createTblReq));

    // optional explicit column list; pSql -> field1_name, ...)
    if (TK_NK_LP == sToken.type) {
      CHECK_CODE(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(pCxt->pTableMeta)));
      NEXT_TOKEN(pCxt->pSql, sToken);
    }

    if (TK_VALUES == sToken.type) {
      // pSql -> (field1_value, ...) [(field1_value2, ...) ...]
      CHECK_CODE(parseValuesClause(pCxt, dataBuf));
      pCxt->pOutput->insertType = TSDB_QUERY_TYPE_INSERT;
    } else if (TK_NK_FILE == sToken.type) {
      // pSql -> csv_file_path
      NEXT_TOKEN(pCxt->pSql, sToken);
      if (0 == sToken.n || (TK_NK_STRING != sToken.type && TK_NK_ID != sToken.type)) {
        return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", sToken.z);
      }
      // TODO: FILE is not implemented yet; the path is validated but not read, only the
      // insert type is recorded.
      pCxt->pOutput->insertType = TSDB_QUERY_TYPE_FILE_INSERT;
    } else {
      return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", sToken.z);
    }
  }

  // merge the per-table blocks according to vgroup id (not done for stmt inserts)
  bool isStmtInsert = TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT);
  if (!isStmtInsert && taosHashGetSize(pCxt->pTableBlockHashObj) > 0) {
    CHECK_CODE(mergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->payloadType, &pCxt->pVgDataBlocks));
  }
  return buildOutput(pCxt);
}

// INSERT INTO
//   tb_name
//       [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
//       [(field1_name, ...)]
//       VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
//   [...];
X
Xiaoyu Wang 已提交
1086
// Entry point: parses an INSERT statement (see the grammar above) into *pQuery.
// On success *pQuery owns the SVnodeModifOpStmt root node; the parse context is always
// released before returning.
int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
  SInsertParseContext context = {
    .pComCxt = pContext,
    .pSql = (char*) pContext->pSql,
    .msg = {.buf = pContext->pMsg, .len = pContext->msgLen},
    .pTableMeta = NULL,
    .pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false),
    .pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false),
    .pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, false),
    .totalNum = 0,
    .pOutput = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT)
  };

  if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj ||
      NULL == context.pSubTableHashObj || NULL == context.pOutput) {
    // Release whichever of the above were created. pOutput is not owned by a query yet.
    if (NULL != context.pOutput) {
      nodesDestroyNode((SNode*)context.pOutput);
    }
    destroyInsertParseContext(&context);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  *pQuery = taosMemoryCalloc(1, sizeof(SQuery));
  if (NULL == *pQuery) {
    nodesDestroyNode((SNode*)context.pOutput);
    destroyInsertParseContext(&context);
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  (*pQuery)->execMode = QUERY_EXEC_MODE_SCHEDULE;
  (*pQuery)->haveResultSet = false;
  (*pQuery)->msgType = TDMT_VND_SUBMIT;
  // from here on the query owns the output node
  (*pQuery)->pRoot = (SNode*)context.pOutput;
  context.pOutput->payloadType = PAYLOAD_TYPE_KV;

  int32_t code = skipInsertInto(&context);
  if (TSDB_CODE_SUCCESS == code) {
    code = parseInsertBody(&context);
  }
  destroyInsertParseContext(&context);
  return code;
}