parInsert.c 82.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

wafwerar's avatar
wafwerar 已提交
16
#include "os.h"
X
Xiaoyu Wang 已提交
17 18 19
#include "parInsertData.h"
#include "parInt.h"
#include "parToken.h"
H
refact  
Hongze Cheng 已提交
20
#include "parUtil.h"
21 22 23 24
#include "tglobal.h"
#include "ttime.h"
#include "ttypes.h"

H
refact  
Hongze Cheng 已提交
25 26 27
#define NEXT_TOKEN(pSql, sToken)                \
  do {                                          \
    int32_t index = 0;                          \
28
    sToken = tStrGetToken(pSql, &index, false); \
H
refact  
Hongze Cheng 已提交
29
    pSql += index;                              \
30 31
  } while (0)

H
refact  
Hongze Cheng 已提交
32 33 34
#define NEXT_TOKEN_WITH_PREV(pSql, sToken)     \
  do {                                         \
    int32_t index = 0;                         \
X
Xiaoyu Wang 已提交
35
    sToken = tStrGetToken(pSql, &index, true); \
H
refact  
Hongze Cheng 已提交
36
    pSql += index;                             \
X
Xiaoyu Wang 已提交
37 38
  } while (0)

39
#define NEXT_TOKEN_KEEP_SQL(pSql, sToken, index) \
H
refact  
Hongze Cheng 已提交
40 41
  do {                                           \
    sToken = tStrGetToken(pSql, &index, false);  \
42 43
  } while (0)

44 45 46 47 48 49 50
#define NEXT_VALID_TOKEN(pSql, sToken)        \
  do {                                        \
    sToken.n = tGetToken(pSql, &sToken.type); \
    sToken.z = pSql;                          \
    pSql += sToken.n;                         \
  } while (TK_NK_SPACE == sToken.type)

51
typedef struct SInsertParseContext {
X
Xiaoyu Wang 已提交
52 53 54 55 56 57 58 59 60 61
  SParseContext*     pComCxt;             // input
  char*              pSql;                // input
  SMsgBuf            msg;                 // input
  STableMeta*        pTableMeta;          // each table
  SParsedDataColInfo tags;                // each table
  SVCreateTbReq      createTblReq;        // each table
  SHashObj*          pVgroupsHashObj;     // global
  SHashObj*          pTableBlockHashObj;  // global
  SHashObj*          pSubTableHashObj;    // global
  SArray*            pVgDataBlocks;       // global
X
Xiaoyu Wang 已提交
62
  SHashObj*          pTableNameHashObj;   // global
D
dapan1121 已提交
63
  SHashObj*          pDbFNameHashObj;     // global
X
Xiaoyu Wang 已提交
64
  int32_t            totalNum;
X
Xiaoyu Wang 已提交
65
  SVnodeModifOpStmt* pOutput;
X
Xiaoyu Wang 已提交
66
  SStmtCallback*     pStmtCb;
67
  SParseMetaCache*   pMetaCache;
68 69
} SInsertParseContext;

H
refact  
Hongze Cheng 已提交
70
typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param);
X
Xiaoyu Wang 已提交
71 72 73 74

static uint8_t TRUE_VALUE = (uint8_t)TSDB_TRUE;
static uint8_t FALSE_VALUE = (uint8_t)TSDB_FALSE;

D
stmt  
dapan1121 已提交
75
typedef struct SKvParam {
C
Cary Xu 已提交
76
  int16_t  pos;
C
Cary Xu 已提交
77
  SArray*  pTagVals;
C
Cary Xu 已提交
78 79
  SSchema* schema;
  char     buf[TSDB_MAX_TAGS_LEN];
D
stmt  
dapan1121 已提交
80 81 82 83 84 85 86 87 88
} SKvParam;

typedef struct SMemParam {
  SRowBuilder* rb;
  SSchema*     schema;
  int32_t      toffset;
  col_id_t     colIdx;
} SMemParam;

X
Xiaoyu Wang 已提交
89 90 91
#define CHECK_CODE(expr)             \
  do {                               \
    int32_t code = expr;             \
D
stmt  
dapan1121 已提交
92
    if (TSDB_CODE_SUCCESS != code) { \
X
Xiaoyu Wang 已提交
93 94
      return code;                   \
    }                                \
D
stmt  
dapan1121 已提交
95 96
  } while (0)

97
static int32_t skipInsertInto(char** pSql, SMsgBuf* pMsg) {
98
  SToken sToken;
99
  NEXT_TOKEN(*pSql, sToken);
100
  if (TK_INSERT != sToken.type) {
101
    return buildSyntaxErrMsg(pMsg, "keyword INSERT is expected", sToken.z);
102
  }
103
  NEXT_TOKEN(*pSql, sToken);
104
  if (TK_INTO != sToken.type) {
105
    return buildSyntaxErrMsg(pMsg, "keyword INTO is expected", sToken.z);
106 107 108 109
  }
  return TSDB_CODE_SUCCESS;
}

110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
static int32_t parserValidateIdToken(SToken* pToken) {
  if (pToken == NULL || pToken->z == NULL || pToken->type != TK_NK_ID) {
    return TSDB_CODE_TSC_INVALID_OPERATION;
  }

  // it is a token quoted with escape char '`'
  if (pToken->z[0] == TS_ESCAPE_CHAR && pToken->z[pToken->n - 1] == TS_ESCAPE_CHAR) {
    return TSDB_CODE_SUCCESS;
  }

  char* sep = strnchr(pToken->z, TS_PATH_DELIMITER[0], pToken->n, true);
  if (sep == NULL) {  // It is a single part token, not a complex type
    if (isNumber(pToken)) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }

    strntolower(pToken->z, pToken->z, pToken->n);
  } else {  // two part
    int32_t oldLen = pToken->n;
    char*   pStr = pToken->z;

    if (pToken->type == TK_NK_SPACE) {
      pToken->n = (uint32_t)strtrim(pToken->z);
    }

    pToken->n = tGetToken(pToken->z, &pToken->type);
    if (pToken->z[pToken->n] != TS_PATH_DELIMITER[0]) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }

    if (pToken->type != TK_NK_ID) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }

    int32_t firstPartLen = pToken->n;

    pToken->z = sep + 1;
    pToken->n = (uint32_t)(oldLen - (sep - pStr) - 1);
    int32_t len = tGetToken(pToken->z, &pToken->type);
    if (len != pToken->n || pToken->type != TK_NK_ID) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }

    // re-build the whole name string
    if (pStr[firstPartLen] == TS_PATH_DELIMITER[0]) {
      // first part do not have quote do nothing
    } else {
      pStr[firstPartLen] = TS_PATH_DELIMITER[0];
      memmove(&pStr[firstPartLen + 1], pToken->z, pToken->n);
      uint32_t offset = (uint32_t)(pToken->z - (pStr + firstPartLen + 1));
      memset(pToken->z + pToken->n - offset, ' ', offset);
    }

    pToken->n += (firstPartLen + sizeof(TS_PATH_DELIMITER[0]));
    pToken->z = pStr;

    strntolower(pToken->z, pToken->z, pToken->n);
  }

  return TSDB_CODE_SUCCESS;
}

172
static int32_t buildName(SInsertParseContext* pCxt, SToken* pStname, char* fullDbName, char* tableName) {
173
  if (parserValidateIdToken(pStname) != TSDB_CODE_SUCCESS) {
174
    return buildSyntaxErrMsg(&pCxt->msg, "invalid table name", pStname->z);
175 176
  }

177
  char* p = strnchr(pStname->z, TS_PATH_DELIMITER[0], pStname->n, false);
H
refact  
Hongze Cheng 已提交
178
  if (NULL != p) {  // db.table
H
Haojun Liao 已提交
179
    int32_t n = sprintf(fullDbName, "%d.", pCxt->pComCxt->acctId);
H
Haojun Liao 已提交
180
    strncpy(fullDbName + n, pStname->z, p - pStname->z);
181 182
    strncpy(tableName, p + 1, pStname->n - (p - pStname->z) - 1);
  } else {
H
Haojun Liao 已提交
183
    snprintf(fullDbName, TSDB_DB_FNAME_LEN, "%d.%s", pCxt->pComCxt->acctId, pCxt->pComCxt->db);
184 185
    strncpy(tableName, pStname->z, pStname->n);
  }
H
Haojun Liao 已提交
186

187 188 189
  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
190
static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, const char* dbName, SMsgBuf* pMsgBuf) {
X
Xiaoyu Wang 已提交
191 192 193
  const char* msg1 = "name too long";
  const char* msg2 = "invalid database name";
  const char* msg3 = "db is not specified";
194
  const char* msg4 = "invalid table name";
X
Xiaoyu Wang 已提交
195

H
refact  
Hongze Cheng 已提交
196 197
  int32_t code = TSDB_CODE_SUCCESS;
  char*   p = strnchr(pTableName->z, TS_PATH_DELIMITER[0], pTableName->n, true);
X
Xiaoyu Wang 已提交
198

H
refact  
Hongze Cheng 已提交
199
  if (p != NULL) {  // db has been specified in sql string so we ignore current db path
X
Xiaoyu Wang 已提交
200 201 202
    assert(*p == TS_PATH_DELIMITER[0]);

    int32_t dbLen = p - pTableName->z;
H
refact  
Hongze Cheng 已提交
203
    char    name[TSDB_DB_FNAME_LEN] = {0};
X
Xiaoyu Wang 已提交
204 205 206
    strncpy(name, pTableName->z, dbLen);
    dbLen = strdequote(name);

D
stmt  
dapan1121 已提交
207
    code = tNameSetDbName(pName, acctId, name, dbLen);
X
Xiaoyu Wang 已提交
208 209 210 211 212
    if (code != TSDB_CODE_SUCCESS) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }

    int32_t tbLen = pTableName->n - dbLen - 1;
213 214 215 216
    if (tbLen <= 0) {
      return buildInvalidOperationMsg(pMsgBuf, msg4);
    }

217
    char tbname[TSDB_TABLE_FNAME_LEN] = {0};
X
Xiaoyu Wang 已提交
218
    strncpy(tbname, p + 1, tbLen);
H
refact  
Hongze Cheng 已提交
219
    /*tbLen = */ strdequote(tbname);
X
Xiaoyu Wang 已提交
220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235

    code = tNameFromString(pName, tbname, T_NAME_TABLE);
    if (code != 0) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }
  } else {  // get current DB name first, and then set it into path
    if (pTableName->n >= TSDB_TABLE_NAME_LEN) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }

    assert(pTableName->n < TSDB_TABLE_FNAME_LEN);

    char name[TSDB_TABLE_FNAME_LEN] = {0};
    strncpy(name, pTableName->z, pTableName->n);
    strdequote(name);

D
stmt  
dapan1121 已提交
236
    if (dbName == NULL) {
X
Xiaoyu Wang 已提交
237 238 239
      return buildInvalidOperationMsg(pMsgBuf, msg3);
    }

D
stmt  
dapan1121 已提交
240
    code = tNameSetDbName(pName, acctId, dbName, strlen(dbName));
X
Xiaoyu Wang 已提交
241 242 243 244 245 246 247 248 249 250 251 252 253 254
    if (code != TSDB_CODE_SUCCESS) {
      code = buildInvalidOperationMsg(pMsgBuf, msg2);
      return code;
    }

    code = tNameFromString(pName, name, T_NAME_TABLE);
    if (code != 0) {
      code = buildInvalidOperationMsg(pMsgBuf, msg1);
    }
  }

  return code;
}

255 256
static int32_t checkAuth(SInsertParseContext* pCxt, char* pDbFname, bool* pPass) {
  SParseContext* pBasicCtx = pCxt->pComCxt;
X
Xiaoyu Wang 已提交
257
  if (pBasicCtx->async) {
258 259
    return getUserAuthFromCache(pCxt->pMetaCache, pBasicCtx->pUser, pDbFname, AUTH_TYPE_WRITE, pPass);
  }
D
dapan1121 已提交
260 261 262 263 264 265
  SRequestConnInfo conn = {.pTrans = pBasicCtx->pTransporter, 
                           .requestId = pBasicCtx->requestId,
                           .requestObjRefId = pBasicCtx->requestRid,
                           .mgmtEps = pBasicCtx->mgmtEpSet};

  return catalogChkAuth(pBasicCtx->pCatalog, &conn, pBasicCtx->pUser, pDbFname, AUTH_TYPE_WRITE, pPass);
266 267 268 269
}

static int32_t getTableSchema(SInsertParseContext* pCxt, SName* pTbName, bool isStb, STableMeta** pTableMeta) {
  SParseContext* pBasicCtx = pCxt->pComCxt;
X
Xiaoyu Wang 已提交
270
  if (pBasicCtx->async) {
271 272
    return getTableMetaFromCache(pCxt->pMetaCache, pTbName, pTableMeta);
  }
D
dapan1121 已提交
273 274 275 276 277
  SRequestConnInfo conn = {.pTrans = pBasicCtx->pTransporter, 
                           .requestId = pBasicCtx->requestId,
                           .requestObjRefId = pBasicCtx->requestRid,
                           .mgmtEps = pBasicCtx->mgmtEpSet};
  
278
  if (isStb) {
D
dapan1121 已提交
279
    return catalogGetSTableMeta(pBasicCtx->pCatalog, &conn, pTbName, pTableMeta);
280
  }
D
dapan1121 已提交
281
  return catalogGetTableMeta(pBasicCtx->pCatalog, &conn, pTbName, pTableMeta);
282 283 284
}

static int32_t getTableVgroup(SInsertParseContext* pCxt, SName* pTbName, SVgroupInfo* pVg) {
H
Haojun Liao 已提交
285
  SParseContext* pBasicCtx = pCxt->pComCxt;
X
Xiaoyu Wang 已提交
286
  if (pBasicCtx->async) {
287 288
    return getTableVgroupFromCache(pCxt->pMetaCache, pTbName, pVg);
  }
D
dapan1121 已提交
289 290 291 292 293
  SRequestConnInfo conn = {.pTrans = pBasicCtx->pTransporter, 
                           .requestId = pBasicCtx->requestId,
                           .requestObjRefId = pBasicCtx->requestRid,
                           .mgmtEps = pBasicCtx->mgmtEpSet};
  return catalogGetTableHashVgroup(pBasicCtx->pCatalog, &conn, pTbName, pVg);
294
}
D
dapan 已提交
295

296
static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SName* name, char* dbFname, bool isStb) {
D
dapan 已提交
297
  bool pass = false;
298
  CHECK_CODE(checkAuth(pCxt, dbFname, &pass));
D
dapan 已提交
299 300 301
  if (!pass) {
    return TSDB_CODE_PAR_PERMISSION_DENIED;
  }
302 303 304

  CHECK_CODE(getTableSchema(pCxt, name, isStb, &pCxt->pTableMeta));
  if (!isStb) {
X
Xiaoyu Wang 已提交
305
    SVgroupInfo vg;
306
    CHECK_CODE(getTableVgroup(pCxt, name, &vg));
X
Xiaoyu Wang 已提交
307
    CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
D
stmt  
dapan1121 已提交
308
  }
H
refact  
Hongze Cheng 已提交
309
  return TSDB_CODE_SUCCESS;
D
stmt  
dapan1121 已提交
310 311
}

X
Xiaoyu Wang 已提交
312 313 314
static int32_t getTableMeta(SInsertParseContext* pCxt, SName* name, char* dbFname) {
  return getTableMetaImpl(pCxt, name, dbFname, false);
}
D
stmt  
dapan1121 已提交
315

X
Xiaoyu Wang 已提交
316 317 318
static int32_t getSTableMeta(SInsertParseContext* pCxt, SName* name, char* dbFname) {
  return getTableMetaImpl(pCxt, name, dbFname, true);
}
D
stmt  
dapan1121 已提交
319

X
Xiaoyu Wang 已提交
320 321 322 323 324
static int32_t getDBCfg(SInsertParseContext* pCxt, const char* pDbFName, SDbCfgInfo* pInfo) {
  SParseContext* pBasicCtx = pCxt->pComCxt;
  if (pBasicCtx->async) {
    CHECK_CODE(getDbCfgFromCache(pCxt->pMetaCache, pDbFName, pInfo));
  } else {
D
dapan1121 已提交
325 326 327 328 329
    SRequestConnInfo conn = {.pTrans = pBasicCtx->pTransporter, 
                             .requestId = pBasicCtx->requestId,
                             .requestObjRefId = pBasicCtx->requestRid,
                             .mgmtEps = pBasicCtx->mgmtEpSet};
    CHECK_CODE(catalogGetDBCfg(pBasicCtx->pCatalog, &conn, pDbFName, pInfo));
X
Xiaoyu Wang 已提交
330 331 332 333
  }
  return TSDB_CODE_SUCCESS;
}

334 335 336 337 338 339 340 341 342 343
static int32_t findCol(SToken* pColname, int32_t start, int32_t end, SSchema* pSchema) {
  while (start < end) {
    if (strlen(pSchema[start].name) == pColname->n && strncmp(pColname->z, pSchema[start].name, pColname->n) == 0) {
      return start;
    }
    ++start;
  }
  return -1;
}

D
dapan1121 已提交
344
static void buildMsgHeader(STableDataBlocks* src, SVgDataBlocks* blocks) {
H
refact  
Hongze Cheng 已提交
345 346 347 348 349 350 351 352 353
  SSubmitReq* submit = (SSubmitReq*)blocks->pData;
  submit->header.vgId = htonl(blocks->vg.vgId);
  submit->header.contLen = htonl(blocks->size);
  submit->length = submit->header.contLen;
  submit->numOfBlocks = htonl(blocks->numOfTables);
  SSubmitBlk* blk = (SSubmitBlk*)(submit + 1);
  int32_t     numOfBlocks = blocks->numOfTables;
  while (numOfBlocks--) {
    int32_t dataLen = blk->dataLen;
354
    int32_t schemaLen = blk->schemaLen;
H
refact  
Hongze Cheng 已提交
355 356 357 358 359 360 361
    blk->uid = htobe64(blk->uid);
    blk->suid = htobe64(blk->suid);
    blk->padding = htonl(blk->padding);
    blk->sversion = htonl(blk->sversion);
    blk->dataLen = htonl(blk->dataLen);
    blk->schemaLen = htonl(blk->schemaLen);
    blk->numOfRows = htons(blk->numOfRows);
362
    blk = (SSubmitBlk*)(blk->data + schemaLen + dataLen);
H
refact  
Hongze Cheng 已提交
363
  }
364 365 366 367 368 369 370 371 372 373
}

static int32_t buildOutput(SInsertParseContext* pCxt) {
  size_t numOfVg = taosArrayGetSize(pCxt->pVgDataBlocks);
  pCxt->pOutput->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
  if (NULL == pCxt->pOutput->pDataBlocks) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  for (size_t i = 0; i < numOfVg; ++i) {
    STableDataBlocks* src = taosArrayGetP(pCxt->pVgDataBlocks, i);
H
refact  
Hongze Cheng 已提交
374
    SVgDataBlocks*    dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
375 376 377
    if (NULL == dst) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
H
Haojun Liao 已提交
378
    taosHashGetDup(pCxt->pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
379 380
    dst->numOfTables = src->numOfTables;
    dst->size = src->size;
wafwerar's avatar
wafwerar 已提交
381
    TSWAP(dst->pData, src->pData);
D
dapan1121 已提交
382
    buildMsgHeader(src, dst);
383 384 385 386 387
    taosArrayPush(pCxt->pOutput->pDataBlocks, &dst);
  }
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
388
int32_t checkTimestamp(STableDataBlocks* pDataBlocks, const char* start) {
389 390 391 392 393
  // once the data block is disordered, we do NOT keep previous timestamp any more
  if (!pDataBlocks->ordered) {
    return TSDB_CODE_SUCCESS;
  }

H
refact  
Hongze Cheng 已提交
394
  TSKEY k = *(TSKEY*)start;
395
  if (k <= pDataBlocks->prevTS) {
396 397 398 399 400 401 402
    pDataBlocks->ordered = false;
  }

  pDataBlocks->prevTS = k;
  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
403 404 405 406 407 408
static int parseTime(char** end, SToken* pToken, int16_t timePrec, int64_t* time, SMsgBuf* pMsgBuf) {
  int32_t index = 0;
  SToken  sToken;
  int64_t interval;
  int64_t ts = 0;
  char*   pTokenEnd = *end;
409 410

  if (pToken->type == TK_NOW) {
411
    ts = taosGetTimestamp(timePrec);
412 413
  } else if (pToken->type == TK_TODAY) {
    ts = taosGetTimestampToday(timePrec);
414
  } else if (pToken->type == TK_NK_INTEGER) {
X
Xiaoyu Wang 已提交
415
    toInteger(pToken->z, pToken->n, 10, &ts);
H
refact  
Hongze Cheng 已提交
416
  } else {  // parse the RFC-3339/ISO-8601 timestamp format string
S
os env  
Shengliang Guan 已提交
417
    if (taosParseTime(pToken->z, time, pToken->n, timePrec, tsDaylight) != TSDB_CODE_SUCCESS) {
418
      return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
419 420 421 422 423 424 425
    }

    return TSDB_CODE_SUCCESS;
  }

  for (int k = pToken->n; pToken->z[k] != '\0'; k++) {
    if (pToken->z[k] == ' ' || pToken->z[k] == '\t') continue;
H
refact  
Hongze Cheng 已提交
426
    if (pToken->z[k] == '(' && pToken->z[k + 1] == ')') {  // for insert NOW()/TODAY()
427 428 429 430
      *end = pTokenEnd = &pToken->z[k + 2];
      k++;
      continue;
    }
431
    if (pToken->z[k] == ',') {
432 433
      *end = pTokenEnd;
      *time = ts;
434 435 436 437 438 439 440 441 442 443 444 445 446 447 448
      return 0;
    }

    break;
  }

  /*
   * time expression:
   * e.g., now+12a, now-5h
   */
  SToken valueToken;
  index = 0;
  sToken = tStrGetToken(pTokenEnd, &index, false);
  pTokenEnd += index;

X
Xiaoyu Wang 已提交
449
  if (sToken.type == TK_NK_MINUS || sToken.type == TK_NK_PLUS) {
450 451 452 453 454
    index = 0;
    valueToken = tStrGetToken(pTokenEnd, &index, false);
    pTokenEnd += index;

    if (valueToken.n < 2) {
455
      return buildSyntaxErrMsg(pMsgBuf, "value expected in timestamp", sToken.z);
456 457 458 459 460 461 462
    }

    char unit = 0;
    if (parseAbsoluteDuration(valueToken.z, valueToken.n, &interval, &unit, timePrec) != TSDB_CODE_SUCCESS) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }

463
    if (sToken.type == TK_NK_PLUS) {
464
      ts += interval;
465
    } else {
466
      ts = ts - interval;
467 468
    }

469
    *end = pTokenEnd;
470 471
  }

472
  *time = ts;
473 474
  return TSDB_CODE_SUCCESS;
}
475

wmmhello's avatar
wmmhello 已提交
476
static FORCE_INLINE int32_t checkAndTrimValue(SToken* pToken, char* tmpTokenBuf, SMsgBuf* pMsgBuf) {
H
refact  
Hongze Cheng 已提交
477 478 479 480
  if ((pToken->type != TK_NOW && pToken->type != TK_TODAY && pToken->type != TK_NK_INTEGER &&
       pToken->type != TK_NK_STRING && pToken->type != TK_NK_FLOAT && pToken->type != TK_NK_BOOL &&
       pToken->type != TK_NULL && pToken->type != TK_NK_HEX && pToken->type != TK_NK_OCT &&
       pToken->type != TK_NK_BIN) ||
481
      (pToken->n == 0) || (pToken->type == TK_NK_RP)) {
X
Xiaoyu Wang 已提交
482 483 484 485
    return buildSyntaxErrMsg(pMsgBuf, "invalid data or symbol", pToken->z);
  }

  // Remove quotation marks
X
Xiaoyu Wang 已提交
486
  if (TK_NK_STRING == pToken->type) {
X
Xiaoyu Wang 已提交
487 488 489 490
    if (pToken->n >= TSDB_MAX_BYTES_PER_ROW) {
      return buildSyntaxErrMsg(pMsgBuf, "too long string", pToken->z);
    }

491
    int32_t len = trimString(pToken->z, pToken->n, tmpTokenBuf, TSDB_MAX_BYTES_PER_ROW);
X
Xiaoyu Wang 已提交
492
    pToken->z = tmpTokenBuf;
493
    pToken->n = len;
X
Xiaoyu Wang 已提交
494 495 496 497 498
  }

  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
499
static bool isNullStr(SToken* pToken) {
500
  return (pToken->type == TK_NULL) || ((pToken->type == TK_NK_STRING) && (pToken->n != 0) &&
X
Xiaoyu Wang 已提交
501 502 503
                                       (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0));
}

H
refact  
Hongze Cheng 已提交
504
static FORCE_INLINE int32_t toDouble(SToken* pToken, double* value, char** endPtr) {
X
Xiaoyu Wang 已提交
505
  errno = 0;
wafwerar's avatar
wafwerar 已提交
506
  *value = taosStr2Double(pToken->z, endPtr);
X
Xiaoyu Wang 已提交
507 508 509

  // not a valid integer number, return error
  if ((*endPtr - pToken->z) != pToken->n) {
510
    return TK_NK_ILLEGAL;
X
Xiaoyu Wang 已提交
511 512 513 514 515
  }

  return pToken->type;
}

H
refact  
Hongze Cheng 已提交
516 517
static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t timePrec, char* tmpTokenBuf,
                               _row_append_fn_t func, void* param, SMsgBuf* pMsgBuf) {
X
Xiaoyu Wang 已提交
518 519 520
  int64_t  iv;
  uint64_t uv;
  char*    endptr = NULL;
X
Xiaoyu Wang 已提交
521

wmmhello's avatar
wmmhello 已提交
522
  int32_t code = checkAndTrimValue(pToken, tmpTokenBuf, pMsgBuf);
X
Xiaoyu Wang 已提交
523 524 525 526 527 528
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  if (isNullStr(pToken)) {
    if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
D
stmt  
dapan1121 已提交
529
      return buildSyntaxErrMsg(pMsgBuf, "primary timestamp should not be null", pToken->z);
X
Xiaoyu Wang 已提交
530 531
    }

X
Xiaoyu Wang 已提交
532
    return func(pMsgBuf, NULL, 0, param);
X
Xiaoyu Wang 已提交
533 534 535 536
  }

  switch (pSchema->type) {
    case TSDB_DATA_TYPE_BOOL: {
537
      if ((pToken->type == TK_NK_BOOL || pToken->type == TK_NK_STRING) && (pToken->n != 0)) {
X
Xiaoyu Wang 已提交
538
        if (strncmp(pToken->z, "true", pToken->n) == 0) {
X
Xiaoyu Wang 已提交
539
          return func(pMsgBuf, &TRUE_VALUE, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
540
        } else if (strncmp(pToken->z, "false", pToken->n) == 0) {
X
Xiaoyu Wang 已提交
541
          return func(pMsgBuf, &FALSE_VALUE, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
542 543 544
        } else {
          return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
        }
545
      } else if (pToken->type == TK_NK_INTEGER) {
546 547
        return func(pMsgBuf, ((taosStr2Int64(pToken->z, NULL, 10) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes,
                    param);
548
      } else if (pToken->type == TK_NK_FLOAT) {
549 550
        return func(pMsgBuf, ((taosStr2Double(pToken->z, NULL) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes,
                    param);
X
Xiaoyu Wang 已提交
551 552 553 554 555 556
      } else {
        return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
      }
    }

    case TSDB_DATA_TYPE_TINYINT: {
X
Xiaoyu Wang 已提交
557
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
X
Xiaoyu Wang 已提交
558 559 560 561 562 563
        return buildSyntaxErrMsg(pMsgBuf, "invalid tinyint data", pToken->z);
      } else if (!IS_VALID_TINYINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "tinyint data overflow", pToken->z);
      }

      uint8_t tmpVal = (uint8_t)iv;
X
Xiaoyu Wang 已提交
564
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
565 566
    }

H
refact  
Hongze Cheng 已提交
567
    case TSDB_DATA_TYPE_UTINYINT: {
X
Xiaoyu Wang 已提交
568
      if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
X
Xiaoyu Wang 已提交
569
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned tinyint data", pToken->z);
X
Xiaoyu Wang 已提交
570
      } else if (!IS_VALID_UTINYINT(uv)) {
X
Xiaoyu Wang 已提交
571 572
        return buildSyntaxErrMsg(pMsgBuf, "unsigned tinyint data overflow", pToken->z);
      }
X
Xiaoyu Wang 已提交
573
      uint8_t tmpVal = (uint8_t)uv;
X
Xiaoyu Wang 已提交
574
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
575 576 577
    }

    case TSDB_DATA_TYPE_SMALLINT: {
X
Xiaoyu Wang 已提交
578
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
X
Xiaoyu Wang 已提交
579 580 581 582 583
        return buildSyntaxErrMsg(pMsgBuf, "invalid smallint data", pToken->z);
      } else if (!IS_VALID_SMALLINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "smallint data overflow", pToken->z);
      }
      int16_t tmpVal = (int16_t)iv;
X
Xiaoyu Wang 已提交
584
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
585 586 587
    }

    case TSDB_DATA_TYPE_USMALLINT: {
X
Xiaoyu Wang 已提交
588
      if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
X
Xiaoyu Wang 已提交
589
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned smallint data", pToken->z);
X
Xiaoyu Wang 已提交
590
      } else if (!IS_VALID_USMALLINT(uv)) {
X
Xiaoyu Wang 已提交
591 592
        return buildSyntaxErrMsg(pMsgBuf, "unsigned smallint data overflow", pToken->z);
      }
X
Xiaoyu Wang 已提交
593
      uint16_t tmpVal = (uint16_t)uv;
X
Xiaoyu Wang 已提交
594
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
595 596 597
    }

    case TSDB_DATA_TYPE_INT: {
X
Xiaoyu Wang 已提交
598
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
X
Xiaoyu Wang 已提交
599 600 601 602 603
        return buildSyntaxErrMsg(pMsgBuf, "invalid int data", pToken->z);
      } else if (!IS_VALID_INT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "int data overflow", pToken->z);
      }
      int32_t tmpVal = (int32_t)iv;
X
Xiaoyu Wang 已提交
604
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
605 606 607
    }

    case TSDB_DATA_TYPE_UINT: {
X
Xiaoyu Wang 已提交
608
      if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
X
Xiaoyu Wang 已提交
609
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned int data", pToken->z);
X
Xiaoyu Wang 已提交
610
      } else if (!IS_VALID_UINT(uv)) {
X
Xiaoyu Wang 已提交
611 612
        return buildSyntaxErrMsg(pMsgBuf, "unsigned int data overflow", pToken->z);
      }
X
Xiaoyu Wang 已提交
613
      uint32_t tmpVal = (uint32_t)uv;
X
Xiaoyu Wang 已提交
614
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
615 616 617
    }

    case TSDB_DATA_TYPE_BIGINT: {
X
Xiaoyu Wang 已提交
618
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
X
Xiaoyu Wang 已提交
619 620 621 622
        return buildSyntaxErrMsg(pMsgBuf, "invalid bigint data", pToken->z);
      } else if (!IS_VALID_BIGINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "bigint data overflow", pToken->z);
      }
X
Xiaoyu Wang 已提交
623
      return func(pMsgBuf, &iv, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
624 625 626
    }

    case TSDB_DATA_TYPE_UBIGINT: {
X
Xiaoyu Wang 已提交
627
      if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
X
Xiaoyu Wang 已提交
628
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned bigint data", pToken->z);
X
Xiaoyu Wang 已提交
629
      } else if (!IS_VALID_UBIGINT(uv)) {
X
Xiaoyu Wang 已提交
630 631
        return buildSyntaxErrMsg(pMsgBuf, "unsigned bigint data overflow", pToken->z);
      }
X
Xiaoyu Wang 已提交
632
      return func(pMsgBuf, &uv, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
633 634 635 636
    }

    case TSDB_DATA_TYPE_FLOAT: {
      double dv;
637
      if (TK_NK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
X
Xiaoyu Wang 已提交
638 639
        return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
      }
H
refact  
Hongze Cheng 已提交
640 641
      if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) ||
          isnan(dv)) {
X
Xiaoyu Wang 已提交
642 643 644
        return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
      }
      float tmpVal = (float)dv;
X
Xiaoyu Wang 已提交
645
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
646 647 648 649
    }

    case TSDB_DATA_TYPE_DOUBLE: {
      double dv;
650
      if (TK_NK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
X
Xiaoyu Wang 已提交
651 652 653 654 655
        return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
      }
      if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) {
        return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
      }
X
Xiaoyu Wang 已提交
656
      return func(pMsgBuf, &dv, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
657 658 659 660 661
    }

    case TSDB_DATA_TYPE_BINARY: {
      // Too long values will raise the invalid sql error message
      if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) {
D
dapan1121 已提交
662
        return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
X
Xiaoyu Wang 已提交
663 664
      }

X
Xiaoyu Wang 已提交
665
      return func(pMsgBuf, pToken->z, pToken->n, param);
X
Xiaoyu Wang 已提交
666 667 668
    }

    case TSDB_DATA_TYPE_NCHAR: {
X
Xiaoyu Wang 已提交
669
      return func(pMsgBuf, pToken->z, pToken->n, param);
X
Xiaoyu Wang 已提交
670
    }
671
    case TSDB_DATA_TYPE_JSON: {
X
Xiaoyu Wang 已提交
672
      if (pToken->n > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
673 674 675 676
        return buildSyntaxErrMsg(pMsgBuf, "json string too long than 4095", pToken->z);
      }
      return func(pMsgBuf, pToken->z, pToken->n, param);
    }
X
Xiaoyu Wang 已提交
677 678 679 680 681 682
    case TSDB_DATA_TYPE_TIMESTAMP: {
      int64_t tmpVal;
      if (parseTime(end, pToken, timePrec, &tmpVal, pMsgBuf) != TSDB_CODE_SUCCESS) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp", pToken->z);
      }

X
Xiaoyu Wang 已提交
683
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
684 685 686 687 688 689
    }
  }

  return TSDB_CODE_FAILED;
}

X
Xiaoyu Wang 已提交
690
static FORCE_INLINE int32_t MemRowAppend(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param) {
C
Cary Xu 已提交
691 692
  SMemParam*   pa = (SMemParam*)param;
  SRowBuilder* rb = pa->rb;
693 694 695 696 697 698

  if (value == NULL) {  // it is a null data
    tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NULL, value, false, pa->toffset, pa->colIdx);
    return TSDB_CODE_SUCCESS;
  }

699
  if (TSDB_DATA_TYPE_BINARY == pa->schema->type) {
C
Cary Xu 已提交
700
    const char* rowEnd = tdRowEnd(rb->pBuf);
701
    STR_WITH_SIZE_TO_VARSTR(rowEnd, value, len);
702
    tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, rowEnd, false, pa->toffset, pa->colIdx);
703 704
  } else if (TSDB_DATA_TYPE_NCHAR == pa->schema->type) {
    // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
C
Cary Xu 已提交
705 706
    int32_t     output = 0;
    const char* rowEnd = tdRowEnd(rb->pBuf);
wafwerar's avatar
wafwerar 已提交
707
    if (!taosMbsToUcs4(value, len, (TdUcs4*)varDataVal(rowEnd), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) {
D
dapan1121 已提交
708 709 710
      if (errno == E2BIG) {
        return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pa->schema->name);
      }
X
Xiaoyu Wang 已提交
711 712 713
      char buf[512] = {0};
      snprintf(buf, tListLen(buf), "%s", strerror(errno));
      return buildSyntaxErrMsg(pMsgBuf, buf, value);
714
    }
715
    varDataSetLen(rowEnd, output);
C
Cary Xu 已提交
716
    tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, rowEnd, false, pa->toffset, pa->colIdx);
717
  } else {
718
    tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, value, false, pa->toffset, pa->colIdx);
719
  }
720

721 722
  return TSDB_CODE_SUCCESS;
}
723 724 725

// pSql -> tag1_name, ...)
static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo* pColList, SSchema* pSchema) {
C
Cary Xu 已提交
726
  col_id_t nCols = pColList->numOfCols;
727

H
refact  
Hongze Cheng 已提交
728
  pColList->numOfBound = 0;
C
Cary Xu 已提交
729
  pColList->boundNullLen = 0;
C
Cary Xu 已提交
730
  memset(pColList->boundColumns, 0, sizeof(col_id_t) * nCols);
C
Cary Xu 已提交
731
  for (col_id_t i = 0; i < nCols; ++i) {
732 733 734
    pColList->cols[i].valStat = VAL_STAT_NONE;
  }

H
refact  
Hongze Cheng 已提交
735 736
  SToken   sToken;
  bool     isOrdered = true;
C
Cary Xu 已提交
737
  col_id_t lastColIdx = -1;  // last column found
738 739 740
  while (1) {
    NEXT_TOKEN(pCxt->pSql, sToken);

741
    if (TK_NK_RP == sToken.type) {
742 743 744
      break;
    }

C
Cary Xu 已提交
745 746
    col_id_t t = lastColIdx + 1;
    col_id_t index = findCol(&sToken, t, nCols, pSchema);
747 748 749 750 751
    if (index < 0 && t > 0) {
      index = findCol(&sToken, 0, t, pSchema);
      isOrdered = false;
    }
    if (index < 0) {
752
      return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMN, sToken.z);
753 754 755 756 757 758
    }
    if (pColList->cols[index].valStat == VAL_STAT_HAS) {
      return buildSyntaxErrMsg(&pCxt->msg, "duplicated column name", sToken.z);
    }
    lastColIdx = index;
    pColList->cols[index].valStat = VAL_STAT_HAS;
X
Xiaoyu Wang 已提交
759
    pColList->boundColumns[pColList->numOfBound] = index;
760
    ++pColList->numOfBound;
C
Cary Xu 已提交
761 762 763 764 765 766 767 768 769 770 771
    switch (pSchema[t].type) {
      case TSDB_DATA_TYPE_BINARY:
        pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + CHAR_BYTES);
        break;
      case TSDB_DATA_TYPE_NCHAR:
        pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
        break;
      default:
        pColList->boundNullLen += TYPE_BYTES[pSchema[t].type];
        break;
    }
772 773 774 775 776
  }

  pColList->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED;

  if (!isOrdered) {
wafwerar's avatar
wafwerar 已提交
777
    pColList->colIdxInfo = taosMemoryCalloc(pColList->numOfBound, sizeof(SBoundIdxInfo));
778 779 780 781
    if (NULL == pColList->colIdxInfo) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    SBoundIdxInfo* pColIdx = pColList->colIdxInfo;
C
Cary Xu 已提交
782
    for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
783
      pColIdx[i].schemaColIdx = pColList->boundColumns[i];
784 785 786
      pColIdx[i].boundIdx = i;
    }
    qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), schemaIdxCompar);
C
Cary Xu 已提交
787
    for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
788 789 790 791 792
      pColIdx[i].finalIdx = i;
    }
    qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), boundIdxCompar);
  }

X
Xiaoyu Wang 已提交
793
  if (pColList->numOfCols > pColList->numOfBound) {
794 795 796
    memset(&pColList->boundColumns[pColList->numOfBound], 0,
           sizeof(col_id_t) * (pColList->numOfCols - pColList->numOfBound));
  }
797 798 799 800

  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
801 802 803 804 805
static void buildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid) {
  pTbReq->type = TD_CHILD_TABLE;
  pTbReq->name = strdup(tname);
  pTbReq->ctb.suid = suid;
  pTbReq->ctb.pTag = (uint8_t*)pTag;
806

wmmhello's avatar
wmmhello 已提交
807 808
  return;
}
809

810 811
static int32_t parseTagToken(char** end, SToken* pToken, SSchema* pSchema, int16_t timePrec, STagVal* val,
                             SMsgBuf* pMsgBuf) {
wmmhello's avatar
wmmhello 已提交
812 813 814 815 816 817 818 819
  int64_t  iv;
  uint64_t uv;
  char*    endptr = NULL;

  if (isNullStr(pToken)) {
    if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
      return buildSyntaxErrMsg(pMsgBuf, "primary timestamp should not be null", pToken->z);
    }
820 821 822 823

    return TSDB_CODE_SUCCESS;
  }

wmmhello's avatar
wmmhello 已提交
824
  val->cid = pSchema->colId;
wmmhello's avatar
wmmhello 已提交
825
  val->type = pSchema->type;
C
Cary Xu 已提交
826

wmmhello's avatar
wmmhello 已提交
827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842
  switch (pSchema->type) {
    case TSDB_DATA_TYPE_BOOL: {
      if ((pToken->type == TK_NK_BOOL || pToken->type == TK_NK_STRING) && (pToken->n != 0)) {
        if (strncmp(pToken->z, "true", pToken->n) == 0) {
          *(int8_t*)(&val->i64) = TRUE_VALUE;
        } else if (strncmp(pToken->z, "false", pToken->n) == 0) {
          *(int8_t*)(&val->i64) = FALSE_VALUE;
        } else {
          return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
        }
      } else if (pToken->type == TK_NK_INTEGER) {
        *(int8_t*)(&val->i64) = ((taosStr2Int64(pToken->z, NULL, 10) == 0) ? FALSE_VALUE : TRUE_VALUE);
      } else if (pToken->type == TK_NK_FLOAT) {
        *(int8_t*)(&val->i64) = ((taosStr2Double(pToken->z, NULL) == 0) ? FALSE_VALUE : TRUE_VALUE);
      } else {
        return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
D
dapan1121 已提交
843
      }
wmmhello's avatar
wmmhello 已提交
844 845
      break;
    }
846

wmmhello's avatar
wmmhello 已提交
847 848 849 850 851
    case TSDB_DATA_TYPE_TINYINT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid tinyint data", pToken->z);
      } else if (!IS_VALID_TINYINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "tinyint data overflow", pToken->z);
D
dapan1121 已提交
852
      }
wmmhello's avatar
wmmhello 已提交
853 854 855

      *(int8_t*)(&val->i64) = iv;
      break;
856 857
    }

wmmhello's avatar
wmmhello 已提交
858 859 860 861 862 863 864 865 866
    case TSDB_DATA_TYPE_UTINYINT: {
      if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned tinyint data", pToken->z);
      } else if (!IS_VALID_UTINYINT(uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "unsigned tinyint data overflow", pToken->z);
      }
      *(uint8_t*)(&val->i64) = uv;
      break;
    }
867

wmmhello's avatar
wmmhello 已提交
868 869 870 871 872 873 874 875 876
    case TSDB_DATA_TYPE_SMALLINT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid smallint data", pToken->z);
      } else if (!IS_VALID_SMALLINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "smallint data overflow", pToken->z);
      }
      *(int16_t*)(&val->i64) = iv;
      break;
    }
877

wmmhello's avatar
wmmhello 已提交
878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966
    case TSDB_DATA_TYPE_USMALLINT: {
      if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned smallint data", pToken->z);
      } else if (!IS_VALID_USMALLINT(uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "unsigned smallint data overflow", pToken->z);
      }
      *(uint16_t*)(&val->i64) = uv;
      break;
    }

    case TSDB_DATA_TYPE_INT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid int data", pToken->z);
      } else if (!IS_VALID_INT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "int data overflow", pToken->z);
      }
      *(int32_t*)(&val->i64) = iv;
      break;
    }

    case TSDB_DATA_TYPE_UINT: {
      if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned int data", pToken->z);
      } else if (!IS_VALID_UINT(uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "unsigned int data overflow", pToken->z);
      }
      *(uint32_t*)(&val->i64) = uv;
      break;
    }

    case TSDB_DATA_TYPE_BIGINT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid bigint data", pToken->z);
      } else if (!IS_VALID_BIGINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "bigint data overflow", pToken->z);
      }

      val->i64 = iv;
      break;
    }

    case TSDB_DATA_TYPE_UBIGINT: {
      if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned bigint data", pToken->z);
      } else if (!IS_VALID_UBIGINT(uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "unsigned bigint data overflow", pToken->z);
      }
      *(uint64_t*)(&val->i64) = uv;
      break;
    }

    case TSDB_DATA_TYPE_FLOAT: {
      double dv;
      if (TK_NK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
        return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
      }
      if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) ||
          isnan(dv)) {
        return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
      }
      *(float*)(&val->i64) = dv;
      break;
    }

    case TSDB_DATA_TYPE_DOUBLE: {
      double dv;
      if (TK_NK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
        return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
      }
      if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) {
        return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
      }

      *(double*)(&val->i64) = dv;
      break;
    }

    case TSDB_DATA_TYPE_BINARY: {
      // Too long values will raise the invalid sql error message
      if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) {
        return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
      }
      val->pData = pToken->z;
      val->nData = pToken->n;
      break;
    }

    case TSDB_DATA_TYPE_NCHAR: {
      int32_t output = 0;
967 968
      void*   p = taosMemoryCalloc(1, pToken->n * TSDB_NCHAR_SIZE);
      if (p == NULL) {
wmmhello's avatar
wmmhello 已提交
969 970
        return TSDB_CODE_OUT_OF_MEMORY;
      }
971
      if (!taosMbsToUcs4(pToken->z, pToken->n, (TdUcs4*)(p), pToken->n * TSDB_NCHAR_SIZE, &output)) {
wmmhello's avatar
wmmhello 已提交
972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993
        if (errno == E2BIG) {
          taosMemoryFree(p);
          return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
        }
        char buf[512] = {0};
        snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(errno));
        taosMemoryFree(p);
        return buildSyntaxErrMsg(pMsgBuf, buf, pToken->z);
      }
      val->pData = p;
      val->nData = output;
      break;
    }
    case TSDB_DATA_TYPE_TIMESTAMP: {
      if (parseTime(end, pToken, timePrec, &iv, pMsgBuf) != TSDB_CODE_SUCCESS) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp", pToken->z);
      }

      val->i64 = iv;
      break;
    }
  }
X
Xiaoyu Wang 已提交
994 995 996 997

  return TSDB_CODE_SUCCESS;
}

998
// pSql -> tag1_value, ...)
wmmhello's avatar
wmmhello 已提交
999
static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint8_t precision, const char* tName) {
wmmhello's avatar
wmmhello 已提交
1000
  int32_t code = TSDB_CODE_SUCCESS;
1001 1002 1003 1004 1005
  SArray* pTagVals = taosArrayInit(pCxt->tags.numOfBound, sizeof(STagVal));
  SToken  sToken;
  bool    isParseBindParam = false;
  bool    isJson = false;
  STag*   pTag = NULL;
1006
  for (int i = 0; i < pCxt->tags.numOfBound; ++i) {
X
Xiaoyu Wang 已提交
1007
    NEXT_TOKEN_WITH_PREV(pCxt->pSql, sToken);
D
stmt  
dapan1121 已提交
1008 1009 1010 1011

    if (sToken.type == TK_NK_QUESTION) {
      isParseBindParam = true;
      if (NULL == pCxt->pStmtCb) {
wmmhello's avatar
wmmhello 已提交
1012 1013
        code = buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", sToken.z);
        goto end;
D
stmt  
dapan1121 已提交
1014 1015 1016 1017 1018 1019
      }

      continue;
    }

    if (isParseBindParam) {
wmmhello's avatar
wmmhello 已提交
1020 1021
      code = buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and tag values");
      goto end;
D
stmt  
dapan1121 已提交
1022
    }
X
Xiaoyu Wang 已提交
1023

X
Xiaoyu Wang 已提交
1024
    SSchema* pTagSchema = &pSchema[pCxt->tags.boundColumns[i]];
X
Xiaoyu Wang 已提交
1025
    char*    tmpTokenBuf = taosMemoryCalloc(1, sToken.n);  // todo this can be optimize with parse column
wmmhello's avatar
wmmhello 已提交
1026 1027 1028 1029 1030
    code = checkAndTrimValue(&sToken, tmpTokenBuf, &pCxt->msg);
    if (code != TSDB_CODE_SUCCESS) {
      taosMemoryFree(tmpTokenBuf);
      goto end;
    }
1031
    if (pTagSchema->type == TSDB_DATA_TYPE_JSON) {
wmmhello's avatar
wmmhello 已提交
1032 1033 1034 1035 1036
      if (sToken.n > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
        code = buildSyntaxErrMsg(&pCxt->msg, "json string too long than 4095", sToken.z);
        taosMemoryFree(tmpTokenBuf);
        goto end;
      }
X
Xiaoyu Wang 已提交
1037
      if (isNullStr(&sToken)) {
1038 1039 1040 1041
        code = tTagNew(pTagVals, 1, true, &pTag);
      } else {
        code = parseJsontoTagData(sToken.z, pTagVals, &pTag, &pCxt->msg);
      }
wmmhello's avatar
wmmhello 已提交
1042
      taosMemoryFree(tmpTokenBuf);
1043
      if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
1044 1045
        goto end;
      }
wmmhello's avatar
wmmhello 已提交
1046
      isJson = true;
1047
    } else {
wmmhello's avatar
wmmhello 已提交
1048
      STagVal val = {0};
wmmhello's avatar
wmmhello 已提交
1049
      code = parseTagToken(&pCxt->pSql, &sToken, pTagSchema, precision, &val, &pCxt->msg);
wmmhello's avatar
wmmhello 已提交
1050 1051 1052 1053
      if (TSDB_CODE_SUCCESS != code) {
        taosMemoryFree(tmpTokenBuf);
        goto end;
      }
1054
      if (pTagSchema->type != TSDB_DATA_TYPE_BINARY) {
wmmhello's avatar
wmmhello 已提交
1055 1056 1057 1058
        taosMemoryFree(tmpTokenBuf);
      }
      taosArrayPush(pTagVals, &val);
    }
1059 1060
  }

D
stmt  
dapan1121 已提交
1061
  if (isParseBindParam) {
wmmhello's avatar
wmmhello 已提交
1062 1063
    code = TSDB_CODE_SUCCESS;
    goto end;
D
stmt  
dapan1121 已提交
1064 1065
  }

1066
  if (!isJson && (code = tTagNew(pTagVals, 1, false, &pTag)) != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
1067
    goto end;
1068 1069
  }

wmmhello's avatar
wmmhello 已提交
1070 1071 1072 1073
  buildCreateTbReq(&pCxt->createTblReq, tName, pTag, pCxt->pTableMeta->suid);

end:
  for (int i = 0; i < taosArrayGetSize(pTagVals); ++i) {
1074 1075
    STagVal* p = (STagVal*)taosArrayGet(pTagVals, i);
    if (IS_VAR_DATA_TYPE(p->type)) {
wmmhello's avatar
wmmhello 已提交
1076 1077 1078 1079 1080
      taosMemoryFree(p->pData);
    }
  }
  taosArrayDestroy(pTagVals);
  return code;
X
Xiaoyu Wang 已提交
1081
}
1082

X
Xiaoyu Wang 已提交
1083 1084 1085 1086 1087 1088 1089 1090
static int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
  *pDst = taosMemoryMalloc(TABLE_META_SIZE(pSrc));
  if (NULL == *pDst) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  memcpy(*pDst, pSrc, TABLE_META_SIZE(pSrc));
  return TSDB_CODE_SUCCESS;
}
1091

X
Xiaoyu Wang 已提交
1092 1093
static int32_t storeTableMeta(SInsertParseContext* pCxt, SHashObj* pHash, SName* pTableName, const char* pName,
                              int32_t len, STableMeta* pMeta) {
1094 1095
  SVgroupInfo vg;
  CHECK_CODE(getTableVgroup(pCxt, pTableName, &vg));
X
Xiaoyu Wang 已提交
1096 1097
  CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));

D
dapan 已提交
1098
  pMeta->uid = 0;
X
Xiaoyu Wang 已提交
1099
  pMeta->vgId = vg.vgId;
D
dapan 已提交
1100
  pMeta->tableType = TSDB_CHILD_TABLE;
X
Xiaoyu Wang 已提交
1101

X
Xiaoyu Wang 已提交
1102 1103 1104 1105 1106
  STableMeta* pBackup = NULL;
  if (TSDB_CODE_SUCCESS != cloneTableMeta(pMeta, &pBackup)) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  return taosHashPut(pHash, pName, len, &pBackup, POINTER_BYTES);
1107 1108 1109
}

// pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
D
dapan 已提交
1110
static int32_t parseUsingClause(SInsertParseContext* pCxt, SName* name, char* tbFName) {
H
refact  
Hongze Cheng 已提交
1111
  int32_t      len = strlen(tbFName);
X
Xiaoyu Wang 已提交
1112 1113 1114 1115
  STableMeta** pMeta = taosHashGet(pCxt->pSubTableHashObj, tbFName, len);
  if (NULL != pMeta) {
    return cloneTableMeta(*pMeta, &pCxt->pTableMeta);
  }
1116

X
Xiaoyu Wang 已提交
1117
  SToken sToken;
1118 1119
  // pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
  NEXT_TOKEN(pCxt->pSql, sToken);
D
dapan 已提交
1120 1121 1122

  SName sname;
  createSName(&sname, &sToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg);
X
Xiaoyu Wang 已提交
1123 1124
  char dbFName[TSDB_DB_FNAME_LEN];
  tNameGetFullDbName(&sname, dbFName);
X
Xiaoyu Wang 已提交
1125

X
Xiaoyu Wang 已提交
1126
  CHECK_CODE(getSTableMeta(pCxt, &sname, dbFName));
1127 1128 1129
  if (TSDB_SUPER_TABLE != pCxt->pTableMeta->tableType) {
    return buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed");
  }
D
dapan 已提交
1130
  CHECK_CODE(storeTableMeta(pCxt, pCxt->pSubTableHashObj, name, tbFName, len, pCxt->pTableMeta));
1131 1132

  SSchema* pTagsSchema = getTableTagSchema(pCxt->pTableMeta);
1133
  setBoundColumnInfo(&pCxt->tags, pTagsSchema, getNumOfTags(pCxt->pTableMeta));
1134 1135 1136

  // pSql -> [(tag1_name, ...)] TAGS (tag1_value, ...)
  NEXT_TOKEN(pCxt->pSql, sToken);
1137
  if (TK_NK_LP == sToken.type) {
1138
    CHECK_CODE(parseBoundColumns(pCxt, &pCxt->tags, pTagsSchema));
1139 1140 1141 1142 1143 1144 1145 1146
    NEXT_TOKEN(pCxt->pSql, sToken);
  }

  if (TK_TAGS != sToken.type) {
    return buildSyntaxErrMsg(&pCxt->msg, "TAGS is expected", sToken.z);
  }
  // pSql -> (tag1_value, ...)
  NEXT_TOKEN(pCxt->pSql, sToken);
1147
  if (TK_NK_LP != sToken.type) {
1148 1149
    return buildSyntaxErrMsg(&pCxt->msg, "( is expected", sToken.z);
  }
X
Xiaoyu Wang 已提交
1150
  CHECK_CODE(parseTagsClause(pCxt, pTagsSchema, getTableInfo(pCxt->pTableMeta).precision, name->tname));
1151 1152 1153 1154
  NEXT_VALID_TOKEN(pCxt->pSql, sToken);
  if (TK_NK_COMMA == sToken.type) {
    return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_TAGS_NOT_MATCHED);
  } else if (TK_NK_RP != sToken.type) {
X
Xiaoyu Wang 已提交
1155 1156
    return buildSyntaxErrMsg(&pCxt->msg, ") is expected", sToken.z);
  }
1157 1158 1159 1160

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1161 1162
static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, int16_t timePrec, bool* gotRow,
                       char* tmpTokenBuf) {
1163
  SParsedDataColInfo* spd = &pDataBlocks->boundColumnInfo;
C
Cary Xu 已提交
1164 1165 1166 1167
  SRowBuilder*        pBuilder = &pDataBlocks->rowBuilder;
  STSRow*             row = (STSRow*)(pDataBlocks->pData + pDataBlocks->size);  // skip the SSubmitBlk header

  tdSRowResetBuf(pBuilder, row);
1168

H
refact  
Hongze Cheng 已提交
1169 1170
  bool      isParseBindParam = false;
  SSchema*  schema = getTableColumnSchema(pDataBlocks->pTableMeta);
C
Cary Xu 已提交
1171
  SMemParam param = {.rb = pBuilder};
H
refact  
Hongze Cheng 已提交
1172
  SToken    sToken = {0};
1173 1174
  // 1. set the parsed value from sql string
  for (int i = 0; i < spd->numOfBound; ++i) {
X
Xiaoyu Wang 已提交
1175
    NEXT_TOKEN_WITH_PREV(pCxt->pSql, sToken);
X
Xiaoyu Wang 已提交
1176
    SSchema* pSchema = &schema[spd->boundColumns[i]];
D
stmt  
dapan1121 已提交
1177 1178 1179 1180 1181 1182 1183 1184 1185 1186

    if (sToken.type == TK_NK_QUESTION) {
      isParseBindParam = true;
      if (NULL == pCxt->pStmtCb) {
        return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", sToken.z);
      }

      continue;
    }

D
dapan1121 已提交
1187 1188 1189 1190
    if (TK_NK_RP == sToken.type) {
      return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM);
    }

D
stmt  
dapan1121 已提交
1191 1192 1193
    if (isParseBindParam) {
      return buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and values");
    }
X
Xiaoyu Wang 已提交
1194

1195
    param.schema = pSchema;
D
stmt  
dapan1121 已提交
1196
    getSTSRowAppendInfo(pBuilder->rowType, spd, i, &param.toffset, &param.colIdx);
1197
    CHECK_CODE(parseValueToken(&pCxt->pSql, &sToken, pSchema, timePrec, tmpTokenBuf, MemRowAppend, &param, &pCxt->msg));
1198 1199

    if (PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
C
Cary Xu 已提交
1200
      TSKEY tsKey = TD_ROW_KEY(row);
1201
      checkTimestamp(pDataBlocks, (const char*)&tsKey);
1202 1203 1204 1205
    }
  }

  if (!isParseBindParam) {
C
Cary Xu 已提交
1206
    // set the null value for the columns that do not assign values
C
Cary Xu 已提交
1207
    if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
1208
      for (int32_t i = 0; i < spd->numOfCols; ++i) {
C
Cary Xu 已提交
1209 1210 1211
        if (spd->cols[i].valStat == VAL_STAT_NONE) {  // the primary TS key is not VAL_STAT_NONE
          tdAppendColValToTpRow(pBuilder, TD_VTYPE_NONE, getNullValue(schema[i].type), true, schema[i].type, i,
                                spd->cols[i].toffset);
1212 1213 1214
        }
      }
    }
D
stmt  
dapan1121 已提交
1215 1216

    *gotRow = true;
C
Cary Xu 已提交
1217 1218
#ifdef TD_DEBUG_PRINT_ROW
    STSchema* pSTSchema = tdGetSTSChemaFromSSChema(&schema, spd->numOfCols);
C
Cary Xu 已提交
1219
    tdSRowPrint(row, pSTSchema, __func__);
C
Cary Xu 已提交
1220 1221
    taosMemoryFree(pSTSchema);
#endif
1222 1223
  }

C
Cary Xu 已提交
1224
  // *len = pBuilder->extendedRowSize;
1225 1226 1227 1228
  return TSDB_CODE_SUCCESS;
}

// pSql -> (field1_value, ...) [(field1_value2, ...) ...]
1229
static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlock, int maxRows, int32_t* numOfRows) {
1230
  STableComInfo tinfo = getTableInfo(pDataBlock->pTableMeta);
H
refact  
Hongze Cheng 已提交
1231
  int32_t       extendedRowSize = getExtendedRowSize(pDataBlock);
C
Cary Xu 已提交
1232
  CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));
1233 1234

  (*numOfRows) = 0;
H
refact  
Hongze Cheng 已提交
1235
  char   tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0};  // used for deleting Escape character: \\, \', \"
1236 1237
  SToken sToken;
  while (1) {
1238 1239
    int32_t index = 0;
    NEXT_TOKEN_KEEP_SQL(pCxt->pSql, sToken, index);
1240
    if (TK_NK_LP != sToken.type) {
1241 1242
      break;
    }
1243
    pCxt->pSql += index;
1244 1245 1246 1247 1248 1249 1250 1251

    if ((*numOfRows) >= maxRows || pDataBlock->size + extendedRowSize >= pDataBlock->nAllocSize) {
      int32_t tSize;
      CHECK_CODE(allocateMemIfNeed(pDataBlock, extendedRowSize, &tSize));
      ASSERT(tSize >= maxRows);
      maxRows = tSize;
    }

D
stmt  
dapan1121 已提交
1252 1253 1254
    bool gotRow = false;
    CHECK_CODE(parseOneRow(pCxt, pDataBlock, tinfo.precision, &gotRow, tmpTokenBuf));
    if (gotRow) {
X
Xiaoyu Wang 已提交
1255
      pDataBlock->size += extendedRowSize;  // len;
D
stmt  
dapan1121 已提交
1256
    }
1257

1258 1259 1260 1261
    NEXT_VALID_TOKEN(pCxt->pSql, sToken);
    if (TK_NK_COMMA == sToken.type) {
      return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM);
    } else if (TK_NK_RP != sToken.type) {
1262 1263 1264
      return buildSyntaxErrMsg(&pCxt->msg, ") expected", sToken.z);
    }

D
stmt  
dapan1121 已提交
1265 1266 1267
    if (gotRow) {
      (*numOfRows)++;
    }
1268 1269
  }

D
stmt  
dapan1121 已提交
1270
  if (0 == (*numOfRows) && (!TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT))) {
X
Xiaoyu Wang 已提交
1271
    return buildSyntaxErrMsg(&pCxt->msg, "no any data points", NULL);
1272 1273 1274 1275
  }
  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
1276
static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* dataBuf) {
1277 1278 1279 1280
  int32_t maxNumOfRows;
  CHECK_CODE(allocateMemIfNeed(dataBuf, getExtendedRowSize(dataBuf), &maxNumOfRows));

  int32_t numOfRows = 0;
1281
  CHECK_CODE(parseValues(pCxt, dataBuf, maxNumOfRows, &numOfRows));
1282

H
refact  
Hongze Cheng 已提交
1283
  SSubmitBlk* pBlocks = (SSubmitBlk*)(dataBuf->pData);
D
dapan1121 已提交
1284
  if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, dataBuf, numOfRows)) {
1285 1286 1287 1288 1289 1290 1291 1292
    return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than 32767");
  }

  dataBuf->numOfTables = 1;
  pCxt->totalNum += numOfRows;
  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
1293
void destroyCreateSubTbReq(SVCreateTbReq* pReq) {
X
Xiaoyu Wang 已提交
1294
  taosMemoryFreeClear(pReq->name);
H
Hongze Cheng 已提交
1295
  taosMemoryFreeClear(pReq->ctb.pTag);
X
Xiaoyu Wang 已提交
1296 1297
}

X
Xiaoyu Wang 已提交
1298
static void destroyInsertParseContextForTable(SInsertParseContext* pCxt) {
wafwerar's avatar
wafwerar 已提交
1299
  taosMemoryFreeClear(pCxt->pTableMeta);
X
Xiaoyu Wang 已提交
1300
  destroyBoundColumnInfo(&pCxt->tags);
X
Xiaoyu Wang 已提交
1301
  destroyCreateSubTbReq(&pCxt->createTblReq);
X
Xiaoyu Wang 已提交
1302 1303
}

1304 1305
static void destroySubTableHashElem(void* p) { taosMemoryFree(*(STableMeta**)p); }

X
Xiaoyu Wang 已提交
1306 1307 1308
static void destroyInsertParseContext(SInsertParseContext* pCxt) {
  destroyInsertParseContextForTable(pCxt);
  taosHashCleanup(pCxt->pVgroupsHashObj);
1309
  taosHashCleanup(pCxt->pSubTableHashObj);
D
dapan1121 已提交
1310
  taosHashCleanup(pCxt->pTableNameHashObj);
D
dapan1121 已提交
1311
  taosHashCleanup(pCxt->pDbFNameHashObj);
1312 1313

  destroyBlockHashmap(pCxt->pTableBlockHashObj);
X
Xiaoyu Wang 已提交
1314 1315 1316
  destroyBlockArrayList(pCxt->pVgDataBlocks);
}

X
Xiaoyu Wang 已提交
1317
static int32_t checkSchemalessDb(SInsertParseContext* pCxt, char* pDbName) {
1318 1319 1320 1321 1322 1323
//  SDbCfgInfo pInfo = {0};
//  char       fullName[TSDB_TABLE_FNAME_LEN];
//  snprintf(fullName, sizeof(fullName), "%d.%s", pCxt->pComCxt->acctId, pDbName);
//  CHECK_CODE(getDBCfg(pCxt, fullName, &pInfo));
//  return pInfo.schemaless ? TSDB_CODE_SML_INVALID_DB_CONF : TSDB_CODE_SUCCESS;
  return TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
1324 1325
}

1326 1327 1328 1329 1330 1331
//   tb_name
//       [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
//       [(field1_name, ...)]
//       VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
//   [...];
static int32_t parseInsertBody(SInsertParseContext* pCxt) {
1332 1333 1334
  int32_t tbNum = 0;
  char    tbFName[TSDB_TABLE_FNAME_LEN];
  bool    autoCreateTbl = false;
X
Xiaoyu Wang 已提交
1335

X
Xiaoyu Wang 已提交
1336
  // for each table
1337 1338
  while (1) {
    SToken sToken;
X
Xiaoyu Wang 已提交
1339
    char*  tbName = NULL;
D
stmt  
dapan1121 已提交
1340

1341 1342 1343 1344 1345
    // pSql -> tb_name ...
    NEXT_TOKEN(pCxt->pSql, sToken);

    // no data in the sql string anymore.
    if (sToken.n == 0) {
D
dapan1121 已提交
1346 1347 1348
      if (sToken.type && pCxt->pSql[0]) {
        return buildSyntaxErrMsg(&pCxt->msg, "invalid charactor in SQL", sToken.z);
      }
X
Xiaoyu Wang 已提交
1349

D
stmt  
dapan1121 已提交
1350
      if (0 == pCxt->totalNum && (!TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT))) {
H
refact  
Hongze Cheng 已提交
1351
        return buildInvalidOperationMsg(&pCxt->msg, "no data in sql");
1352 1353 1354 1355
      }
      break;
    }

D
stmt  
dapan1121 已提交
1356
    if (TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT) && tbNum > 0) {
X
Xiaoyu Wang 已提交
1357
      return buildInvalidOperationMsg(&pCxt->msg, "single table allowed in one stmt");
D
stmt  
dapan1121 已提交
1358 1359
    }

D
stmt  
dapan1121 已提交
1360 1361
    destroyInsertParseContextForTable(pCxt);

D
stmt  
dapan1121 已提交
1362 1363 1364
    if (TK_NK_QUESTION == sToken.type) {
      if (pCxt->pStmtCb) {
        CHECK_CODE((*pCxt->pStmtCb->getTbNameFn)(pCxt->pStmtCb->pStmt, &tbName));
X
Xiaoyu Wang 已提交
1365

D
stmt  
dapan1121 已提交
1366 1367 1368 1369 1370 1371
        sToken.z = tbName;
        sToken.n = strlen(tbName);
      } else {
        return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", sToken.z);
      }
    }
X
Xiaoyu Wang 已提交
1372

1373 1374 1375
    SToken tbnameToken = sToken;
    NEXT_TOKEN(pCxt->pSql, sToken);

D
dapan 已提交
1376
    SName name;
1377
    CHECK_CODE(createSName(&name, &tbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
D
dapan 已提交
1378

X
Xiaoyu Wang 已提交
1379
    CHECK_CODE(checkSchemalessDb(pCxt, name.dbname));
1380

1381
    tNameExtractFullName(&name, tbFName);
X
Xiaoyu Wang 已提交
1382
    CHECK_CODE(taosHashPut(pCxt->pTableNameHashObj, tbFName, strlen(tbFName), &name, sizeof(SName)));
D
dapan1121 已提交
1383 1384 1385
    char dbFName[TSDB_DB_FNAME_LEN];
    tNameGetFullDbName(&name, dbFName);
    CHECK_CODE(taosHashPut(pCxt->pDbFNameHashObj, dbFName, strlen(dbFName), dbFName, sizeof(dbFName)));
X
Xiaoyu Wang 已提交
1386

1387
    // USING clause
1388
    if (TK_USING == sToken.type) {
D
dapan 已提交
1389
      CHECK_CODE(parseUsingClause(pCxt, &name, tbFName));
1390
      NEXT_TOKEN(pCxt->pSql, sToken);
D
dapan 已提交
1391
      autoCreateTbl = true;
1392
    } else {
D
dapan1121 已提交
1393
      CHECK_CODE(getTableMeta(pCxt, &name, dbFName));
1394 1395
    }

H
refact  
Hongze Cheng 已提交
1396
    STableDataBlocks* dataBuf = NULL;
D
dapan 已提交
1397
    CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, tbFName, strlen(tbFName), TSDB_DEFAULT_PAYLOAD_SIZE,
H
refact  
Hongze Cheng 已提交
1398 1399
                                    sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta,
                                    &dataBuf, NULL, &pCxt->createTblReq));
1400

1401
    if (TK_NK_LP == sToken.type) {
1402
      // pSql -> field1_name, ...)
D
dapan1121 已提交
1403
      CHECK_CODE(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(pCxt->pTableMeta)));
1404 1405 1406 1407 1408 1409
      NEXT_TOKEN(pCxt->pSql, sToken);
    }

    if (TK_VALUES == sToken.type) {
      // pSql -> (field1_value, ...) [(field1_value2, ...) ...]
      CHECK_CODE(parseValuesClause(pCxt, dataBuf));
D
stmt  
dapan1121 已提交
1410
      TSDB_QUERY_SET_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_INSERT);
D
stmt  
dapan1121 已提交
1411 1412

      tbNum++;
1413 1414 1415 1416
      continue;
    }

    // FILE csv_file_path
X
Xiaoyu Wang 已提交
1417
    if (TK_FILE == sToken.type) {
1418 1419
      // pSql -> csv_file_path
      NEXT_TOKEN(pCxt->pSql, sToken);
1420
      if (0 == sToken.n || (TK_NK_STRING != sToken.type && TK_NK_ID != sToken.type)) {
1421 1422 1423
        return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", sToken.z);
      }
      // todo
1424
      pCxt->pOutput->insertType = TSDB_QUERY_TYPE_FILE_INSERT;
D
stmt  
dapan1121 已提交
1425 1426

      tbNum++;
1427 1428 1429 1430 1431
      continue;
    }

    return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", sToken.z);
  }
X
Xiaoyu Wang 已提交
1432

D
stmt  
dapan1121 已提交
1433
  if (TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT)) {
X
Xiaoyu Wang 已提交
1434
    SParsedDataColInfo* tags = taosMemoryMalloc(sizeof(pCxt->tags));
D
stmt  
dapan1121 已提交
1435 1436 1437 1438
    if (NULL == tags) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    memcpy(tags, &pCxt->tags, sizeof(pCxt->tags));
1439 1440
    (*pCxt->pStmtCb->setInfoFn)(pCxt->pStmtCb->pStmt, pCxt->pTableMeta, tags, tbFName, autoCreateTbl,
                                pCxt->pVgroupsHashObj, pCxt->pTableBlockHashObj);
D
stmt  
dapan1121 已提交
1441

D
dapan 已提交
1442
    memset(&pCxt->tags, 0, sizeof(pCxt->tags));
D
stmt  
dapan1121 已提交
1443 1444
    pCxt->pVgroupsHashObj = NULL;
    pCxt->pTableBlockHashObj = NULL;
D
dapan1121 已提交
1445
    pCxt->pTableMeta = NULL;
X
Xiaoyu Wang 已提交
1446

D
stmt  
dapan1121 已提交
1447 1448
    return TSDB_CODE_SUCCESS;
  }
X
Xiaoyu Wang 已提交
1449

1450
  // merge according to vgId
D
stmt  
dapan1121 已提交
1451
  if (taosHashGetSize(pCxt->pTableBlockHashObj) > 0) {
X
Xiaoyu Wang 已提交
1452
    CHECK_CODE(mergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->payloadType, &pCxt->pVgDataBlocks));
1453
  }
1454
  return buildOutput(pCxt);
1455 1456 1457 1458 1459 1460 1461 1462
}

// INSERT INTO
//   tb_name
//       [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
//       [(field1_name, ...)]
//       VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
//   [...];
1463
int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery, SParseMetaCache* pMetaCache) {
1464
  SInsertParseContext context = {
X
Xiaoyu Wang 已提交
1465 1466 1467 1468
      .pComCxt = pContext,
      .pSql = (char*)pContext->pSql,
      .msg = {.buf = pContext->pMsg, .len = pContext->msgLen},
      .pTableMeta = NULL,
X
Xiaoyu Wang 已提交
1469 1470
      .pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK),
      .pTableNameHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK),
D
dapan1121 已提交
1471
      .pDbFNameHashObj = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK),
X
Xiaoyu Wang 已提交
1472 1473
      .totalNum = 0,
      .pOutput = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT),
1474 1475
      .pStmtCb = pContext->pStmtCb,
      .pMetaCache = pMetaCache};
1476

D
stmt  
dapan1121 已提交
1477
  if (pContext->pStmtCb && *pQuery) {
X
Xiaoyu Wang 已提交
1478 1479
    (*pContext->pStmtCb->getExecInfoFn)(pContext->pStmtCb->pStmt, &context.pVgroupsHashObj,
                                        &context.pTableBlockHashObj);
D
stmt  
dapan1121 已提交
1480
  } else {
X
Xiaoyu Wang 已提交
1481 1482 1483
    context.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
    context.pTableBlockHashObj =
        taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
D
stmt  
dapan1121 已提交
1484
  }
X
Xiaoyu Wang 已提交
1485 1486

  if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pSubTableHashObj ||
D
dapan1121 已提交
1487
      NULL == context.pTableNameHashObj || NULL == context.pDbFNameHashObj || NULL == context.pOutput) {
X
Xiaoyu Wang 已提交
1488
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
1489
  }
1490
  taosHashSetFreeFp(context.pSubTableHashObj, destroySubTableHashElem);
1491

D
stmt  
dapan1121 已提交
1492 1493 1494 1495
  if (pContext->pStmtCb) {
    TSDB_QUERY_SET_TYPE(context.pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT);
  }

1496
  if (NULL == *pQuery) {
1497
    *pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
D
stmt  
dapan1121 已提交
1498 1499 1500
    if (NULL == *pQuery) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
1501
  }
1502 1503 1504 1505
  (*pQuery)->execMode = QUERY_EXEC_MODE_SCHEDULE;
  (*pQuery)->haveResultSet = false;
  (*pQuery)->msgType = TDMT_VND_SUBMIT;
  (*pQuery)->pRoot = (SNode*)context.pOutput;
X
Xiaoyu Wang 已提交
1506

D
dapan1121 已提交
1507 1508 1509 1510 1511 1512
  if (NULL == (*pQuery)->pTableList) {
    (*pQuery)->pTableList = taosArrayInit(taosHashGetSize(context.pTableNameHashObj), sizeof(SName));
    if (NULL == (*pQuery)->pTableList) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }
1513

D
dapan1121 已提交
1514 1515 1516 1517 1518 1519 1520
  if (NULL == (*pQuery)->pDbList) {
    (*pQuery)->pDbList = taosArrayInit(taosHashGetSize(context.pDbFNameHashObj), TSDB_DB_FNAME_LEN);
    if (NULL == (*pQuery)->pDbList) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

1521
  context.pOutput->payloadType = PAYLOAD_TYPE_KV;
1522

1523
  int32_t code = skipInsertInto(&context.pSql, &context.msg);
1524 1525 1526
  if (TSDB_CODE_SUCCESS == code) {
    code = parseInsertBody(&context);
  }
1527
  if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) {
X
Xiaoyu Wang 已提交
1528 1529 1530 1531 1532
    SName* pTable = taosHashIterate(context.pTableNameHashObj, NULL);
    while (NULL != pTable) {
      taosArrayPush((*pQuery)->pTableList, pTable);
      pTable = taosHashIterate(context.pTableNameHashObj, pTable);
    }
D
dapan1121 已提交
1533 1534 1535 1536 1537 1538

    char* pDb = taosHashIterate(context.pDbFNameHashObj, NULL);
    while (NULL != pDb) {
      taosArrayPush((*pQuery)->pDbList, pDb);
      pDb = taosHashIterate(context.pDbFNameHashObj, pDb);
    }
X
Xiaoyu Wang 已提交
1539
  }
1540
  destroyInsertParseContext(&context);
X
Xiaoyu Wang 已提交
1541
  return code;
1542
}
D
stmt  
dapan1121 已提交
1543

1544 1545 1546 1547 1548 1549 1550 1551
typedef struct SInsertParseSyntaxCxt {
  SParseContext*   pComCxt;
  char*            pSql;
  SMsgBuf          msg;
  SParseMetaCache* pMetaCache;
} SInsertParseSyntaxCxt;

static int32_t skipParentheses(SInsertParseSyntaxCxt* pCxt) {
X
Xiaoyu Wang 已提交
1552 1553
  SToken  sToken;
  int32_t expectRightParenthesis = 1;
1554 1555
  while (1) {
    NEXT_TOKEN(pCxt->pSql, sToken);
X
Xiaoyu Wang 已提交
1556 1557 1558
    if (TK_NK_LP == sToken.type) {
      ++expectRightParenthesis;
    } else if (TK_NK_RP == sToken.type && 0 == --expectRightParenthesis) {
1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617
      break;
    }
    if (0 == sToken.n) {
      return buildSyntaxErrMsg(&pCxt->msg, ") expected", NULL);
    }
  }
  return TSDB_CODE_SUCCESS;
}

static int32_t skipBoundColumns(SInsertParseSyntaxCxt* pCxt) { return skipParentheses(pCxt); }

// pSql -> (field1_value, ...) [(field1_value2, ...) ...]
static int32_t skipValuesClause(SInsertParseSyntaxCxt* pCxt) {
  int32_t numOfRows = 0;
  SToken  sToken;
  while (1) {
    int32_t index = 0;
    NEXT_TOKEN_KEEP_SQL(pCxt->pSql, sToken, index);
    if (TK_NK_LP != sToken.type) {
      break;
    }
    pCxt->pSql += index;

    CHECK_CODE(skipParentheses(pCxt));
    ++numOfRows;
  }
  if (0 == numOfRows) {
    return buildSyntaxErrMsg(&pCxt->msg, "no any data points", NULL);
  }
  return TSDB_CODE_SUCCESS;
}

static int32_t skipTagsClause(SInsertParseSyntaxCxt* pCxt) { return skipParentheses(pCxt); }

// pSql -> [(tag1_name, ...)] TAGS (tag1_value, ...)
static int32_t skipUsingClause(SInsertParseSyntaxCxt* pCxt) {
  SToken sToken;
  NEXT_TOKEN(pCxt->pSql, sToken);
  if (TK_NK_LP == sToken.type) {
    CHECK_CODE(skipBoundColumns(pCxt));
    NEXT_TOKEN(pCxt->pSql, sToken);
  }

  if (TK_TAGS != sToken.type) {
    return buildSyntaxErrMsg(&pCxt->msg, "TAGS is expected", sToken.z);
  }
  // pSql -> (tag1_value, ...)
  NEXT_TOKEN(pCxt->pSql, sToken);
  if (TK_NK_LP != sToken.type) {
    return buildSyntaxErrMsg(&pCxt->msg, "( is expected", sToken.z);
  }
  CHECK_CODE(skipTagsClause(pCxt));

  return TSDB_CODE_SUCCESS;
}

static int32_t collectTableMetaKey(SInsertParseSyntaxCxt* pCxt, SToken* pTbToken) {
  SName name;
  CHECK_CODE(createSName(&name, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
X
Xiaoyu Wang 已提交
1618
  CHECK_CODE(reserveDbCfgInCache(pCxt->pComCxt->acctId, name.dbname, pCxt->pMetaCache));
1619 1620 1621 1622 1623 1624
  CHECK_CODE(reserveUserAuthInCacheExt(pCxt->pComCxt->pUser, &name, AUTH_TYPE_WRITE, pCxt->pMetaCache));
  CHECK_CODE(reserveTableMetaInCacheExt(&name, pCxt->pMetaCache));
  CHECK_CODE(reserveTableVgroupInCacheExt(&name, pCxt->pMetaCache));
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1625 1626 1627 1628 1629 1630 1631
static int32_t collectAutoCreateTableMetaKey(SInsertParseSyntaxCxt* pCxt, SToken* pTbToken) {
  SName name;
  CHECK_CODE(createSName(&name, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
  CHECK_CODE(reserveTableVgroupInCacheExt(&name, pCxt->pMetaCache));
  return TSDB_CODE_SUCCESS;
}

1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659
static int32_t parseInsertBodySyntax(SInsertParseSyntaxCxt* pCxt) {
  bool hasData = false;
  // for each table
  while (1) {
    SToken sToken;

    // pSql -> tb_name ...
    NEXT_TOKEN(pCxt->pSql, sToken);

    // no data in the sql string anymore.
    if (sToken.n == 0) {
      if (sToken.type && pCxt->pSql[0]) {
        return buildSyntaxErrMsg(&pCxt->msg, "invalid charactor in SQL", sToken.z);
      }

      if (!hasData) {
        return buildInvalidOperationMsg(&pCxt->msg, "no data in sql");
      }
      break;
    }

    hasData = false;

    SToken tbnameToken = sToken;
    NEXT_TOKEN(pCxt->pSql, sToken);

    // USING clause
    if (TK_USING == sToken.type) {
X
Xiaoyu Wang 已提交
1660
      CHECK_CODE(collectAutoCreateTableMetaKey(pCxt, &tbnameToken));
1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698
      NEXT_TOKEN(pCxt->pSql, sToken);
      CHECK_CODE(collectTableMetaKey(pCxt, &sToken));
      CHECK_CODE(skipUsingClause(pCxt));
      NEXT_TOKEN(pCxt->pSql, sToken);
    } else {
      CHECK_CODE(collectTableMetaKey(pCxt, &tbnameToken));
    }

    if (TK_NK_LP == sToken.type) {
      // pSql -> field1_name, ...)
      CHECK_CODE(skipBoundColumns(pCxt));
      NEXT_TOKEN(pCxt->pSql, sToken);
    }

    if (TK_VALUES == sToken.type) {
      // pSql -> (field1_value, ...) [(field1_value2, ...) ...]
      CHECK_CODE(skipValuesClause(pCxt));
      hasData = true;
      continue;
    }

    // FILE csv_file_path
    if (TK_FILE == sToken.type) {
      // pSql -> csv_file_path
      NEXT_TOKEN(pCxt->pSql, sToken);
      if (0 == sToken.n || (TK_NK_STRING != sToken.type && TK_NK_ID != sToken.type)) {
        return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", sToken.z);
      }
      hasData = true;
      continue;
    }

    return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", sToken.z);
  }

  return TSDB_CODE_SUCCESS;
}

1699
int32_t parseInsertSyntax(SParseContext* pContext, SQuery** pQuery, SParseMetaCache* pMetaCache) {
1700 1701 1702
  SInsertParseSyntaxCxt context = {.pComCxt = pContext,
                                   .pSql = (char*)pContext->pSql,
                                   .msg = {.buf = pContext->pMsg, .len = pContext->msgLen},
1703 1704
                                   .pMetaCache = pMetaCache};
  int32_t               code = skipInsertInto(&context.pSql, &context.msg);
1705 1706 1707 1708
  if (TSDB_CODE_SUCCESS == code) {
    code = parseInsertBodySyntax(&context);
  }
  if (TSDB_CODE_SUCCESS == code) {
1709
    *pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
1710 1711 1712 1713 1714 1715 1716
    if (NULL == *pQuery) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }
  return code;
}

X
Xiaoyu Wang 已提交
1717 1718 1719 1720
int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf,
                     int32_t msgBufLen) {
  SMsgBuf msg = {.buf = msgBuf, .len = msgBufLen};
  SToken  sToken;
D
stmt  
dapan1121 已提交
1721
  int32_t code = 0;
X
Xiaoyu Wang 已提交
1722 1723
  char*   tbName = NULL;

D
stmt  
dapan1121 已提交
1724
  NEXT_TOKEN(pTableName, sToken);
X
Xiaoyu Wang 已提交
1725

D
stmt  
dapan1121 已提交
1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736
  if (sToken.n == 0) {
    return buildInvalidOperationMsg(&msg, "empty table name");
  }

  code = createSName(pName, &sToken, acctId, dbName, &msg);
  if (code) {
    return code;
  }

  NEXT_TOKEN(pTableName, sToken);

D
stmt  
dapan1121 已提交
1737
  if (sToken.n > 0) {
D
stmt  
dapan1121 已提交
1738 1739 1740 1741 1742 1743 1744
    return buildInvalidOperationMsg(&msg, "table name format is wrong");
  }

  return TSDB_CODE_SUCCESS;
}

int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) {
X
Xiaoyu Wang 已提交
1745 1746
  SVnodeModifOpStmt*  modifyNode = (SVnodeModifOpStmt*)pQuery->pRoot;
  int32_t             code = 0;
D
stmt  
dapan1121 已提交
1747
  SInsertParseContext insertCtx = {
X
Xiaoyu Wang 已提交
1748 1749 1750
      .pVgroupsHashObj = pVgHash,
      .pTableBlockHashObj = pBlockHash,
      .pOutput = (SVnodeModifOpStmt*)pQuery->pRoot,
D
stmt  
dapan1121 已提交
1751
  };
X
Xiaoyu Wang 已提交
1752

D
stmt  
dapan1121 已提交
1753 1754
  // merge according to vgId
  if (taosHashGetSize(insertCtx.pTableBlockHashObj) > 0) {
D
stmt  
dapan1121 已提交
1755
    CHECK_CODE(mergeTableDataBlocks(insertCtx.pTableBlockHashObj, modifyNode->payloadType, &insertCtx.pVgDataBlocks));
D
stmt  
dapan1121 已提交
1756 1757 1758 1759
  }

  CHECK_CODE(buildOutput(&insertCtx));

wmmhello's avatar
wmmhello 已提交
1760
  destroyBlockArrayList(insertCtx.pVgDataBlocks);
D
stmt  
dapan1121 已提交
1761 1762 1763
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1764 1765 1766 1767
int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, char* tName, TAOS_MULTI_BIND* bind,
                           char* msgBuf, int32_t msgBufLen) {
  STableDataBlocks*   pDataBlock = (STableDataBlocks*)pBlock;
  SMsgBuf             pBuf = {.buf = msgBuf, .len = msgBufLen};
D
stmt  
dapan1121 已提交
1768 1769 1770 1771 1772
  SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
  if (NULL == tags) {
    return TSDB_CODE_QRY_APP_ERROR;
  }

C
Cary Xu 已提交
1773 1774
  SArray* pTagArray = taosArrayInit(tags->numOfBound, sizeof(STagVal));
  if (!pTagArray) {
C
Cary Xu 已提交
1775
    return buildInvalidOperationMsg(&pBuf, "out of memory");
D
stmt  
dapan1121 已提交
1776 1777
  }

1778
  int32_t  code = TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1779
  SSchema* pSchema = pDataBlock->pTableMeta->schema;
D
stmt  
dapan1121 已提交
1780

1781
  bool  isJson = false;
wmmhello's avatar
wmmhello 已提交
1782
  STag* pTag = NULL;
D
dapan1121 已提交
1783

D
stmt  
dapan1121 已提交
1784 1785 1786 1787
  for (int c = 0; c < tags->numOfBound; ++c) {
    if (bind[c].is_null && bind[c].is_null[0]) {
      continue;
    }
X
Xiaoyu Wang 已提交
1788

X
Xiaoyu Wang 已提交
1789
    SSchema* pTagSchema = &pSchema[tags->boundColumns[c]];
1790
    int32_t  colLen = pTagSchema->bytes;
D
stmt  
dapan1121 已提交
1791 1792 1793
    if (IS_VAR_DATA_TYPE(pTagSchema->type)) {
      colLen = bind[c].length[0];
    }
wmmhello's avatar
wmmhello 已提交
1794 1795 1796 1797 1798
    if (pTagSchema->type == TSDB_DATA_TYPE_JSON) {
      if (colLen > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
        code = buildSyntaxErrMsg(&pBuf, "json string too long than 4095", bind[c].buffer);
        goto end;
      }
X
Xiaoyu Wang 已提交
1799

wmmhello's avatar
wmmhello 已提交
1800
      isJson = true;
1801
      char* tmp = taosMemoryCalloc(1, colLen + 1);
wmmhello's avatar
wmmhello 已提交
1802
      memcpy(tmp, bind[c].buffer, colLen);
wmmhello's avatar
wmmhello 已提交
1803
      code = parseJsontoTagData(tmp, pTagArray, &pTag, &pBuf);
wmmhello's avatar
wmmhello 已提交
1804
      taosMemoryFree(tmp);
1805
      if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
1806 1807
        goto end;
      }
1808
    } else {
wmmhello's avatar
wmmhello 已提交
1809
      STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type};
1810
      if (pTagSchema->type == TSDB_DATA_TYPE_BINARY) {
wmmhello's avatar
wmmhello 已提交
1811 1812
        val.pData = (uint8_t*)bind[c].buffer;
        val.nData = colLen;
1813
      } else if (pTagSchema->type == TSDB_DATA_TYPE_NCHAR) {
wmmhello's avatar
wmmhello 已提交
1814
        int32_t output = 0;
1815 1816
        void*   p = taosMemoryCalloc(1, colLen * TSDB_NCHAR_SIZE);
        if (p == NULL) {
wmmhello's avatar
wmmhello 已提交
1817 1818 1819
          code = TSDB_CODE_OUT_OF_MEMORY;
          goto end;
        }
wmmhello's avatar
wmmhello 已提交
1820
        if (!taosMbsToUcs4(bind[c].buffer, colLen, (TdUcs4*)(p), colLen * TSDB_NCHAR_SIZE, &output)) {
wmmhello's avatar
wmmhello 已提交
1821 1822
          if (errno == E2BIG) {
            taosMemoryFree(p);
wmmhello's avatar
wmmhello 已提交
1823
            code = generateSyntaxErrMsg(&pBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pTagSchema->name);
wmmhello's avatar
wmmhello 已提交
1824 1825 1826 1827
            goto end;
          }
          char buf[512] = {0};
          snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(errno));
wmmhello's avatar
wmmhello 已提交
1828
          taosMemoryFree(p);
wmmhello's avatar
wmmhello 已提交
1829
          code = buildSyntaxErrMsg(&pBuf, buf, bind[c].buffer);
wmmhello's avatar
wmmhello 已提交
1830 1831
          goto end;
        }
wmmhello's avatar
wmmhello 已提交
1832 1833
        val.pData = p;
        val.nData = output;
1834
      } else {
wmmhello's avatar
wmmhello 已提交
1835
        memcpy(&val.i64, bind[c].buffer, colLen);
wmmhello's avatar
wmmhello 已提交
1836
      }
wmmhello's avatar
wmmhello 已提交
1837
      taosArrayPush(pTagArray, &val);
wmmhello's avatar
wmmhello 已提交
1838
    }
D
stmt  
dapan1121 已提交
1839 1840
  }

wmmhello's avatar
wmmhello 已提交
1841
  if (!isJson && (code = tTagNew(pTagArray, 1, false, &pTag)) != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
1842
    goto end;
D
stmt  
dapan1121 已提交
1843 1844 1845
  }

  SVCreateTbReq tbReq = {0};
wmmhello's avatar
wmmhello 已提交
1846 1847
  buildCreateTbReq(&tbReq, tName, pTag, suid);
  code = buildCreateTbMsg(pDataBlock, &tbReq);
D
stmt  
dapan1121 已提交
1848 1849
  destroyCreateSubTbReq(&tbReq);

wmmhello's avatar
wmmhello 已提交
1850 1851
end:
  for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) {
1852 1853
    STagVal* p = (STagVal*)taosArrayGet(pTagArray, i);
    if (p->type == TSDB_DATA_TYPE_NCHAR) {
wmmhello's avatar
wmmhello 已提交
1854 1855 1856
      taosMemoryFree(p->pData);
    }
  }
C
Cary Xu 已提交
1857
  taosArrayDestroy(pTagArray);
D
stmt  
dapan1121 已提交
1858

wmmhello's avatar
wmmhello 已提交
1859
  return code;
D
stmt  
dapan1121 已提交
1860 1861
}

X
Xiaoyu Wang 已提交
1862 1863 1864 1865
int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) {
  STableDataBlocks*   pDataBlock = (STableDataBlocks*)pBlock;
  SSchema*            pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
  int32_t             extendedRowSize = getExtendedRowSize(pDataBlock);
D
stmt  
dapan1121 已提交
1866 1867
  SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
  SRowBuilder*        pBuilder = &pDataBlock->rowBuilder;
X
Xiaoyu Wang 已提交
1868 1869 1870 1871
  SMemParam           param = {.rb = pBuilder};
  SMsgBuf             pBuf = {.buf = msgBuf, .len = msgBufLen};
  int32_t             rowNum = bind->num;

D
stmt  
dapan1121 已提交
1872 1873
  CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));

D
stmt  
dapan1121 已提交
1874
  CHECK_CODE(allocateMemForSize(pDataBlock, extendedRowSize * bind->num));
X
Xiaoyu Wang 已提交
1875

D
stmt  
dapan1121 已提交
1876 1877 1878
  for (int32_t r = 0; r < bind->num; ++r) {
    STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size);  // skip the SSubmitBlk header
    tdSRowResetBuf(pBuilder, row);
X
Xiaoyu Wang 已提交
1879

D
stmt  
dapan1121 已提交
1880
    for (int c = 0; c < spd->numOfBound; ++c) {
X
Xiaoyu Wang 已提交
1881
      SSchema* pColSchema = &pSchema[spd->boundColumns[c]];
D
stmt  
dapan1121 已提交
1882 1883 1884 1885

      if (bind[c].num != rowNum) {
        return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
      }
X
Xiaoyu Wang 已提交
1886

D
stmt  
dapan1121 已提交
1887 1888 1889 1890
      param.schema = pColSchema;
      getSTSRowAppendInfo(pBuilder->rowType, spd, c, &param.toffset, &param.colIdx);

      if (bind[c].is_null && bind[c].is_null[r]) {
D
stmt  
dapan1121 已提交
1891 1892 1893
        if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
          return buildInvalidOperationMsg(&pBuf, "primary timestamp should not be NULL");
        }
X
Xiaoyu Wang 已提交
1894

D
stmt  
dapan1121 已提交
1895 1896
        CHECK_CODE(MemRowAppend(&pBuf, NULL, 0, &param));
      } else {
D
dapan1121 已提交
1897 1898 1899 1900
        if (bind[c].buffer_type != pColSchema->type) {
          return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
        }

D
stmt  
dapan1121 已提交
1901 1902 1903 1904
        int32_t colLen = pColSchema->bytes;
        if (IS_VAR_DATA_TYPE(pColSchema->type)) {
          colLen = bind[c].length[r];
        }
X
Xiaoyu Wang 已提交
1905 1906

        CHECK_CODE(MemRowAppend(&pBuf, (char*)bind[c].buffer + bind[c].buffer_length * r, colLen, &param));
D
stmt  
dapan1121 已提交
1907
      }
X
Xiaoyu Wang 已提交
1908

D
stmt  
dapan1121 已提交
1909 1910
      if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
        TSKEY tsKey = TD_ROW_KEY(row);
X
Xiaoyu Wang 已提交
1911
        checkTimestamp(pDataBlock, (const char*)&tsKey);
D
stmt  
dapan1121 已提交
1912 1913 1914 1915 1916 1917 1918 1919 1920 1921 1922
      }
    }
    // set the null value for the columns that do not assign values
    if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
      for (int32_t i = 0; i < spd->numOfCols; ++i) {
        if (spd->cols[i].valStat == VAL_STAT_NONE) {  // the primary TS key is not VAL_STAT_NONE
          tdAppendColValToTpRow(pBuilder, TD_VTYPE_NONE, getNullValue(pSchema[i].type), true, pSchema[i].type, i,
                                spd->cols[i].toffset);
        }
      }
    }
C
Cary Xu 已提交
1923 1924
#ifdef TD_DEBUG_PRINT_ROW
    STSchema* pSTSchema = tdGetSTSChemaFromSSChema(&pSchema, spd->numOfCols);
C
Cary Xu 已提交
1925
    tdSRowPrint(row, pSTSchema, __func__);
C
Cary Xu 已提交
1926 1927
    taosMemoryFree(pSTSchema);
#endif
D
stmt  
dapan1121 已提交
1928 1929
    pDataBlock->size += extendedRowSize;
  }
D
stmt  
dapan1121 已提交
1930

X
Xiaoyu Wang 已提交
1931
  SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
D
stmt  
dapan1121 已提交
1932 1933 1934 1935 1936 1937 1938
  if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, bind->num)) {
    return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767");
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1939 1940 1941 1942 1943
int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx,
                                int32_t rowNum) {
  STableDataBlocks*   pDataBlock = (STableDataBlocks*)pBlock;
  SSchema*            pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
  int32_t             extendedRowSize = getExtendedRowSize(pDataBlock);
D
stmt  
dapan1121 已提交
1944 1945
  SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
  SRowBuilder*        pBuilder = &pDataBlock->rowBuilder;
X
Xiaoyu Wang 已提交
1946 1947 1948 1949
  SMemParam           param = {.rb = pBuilder};
  SMsgBuf             pBuf = {.buf = msgBuf, .len = msgBufLen};
  bool                rowStart = (0 == colIdx);
  bool                rowEnd = ((colIdx + 1) == spd->numOfBound);
D
stmt  
dapan1121 已提交
1950 1951 1952 1953 1954

  if (rowStart) {
    CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));
    CHECK_CODE(allocateMemForSize(pDataBlock, extendedRowSize * bind->num));
  }
X
Xiaoyu Wang 已提交
1955

D
stmt  
dapan1121 已提交
1956 1957 1958 1959 1960 1961 1962
  for (int32_t r = 0; r < bind->num; ++r) {
    STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size + extendedRowSize * r);  // skip the SSubmitBlk header
    if (rowStart) {
      tdSRowResetBuf(pBuilder, row);
    } else {
      tdSRowGetBuf(pBuilder, row);
    }
D
stmt  
dapan1121 已提交
1963

X
Xiaoyu Wang 已提交
1964
    SSchema* pColSchema = &pSchema[spd->boundColumns[colIdx]];
D
stmt  
dapan1121 已提交
1965 1966 1967 1968

    if (bind->num != rowNum) {
      return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
    }
X
Xiaoyu Wang 已提交
1969

D
stmt  
dapan1121 已提交
1970 1971 1972 1973 1974 1975 1976
    param.schema = pColSchema;
    getSTSRowAppendInfo(pBuilder->rowType, spd, colIdx, &param.toffset, &param.colIdx);

    if (bind->is_null && bind->is_null[r]) {
      if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
        return buildInvalidOperationMsg(&pBuf, "primary timestamp should not be NULL");
      }
X
Xiaoyu Wang 已提交
1977

D
stmt  
dapan1121 已提交
1978 1979
      CHECK_CODE(MemRowAppend(&pBuf, NULL, 0, &param));
    } else {
D
dapan1121 已提交
1980 1981 1982 1983
      if (bind->buffer_type != pColSchema->type) {
        return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
      }

D
stmt  
dapan1121 已提交
1984 1985 1986 1987
      int32_t colLen = pColSchema->bytes;
      if (IS_VAR_DATA_TYPE(pColSchema->type)) {
        colLen = bind->length[r];
      }
X
Xiaoyu Wang 已提交
1988 1989

      CHECK_CODE(MemRowAppend(&pBuf, (char*)bind->buffer + bind->buffer_length * r, colLen, &param));
D
stmt  
dapan1121 已提交
1990
    }
X
Xiaoyu Wang 已提交
1991

D
stmt  
dapan1121 已提交
1992 1993
    if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
      TSKEY tsKey = TD_ROW_KEY(row);
X
Xiaoyu Wang 已提交
1994
      checkTimestamp(pDataBlock, (const char*)&tsKey);
D
stmt  
dapan1121 已提交
1995
    }
X
Xiaoyu Wang 已提交
1996

D
stmt  
dapan1121 已提交
1997 1998 1999 2000 2001 2002 2003 2004
    // set the null value for the columns that do not assign values
    if (rowEnd && (spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
      for (int32_t i = 0; i < spd->numOfCols; ++i) {
        if (spd->cols[i].valStat == VAL_STAT_NONE) {  // the primary TS key is not VAL_STAT_NONE
          tdAppendColValToTpRow(pBuilder, TD_VTYPE_NONE, getNullValue(pSchema[i].type), true, pSchema[i].type, i,
                                spd->cols[i].toffset);
        }
      }
X
Xiaoyu Wang 已提交
2005
    }
C
Cary Xu 已提交
2006 2007

#ifdef TD_DEBUG_PRINT_ROW
X
Xiaoyu Wang 已提交
2008
    if (rowEnd) {
C
Cary Xu 已提交
2009
      STSchema* pSTSchema = tdGetSTSChemaFromSSChema(&pSchema, spd->numOfCols);
C
Cary Xu 已提交
2010
      tdSRowPrint(row, pSTSchema, __func__);
C
Cary Xu 已提交
2011 2012 2013
      taosMemoryFree(pSTSchema);
    }
#endif
D
stmt  
dapan1121 已提交
2014 2015
  }

D
stmt  
dapan1121 已提交
2016 2017 2018
  if (rowEnd) {
    pDataBlock->size += extendedRowSize * bind->num;

X
Xiaoyu Wang 已提交
2019
    SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
D
stmt  
dapan1121 已提交
2020 2021 2022 2023 2024 2025 2026 2027
    if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, bind->num)) {
      return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767");
    }
  }

  return TSDB_CODE_SUCCESS;
}

2028 2029
int32_t buildBoundFields(SParsedDataColInfo* boundInfo, SSchema* pSchema, int32_t* fieldNum, TAOS_FIELD_E** fields,
                         uint8_t timePrec) {
D
stmt  
dapan1121 已提交
2030 2031 2032 2033 2034 2035
  if (fields) {
    *fields = taosMemoryCalloc(boundInfo->numOfBound, sizeof(TAOS_FIELD));
    if (NULL == *fields) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

D
dapan1121 已提交
2036 2037
    SSchema* schema = &pSchema[boundInfo->boundColumns[0]];
    if (TSDB_DATA_TYPE_TIMESTAMP == schema->type) {
D
dapan1121 已提交
2038 2039
      (*fields)[0].precision = timePrec;
    }
2040

D
stmt  
dapan1121 已提交
2041
    for (int32_t i = 0; i < boundInfo->numOfBound; ++i) {
D
dapan1121 已提交
2042 2043 2044 2045
      schema = &pSchema[boundInfo->boundColumns[i]];
      strcpy((*fields)[i].name, schema->name);
      (*fields)[i].type = schema->type;
      (*fields)[i].bytes = schema->bytes;
D
stmt  
dapan1121 已提交
2046
    }
D
stmt  
dapan1121 已提交
2047 2048 2049 2050 2051 2052 2053
  }

  *fieldNum = boundInfo->numOfBound;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
2054
int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD_E** fields) {
X
Xiaoyu Wang 已提交
2055
  STableDataBlocks*   pDataBlock = (STableDataBlocks*)pBlock;
D
stmt  
dapan1121 已提交
2056 2057 2058 2059
  SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
  if (NULL == tags) {
    return TSDB_CODE_QRY_APP_ERROR;
  }
X
Xiaoyu Wang 已提交
2060

D
dapan1121 已提交
2061 2062 2063 2064
  if (pDataBlock->pTableMeta->tableType != TSDB_SUPER_TABLE && pDataBlock->pTableMeta->tableType != TSDB_CHILD_TABLE) {
    return TSDB_CODE_TSC_STMT_API_ERROR;
  }

X
Xiaoyu Wang 已提交
2065
  SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta);
D
stmt  
dapan1121 已提交
2066 2067 2068 2069 2070 2071 2072
  if (tags->numOfBound <= 0) {
    *fieldNum = 0;
    *fields = NULL;

    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
2073
  CHECK_CODE(buildBoundFields(tags, pSchema, fieldNum, fields, 0));
X
Xiaoyu Wang 已提交
2074

D
stmt  
dapan1121 已提交
2075 2076 2077
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
2078
int32_t qBuildStmtColFields(void* pBlock, int32_t* fieldNum, TAOS_FIELD_E** fields) {
X
Xiaoyu Wang 已提交
2079 2080
  STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;
  SSchema*          pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
D
stmt  
dapan1121 已提交
2081 2082
  if (pDataBlock->boundColumnInfo.numOfBound <= 0) {
    *fieldNum = 0;
D
stmt  
dapan1121 已提交
2083 2084 2085
    if (fields) {
      *fields = NULL;
    }
D
stmt  
dapan1121 已提交
2086 2087 2088 2089

    return TSDB_CODE_SUCCESS;
  }

2090 2091
  CHECK_CODE(buildBoundFields(&pDataBlock->boundColumnInfo, pSchema, fieldNum, fields,
                              pDataBlock->pTableMeta->tableInfo.precision));
X
Xiaoyu Wang 已提交
2092

D
stmt  
dapan1121 已提交
2093 2094 2095
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
2096
// schemaless logic start
D
stmt  
dapan1121 已提交
2097

wmmhello's avatar
wmmhello 已提交
2098
typedef struct SmlExecTableHandle {
X
Xiaoyu Wang 已提交
2099 2100
  SParsedDataColInfo tags;          // each table
  SVCreateTbReq      createTblReq;  // each table
wmmhello's avatar
wmmhello 已提交
2101
} SmlExecTableHandle;
wmmhello's avatar
wmmhello 已提交
2102

wmmhello's avatar
wmmhello 已提交
2103
typedef struct SmlExecHandle {
2104 2105 2106
  SHashObj*          pBlockHash;
  SmlExecTableHandle tableExecHandle;
  SQuery*            pQuery;
wmmhello's avatar
wmmhello 已提交
2107
} SSmlExecHandle;
wmmhello's avatar
wmmhello 已提交
2108

wmmhello's avatar
wmmhello 已提交
2109 2110 2111 2112 2113 2114
static void smlDestroyTableHandle(void* pHandle) {
  SmlExecTableHandle* handle = (SmlExecTableHandle*)pHandle;
  destroyBoundColumnInfo(&handle->tags);
  destroyCreateSubTbReq(&handle->createTblReq);
}

X
Xiaoyu Wang 已提交
2115
static int32_t smlBoundColumnData(SArray* cols, SParsedDataColInfo* pColList, SSchema* pSchema) {
wmmhello's avatar
wmmhello 已提交
2116 2117 2118 2119 2120 2121 2122 2123 2124 2125 2126 2127
  col_id_t nCols = pColList->numOfCols;

  pColList->numOfBound = 0;
  pColList->boundNullLen = 0;
  memset(pColList->boundColumns, 0, sizeof(col_id_t) * nCols);
  for (col_id_t i = 0; i < nCols; ++i) {
    pColList->cols[i].valStat = VAL_STAT_NONE;
  }

  bool     isOrdered = true;
  col_id_t lastColIdx = -1;  // last column found
  for (int i = 0; i < taosArrayGetSize(cols); ++i) {
X
Xiaoyu Wang 已提交
2128 2129
    SSmlKv*  kv = taosArrayGetP(cols, i);
    SToken   sToken = {.n = kv->keyLen, .z = (char*)kv->key};
wmmhello's avatar
wmmhello 已提交
2130 2131 2132 2133 2134 2135 2136
    col_id_t t = lastColIdx + 1;
    col_id_t index = findCol(&sToken, t, nCols, pSchema);
    if (index < 0 && t > 0) {
      index = findCol(&sToken, 0, t, pSchema);
      isOrdered = false;
    }
    if (index < 0) {
2137
      uError("smlBoundColumnData. index:%d", index);
wmmhello's avatar
wmmhello 已提交
2138 2139 2140
      return TSDB_CODE_SML_INVALID_DATA;
    }
    if (pColList->cols[index].valStat == VAL_STAT_HAS) {
2141
      uError("smlBoundColumnData. already set. index:%d", index);
wmmhello's avatar
wmmhello 已提交
2142 2143 2144 2145
      return TSDB_CODE_SML_INVALID_DATA;
    }
    lastColIdx = index;
    pColList->cols[index].valStat = VAL_STAT_HAS;
X
Xiaoyu Wang 已提交
2146
    pColList->boundColumns[pColList->numOfBound] = index;
wmmhello's avatar
wmmhello 已提交
2147 2148 2149 2150 2151 2152 2153 2154 2155 2156 2157 2158 2159 2160 2161 2162 2163 2164 2165 2166 2167 2168 2169 2170 2171 2172 2173 2174 2175 2176 2177 2178 2179
    ++pColList->numOfBound;
    switch (pSchema[t].type) {
      case TSDB_DATA_TYPE_BINARY:
        pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + CHAR_BYTES);
        break;
      case TSDB_DATA_TYPE_NCHAR:
        pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
        break;
      default:
        pColList->boundNullLen += TYPE_BYTES[pSchema[t].type];
        break;
    }
  }

  pColList->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED;

  if (!isOrdered) {
    pColList->colIdxInfo = taosMemoryCalloc(pColList->numOfBound, sizeof(SBoundIdxInfo));
    if (NULL == pColList->colIdxInfo) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    SBoundIdxInfo* pColIdx = pColList->colIdxInfo;
    for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
      pColIdx[i].schemaColIdx = pColList->boundColumns[i];
      pColIdx[i].boundIdx = i;
    }
    qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), schemaIdxCompar);
    for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
      pColIdx[i].finalIdx = i;
    }
    qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), boundIdxCompar);
  }

X
Xiaoyu Wang 已提交
2180
  if (pColList->numOfCols > pColList->numOfBound) {
wmmhello's avatar
wmmhello 已提交
2181 2182 2183 2184 2185 2186 2187
    memset(&pColList->boundColumns[pColList->numOfBound], 0,
           sizeof(col_id_t) * (pColList->numOfCols - pColList->numOfBound));
  }

  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
2188 2189 2190 2191 2192 2193 2194 2195 2196 2197 2198
/**
 * @brief No json tag for schemaless
 *
 * @param cols
 * @param tags
 * @param pSchema
 * @param ppTag
 * @param msg
 * @return int32_t
 */
static int32_t smlBuildTagRow(SArray* cols, SParsedDataColInfo* tags, SSchema* pSchema, STag** ppTag, SMsgBuf* msg) {
C
Cary Xu 已提交
2199 2200
  SArray* pTagArray = taosArrayInit(tags->numOfBound, sizeof(STagVal));
  if (!pTagArray) {
wmmhello's avatar
wmmhello 已提交
2201 2202 2203
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

wmmhello's avatar
wmmhello 已提交
2204
  int32_t code = TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
2205
  for (int i = 0; i < tags->numOfBound; ++i) {
X
Xiaoyu Wang 已提交
2206
    SSchema* pTagSchema = &pSchema[tags->boundColumns[i]];
2207
    SSmlKv*  kv = taosArrayGetP(cols, i);
wmmhello's avatar
wmmhello 已提交
2208

wmmhello's avatar
wmmhello 已提交
2209
    STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type};
2210 2211
    if (pTagSchema->type == TSDB_DATA_TYPE_BINARY) {
      val.pData = (uint8_t*)kv->value;
wmmhello's avatar
wmmhello 已提交
2212
      val.nData = kv->length;
2213
    } else if (pTagSchema->type == TSDB_DATA_TYPE_NCHAR) {
wmmhello's avatar
wmmhello 已提交
2214
      int32_t output = 0;
X
Xiaoyu Wang 已提交
2215 2216
      void*   p = taosMemoryCalloc(1, kv->length * TSDB_NCHAR_SIZE);
      if (p == NULL) {
wmmhello's avatar
wmmhello 已提交
2217 2218 2219
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto end;
      }
wmmhello's avatar
wmmhello 已提交
2220
      if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)(p), kv->length * TSDB_NCHAR_SIZE, &output)) {
wmmhello's avatar
wmmhello 已提交
2221 2222
        if (errno == E2BIG) {
          taosMemoryFree(p);
wmmhello's avatar
wmmhello 已提交
2223
          code = generateSyntaxErrMsg(msg, TSDB_CODE_PAR_VALUE_TOO_LONG, pTagSchema->name);
wmmhello's avatar
wmmhello 已提交
2224 2225 2226 2227 2228 2229 2230 2231 2232 2233
          goto end;
        }
        char buf[512] = {0};
        snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(errno));
        taosMemoryFree(p);
        code = buildSyntaxErrMsg(msg, buf, kv->value);
        goto end;
      }
      val.pData = p;
      val.nData = output;
2234
    } else {
wmmhello's avatar
wmmhello 已提交
2235
      memcpy(&val.i64, &(kv->value), kv->length);
wmmhello's avatar
wmmhello 已提交
2236
    }
wmmhello's avatar
wmmhello 已提交
2237
    taosArrayPush(pTagArray, &val);
wmmhello's avatar
wmmhello 已提交
2238 2239
  }

wmmhello's avatar
wmmhello 已提交
2240 2241 2242
  code = tTagNew(pTagArray, 1, false, ppTag);
end:
  for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) {
2243 2244
    STagVal* p = (STagVal*)taosArrayGet(pTagArray, i);
    if (p->type == TSDB_DATA_TYPE_NCHAR) {
wmmhello's avatar
wmmhello 已提交
2245 2246
      taosMemoryFree(p->pData);
    }
wmmhello's avatar
wmmhello 已提交
2247
  }
C
Cary Xu 已提交
2248
  taosArrayDestroy(pTagArray);
wmmhello's avatar
wmmhello 已提交
2249
  return code;
wmmhello's avatar
wmmhello 已提交
2250 2251
}

2252 2253
int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta,
                    char* tableName, char* msgBuf, int16_t msgBufLen) {
wmmhello's avatar
wmmhello 已提交
2254 2255
  SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};

X
Xiaoyu Wang 已提交
2256
  SSmlExecHandle* smlHandle = (SSmlExecHandle*)handle;
2257 2258
  smlDestroyTableHandle(&smlHandle->tableExecHandle);  // free for each table
  SSchema* pTagsSchema = getTableTagSchema(pTableMeta);
wmmhello's avatar
wmmhello 已提交
2259 2260
  setBoundColumnInfo(&smlHandle->tableExecHandle.tags, pTagsSchema, getNumOfTags(pTableMeta));
  int ret = smlBoundColumnData(tags, &smlHandle->tableExecHandle.tags, pTagsSchema);
X
Xiaoyu Wang 已提交
2261
  if (ret != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
2262 2263 2264
    buildInvalidOperationMsg(&pBuf, "bound tags error");
    return ret;
  }
C
Cary Xu 已提交
2265 2266
  STag* pTag = NULL;
  ret = smlBuildTagRow(tags, &smlHandle->tableExecHandle.tags, pTagsSchema, &pTag, &pBuf);
X
Xiaoyu Wang 已提交
2267
  if (ret != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
2268 2269
    return ret;
  }
wmmhello's avatar
wmmhello 已提交
2270

C
Cary Xu 已提交
2271
  buildCreateTbReq(&smlHandle->tableExecHandle.createTblReq, tableName, pTag, pTableMeta->suid);
wmmhello's avatar
wmmhello 已提交
2272

wmmhello's avatar
wmmhello 已提交
2273
  STableDataBlocks* pDataBlock = NULL;
X
Xiaoyu Wang 已提交
2274 2275
  ret = getDataBlockFromList(smlHandle->pBlockHash, &pTableMeta->uid, sizeof(pTableMeta->uid),
                             TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk), getTableInfo(pTableMeta).rowSize,
wmmhello's avatar
wmmhello 已提交
2276
                             pTableMeta, &pDataBlock, NULL, &smlHandle->tableExecHandle.createTblReq);
X
Xiaoyu Wang 已提交
2277
  if (ret != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
2278 2279 2280
    buildInvalidOperationMsg(&pBuf, "create data block error");
    return ret;
  }
wmmhello's avatar
wmmhello 已提交
2281 2282 2283

  SSchema* pSchema = getTableColumnSchema(pTableMeta);

2284
  ret = smlBoundColumnData(colsSchema, &pDataBlock->boundColumnInfo, pSchema);
X
Xiaoyu Wang 已提交
2285
  if (ret != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
2286 2287 2288
    buildInvalidOperationMsg(&pBuf, "bound cols error");
    return ret;
  }
X
Xiaoyu Wang 已提交
2289
  int32_t             extendedRowSize = getExtendedRowSize(pDataBlock);
wmmhello's avatar
wmmhello 已提交
2290 2291
  SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
  SRowBuilder*        pBuilder = &pDataBlock->rowBuilder;
X
Xiaoyu Wang 已提交
2292
  SMemParam           param = {.rb = pBuilder};
wmmhello's avatar
wmmhello 已提交
2293 2294 2295

  initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo);

2296
  int32_t rowNum = taosArrayGetSize(cols);
2297
  if (rowNum <= 0) {
wmmhello's avatar
wmmhello 已提交
2298 2299
    return buildInvalidOperationMsg(&pBuf, "cols size <= 0");
  }
wmmhello's avatar
wmmhello 已提交
2300
  ret = allocateMemForSize(pDataBlock, extendedRowSize * rowNum);
X
Xiaoyu Wang 已提交
2301
  if (ret != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
2302 2303 2304
    buildInvalidOperationMsg(&pBuf, "allocate memory error");
    return ret;
  }
wmmhello's avatar
wmmhello 已提交
2305 2306 2307
  for (int32_t r = 0; r < rowNum; ++r) {
    STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size);  // skip the SSubmitBlk header
    tdSRowResetBuf(pBuilder, row);
2308
    void*  rowData = taosArrayGetP(cols, r);
2309
    size_t rowDataSize = 0;
2310
    if (format) {
2311
      rowDataSize = taosArrayGetSize(rowData);
wmmhello's avatar
wmmhello 已提交
2312
    }
wmmhello's avatar
wmmhello 已提交
2313 2314

    // 1. set the parsed value from sql string
2315
    for (int c = 0, j = 0; c < spd->numOfBound; ++c) {
2316
      SSchema* pColSchema = &pSchema[spd->boundColumns[c]];
wmmhello's avatar
wmmhello 已提交
2317 2318 2319 2320

      param.schema = pColSchema;
      getSTSRowAppendInfo(pBuilder->rowType, spd, c, &param.toffset, &param.colIdx);

X
Xiaoyu Wang 已提交
2321 2322 2323
      SSmlKv* kv = NULL;
      if (format) {
        if (j < rowDataSize) {
2324
          kv = taosArrayGetP(rowData, j);
X
Xiaoyu Wang 已提交
2325 2326
          if (rowDataSize != spd->numOfBound &&
              (kv->keyLen != strlen(pColSchema->name) || strncmp(kv->key, pColSchema->name, kv->keyLen) != 0)) {
2327
            kv = NULL;
X
Xiaoyu Wang 已提交
2328
          } else {
2329
            j++;
2330
          }
wmmhello's avatar
wmmhello 已提交
2331
        }
X
Xiaoyu Wang 已提交
2332 2333 2334
      } else {
        void** p = taosHashGet(rowData, pColSchema->name, strlen(pColSchema->name));
        if (p) kv = *p;
wmmhello's avatar
wmmhello 已提交
2335
      }
wmmhello's avatar
wmmhello 已提交
2336

2337
      if (!kv || kv->length == 0) {
wmmhello's avatar
wmmhello 已提交
2338 2339
        MemRowAppend(&pBuf, NULL, 0, &param);
      } else {
wmmhello's avatar
wmmhello 已提交
2340 2341
        int32_t colLen = kv->length;
        if (pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP) {
2342
          kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision);
wmmhello's avatar
wmmhello 已提交
2343 2344
        }

2345
        if (IS_VAR_DATA_TYPE(kv->type)) {
wmmhello's avatar
wmmhello 已提交
2346
          MemRowAppend(&pBuf, kv->value, colLen, &param);
2347
        } else {
wmmhello's avatar
wmmhello 已提交
2348 2349
          MemRowAppend(&pBuf, &(kv->value), colLen, &param);
        }
wmmhello's avatar
wmmhello 已提交
2350 2351 2352 2353
      }

      if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
        TSKEY tsKey = TD_ROW_KEY(row);
X
Xiaoyu Wang 已提交
2354
        checkTimestamp(pDataBlock, (const char*)&tsKey);
wmmhello's avatar
wmmhello 已提交
2355 2356 2357 2358 2359 2360 2361 2362 2363 2364 2365 2366 2367 2368 2369 2370
      }
    }

    // set the null value for the columns that do not assign values
    if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
      for (int32_t i = 0; i < spd->numOfCols; ++i) {
        if (spd->cols[i].valStat == VAL_STAT_NONE) {  // the primary TS key is not VAL_STAT_NONE
          tdAppendColValToTpRow(pBuilder, TD_VTYPE_NONE, getNullValue(pSchema[i].type), true, pSchema[i].type, i,
                                spd->cols[i].toffset);
        }
      }
    }

    pDataBlock->size += extendedRowSize;
  }

X
Xiaoyu Wang 已提交
2371
  SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
wmmhello's avatar
wmmhello 已提交
2372 2373 2374 2375 2376 2377 2378
  if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, rowNum)) {
    return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767");
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
2379 2380 2381
void* smlInitHandle(SQuery* pQuery) {
  SSmlExecHandle* handle = taosMemoryCalloc(1, sizeof(SSmlExecHandle));
  if (!handle) return NULL;
wmmhello's avatar
wmmhello 已提交
2382 2383 2384 2385 2386 2387
  handle->pBlockHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
  handle->pQuery = pQuery;

  return handle;
}

X
Xiaoyu Wang 已提交
2388 2389 2390
void smlDestroyHandle(void* pHandle) {
  if (!pHandle) return;
  SSmlExecHandle* handle = (SSmlExecHandle*)pHandle;
wmmhello's avatar
wmmhello 已提交
2391
  destroyBlockHashmap(handle->pBlockHash);
wmmhello's avatar
wmmhello 已提交
2392
  smlDestroyTableHandle(&handle->tableExecHandle);
wmmhello's avatar
wmmhello 已提交
2393 2394 2395 2396
  taosMemoryFree(handle);
}

int32_t smlBuildOutput(void* handle, SHashObj* pVgHash) {
X
Xiaoyu Wang 已提交
2397
  SSmlExecHandle* smlHandle = (SSmlExecHandle*)handle;
wmmhello's avatar
wmmhello 已提交
2398 2399
  return qBuildStmtOutput(smlHandle->pQuery, pVgHash, smlHandle->pBlockHash);
}
wmmhello's avatar
wmmhello 已提交
2400
// schemaless logic end