parInsert.c 86.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

wafwerar's avatar
wafwerar 已提交
16
#include "os.h"
X
Xiaoyu Wang 已提交
17 18 19
#include "parInsertData.h"
#include "parInt.h"
#include "parToken.h"
H
refact  
Hongze Cheng 已提交
20
#include "parUtil.h"
21 22 23 24
#include "tglobal.h"
#include "ttime.h"
#include "ttypes.h"

H
refact  
Hongze Cheng 已提交
25 26 27
// Read one token from `pSql` into `sToken` via tStrGetToken() (third argument
// false) and move `pSql` forward by the index that call reports.
// `pSql` must be a modifiable char pointer lvalue and `sToken` an SToken lvalue.
#define NEXT_TOKEN(pSql, sToken)                            \
  do {                                                      \
    int32_t tokenIdx_ = 0;                                  \
    (sToken) = tStrGetToken((pSql), &tokenIdx_, false);     \
    (pSql) += tokenIdx_;                                    \
  } while (0)

H
refact  
Hongze Cheng 已提交
32 33 34
// Same as NEXT_TOKEN, except that tStrGetToken() is called with its third
// argument set to true. `pSql` is advanced by the index reported by the call.
#define NEXT_TOKEN_WITH_PREV(pSql, sToken)                  \
  do {                                                      \
    int32_t tokenIdx_ = 0;                                  \
    (sToken) = tStrGetToken((pSql), &tokenIdx_, true);      \
    (pSql) += tokenIdx_;                                    \
  } while (0)

39
// Read one token from `pSql` into `sToken` without modifying `pSql`.
// The caller owns `index` (an int32_t lvalue); tStrGetToken() receives its
// address, so the caller can inspect or reuse it afterwards.
#define NEXT_TOKEN_KEEP_SQL(pSql, sToken, index)            \
  do {                                                      \
    (sToken) = tStrGetToken((pSql), &(index), false);       \
  } while (0)

44 45 46 47 48 49 50
// Scan tokens starting at `pSql` with tGetToken(), storing each one in `sToken`
// (length in .n, start in .z, kind in .type) and advancing `pSql` past it.
// The loop repeats while the token just read is TK_NK_SPACE, so on exit
// `sToken` holds the first non-space token and `pSql` points just after it.
#define NEXT_VALID_TOKEN(pSql, sToken)        \
  do {                                        \
    sToken.n = tGetToken(pSql, &sToken.type); \
    sToken.z = pSql;                          \
    pSql += sToken.n;                         \
  } while (TK_NK_SPACE == sToken.type)

51 52 53 54 55 56
// Leading members shared by SInsertParseContext and SInsertParseSyntaxCxt:
// both structs start with these three fields in the same order.
// NOTE(review): presumably lets either context be handled through this type —
// confirm against the call sites.
typedef struct SInsertParseBaseContext {
  SParseContext* pComCxt;  // common parser context
  char*          pSql;     // SQL text being parsed
  SMsgBuf        msg;      // buffer for error messages
} SInsertParseBaseContext;

57
typedef struct SInsertParseContext {
X
Xiaoyu Wang 已提交
58 59 60 61 62 63 64 65 66 67
  SParseContext*     pComCxt;             // input
  char*              pSql;                // input
  SMsgBuf            msg;                 // input
  STableMeta*        pTableMeta;          // each table
  SParsedDataColInfo tags;                // each table
  SVCreateTbReq      createTblReq;        // each table
  SHashObj*          pVgroupsHashObj;     // global
  SHashObj*          pTableBlockHashObj;  // global
  SHashObj*          pSubTableHashObj;    // global
  SArray*            pVgDataBlocks;       // global
X
Xiaoyu Wang 已提交
68
  SHashObj*          pTableNameHashObj;   // global
D
dapan1121 已提交
69
  SHashObj*          pDbFNameHashObj;     // global
X
Xiaoyu Wang 已提交
70
  int32_t            totalNum;
X
Xiaoyu Wang 已提交
71
  SVnodeModifOpStmt* pOutput;
X
Xiaoyu Wang 已提交
72
  SStmtCallback*     pStmtCb;
73
  SParseMetaCache*   pMetaCache;
74 75
} SInsertParseContext;

76 77 78 79 80 81 82
// Reduced parsing context: the three members of SInsertParseBaseContext
// followed by a metadata cache pointer.
// NOTE(review): the name suggests it is used by a syntax-only pass — confirm.
typedef struct SInsertParseSyntaxCxt {
  SParseContext*   pComCxt;     // common parser context
  char*            pSql;        // SQL text being parsed
  SMsgBuf          msg;         // buffer for error messages
  SParseMetaCache* pMetaCache;  // metadata cache
} SInsertParseSyntaxCxt;

H
refact  
Hongze Cheng 已提交
83
// Callback that receives one converted column/tag value. parseValueToken()
// calls it with a pointer to the converted value and a length, or with
// value == NULL and len == 0 for a NULL value; `param` is passed through
// unchanged. The callback's return code becomes parseValueToken()'s result.
typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param);
X
Xiaoyu Wang 已提交
84 85 86 87

// Addressable one-byte boolean payloads; parseValueToken() hands the address
// of one of these to the row-append callback for BOOL columns.
static uint8_t TRUE_VALUE = (uint8_t)TSDB_TRUE;
static uint8_t FALSE_VALUE = (uint8_t)TSDB_FALSE;

D
stmt  
dapan1121 已提交
88
typedef struct SKvParam {
C
Cary Xu 已提交
89
  int16_t  pos;
C
Cary Xu 已提交
90
  SArray*  pTagVals;
C
Cary Xu 已提交
91 92
  SSchema* schema;
  char     buf[TSDB_MAX_TAGS_LEN];
D
stmt  
dapan1121 已提交
93 94 95 96 97 98 99 100 101
} SKvParam;

// Parameter bundle consumed by MemRowAppend(): identifies the row builder and
// the column that the value passed to the callback belongs to.
typedef struct SMemParam {
  SRowBuilder* rb;       // row builder that receives the value
  SSchema*     schema;   // schema of the target column (colId, type, bytes, name)
  int32_t      toffset;  // forwarded unchanged to tdAppendColValToRow()
  col_id_t     colIdx;   // forwarded unchanged to tdAppendColValToRow()
} SMemParam;

X
Xiaoyu Wang 已提交
102 103 104
// Evaluate `expr` exactly once as an int32_t status code and, when it differs
// from TSDB_CODE_SUCCESS, return that code from the enclosing function.
// The temporary uses a macro-private name so that an argument mentioning a
// caller variable called `code` (e.g. CHECK_CODE(code)) is not shadowed by the
// macro's own local, and `expr` is parenthesised so any expression is safe.
#define CHECK_CODE(expr)                       \
  do {                                         \
    int32_t checkedCode_ = (expr);             \
    if (TSDB_CODE_SUCCESS != checkedCode_) {   \
      return checkedCode_;                     \
    }                                          \
  } while (0)

110
// Consume the leading "INSERT INTO" (or "IMPORT INTO") keywords.
//
// pSql  in/out: cursor into the SQL text; advanced past each token read.
// pMsg  receives the syntax error message on failure.
//
// Returns TSDB_CODE_SUCCESS when both keywords are present, otherwise the
// code produced by buildSyntaxErrMsg().
static int32_t skipInsertInto(char** pSql, SMsgBuf* pMsg) {
  SToken keyword;

  NEXT_TOKEN(*pSql, keyword);
  bool isInsertOrImport = (TK_INSERT == keyword.type) || (TK_IMPORT == keyword.type);
  if (!isInsertOrImport) {
    return buildSyntaxErrMsg(pMsg, "keyword INSERT is expected", keyword.z);
  }

  NEXT_TOKEN(*pSql, keyword);
  if (TK_INTO == keyword.type) {
    return TSDB_CODE_SUCCESS;
  }
  return buildSyntaxErrMsg(pMsg, "keyword INTO is expected", keyword.z);
}

123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
// Validate an identifier token of the form `name` or `first.second` and
// normalise it in place.
//
// pToken  in/out: must be a TK_NK_ID token. On success its text is lower-cased
//         inside the buffer it points to (the underlying string is modified),
//         and for a two-part name z/n are reset to cover "first.second".
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_INVALID_OPERATION when the token
// is NULL, is not an identifier, is a bare number, or either part of a
// two-part name does not tokenise as a single identifier.
static int32_t parserValidateIdToken(SToken* pToken) {
  if (pToken == NULL || pToken->z == NULL || pToken->type != TK_NK_ID) {
    return TSDB_CODE_TSC_INVALID_OPERATION;
  }

  // The token is quoted with the escape char '`': accept it unchanged.
  // NOTE(review): with pToken->n == 0 the z[n - 1] read is out of bounds.
  if (pToken->z[0] == TS_ESCAPE_CHAR && pToken->z[pToken->n - 1] == TS_ESCAPE_CHAR) {
    return TSDB_CODE_SUCCESS;
  }

  // Look for the path delimiter within the token's n characters.
  // NOTE(review): the meaning of the trailing `true` flag is defined by strnchr — confirm.
  char* sep = strnchr(pToken->z, TS_PATH_DELIMITER[0], pToken->n, true);
  if (sep == NULL) {  // It is a single-part token, not a complex type
    if (isNumber(pToken)) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }

    strntolower(pToken->z, pToken->z, pToken->n);
  } else {  // two-part name
    int32_t oldLen = pToken->n;
    char*   pStr = pToken->z;

    // NOTE(review): this branch cannot be taken; the function has already
    // returned unless pToken->type == TK_NK_ID.
    if (pToken->type == TK_NK_SPACE) {
      pToken->n = (uint32_t)strtrim(pToken->z);
    }

    // Re-tokenise the first part: it must end exactly at the delimiter ...
    pToken->n = tGetToken(pToken->z, &pToken->type);
    if (pToken->z[pToken->n] != TS_PATH_DELIMITER[0]) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }

    // ... and be an identifier.
    if (pToken->type != TK_NK_ID) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }

    int32_t firstPartLen = pToken->n;

    // The second part is everything after the separator; it must tokenise as
    // one identifier spanning the whole remainder.
    pToken->z = sep + 1;
    pToken->n = (uint32_t)(oldLen - (sep - pStr) - 1);
    int32_t len = tGetToken(pToken->z, &pToken->type);
    if (len != pToken->n || pToken->type != TK_NK_ID) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }

    // Re-build the whole name string.
    // NOTE(review): pStr[firstPartLen] was already required to be the
    // delimiter above, so the else branch below looks unreachable.
    if (pStr[firstPartLen] == TS_PATH_DELIMITER[0]) {
      // The first part has no quotes: nothing to do.
    } else {
      // Close the gap between the two parts and blank the vacated tail.
      pStr[firstPartLen] = TS_PATH_DELIMITER[0];
      memmove(&pStr[firstPartLen + 1], pToken->z, pToken->n);
      uint32_t offset = (uint32_t)(pToken->z - (pStr + firstPartLen + 1));
      memset(pToken->z + pToken->n - offset, ' ', offset);
    }

    // The token now covers first part + one delimiter element + second part.
    pToken->n += (firstPartLen + sizeof(TS_PATH_DELIMITER[0]));
    pToken->z = pStr;

    strntolower(pToken->z, pToken->z, pToken->n);
  }

  return TSDB_CODE_SUCCESS;
}

185
// Split a validated table-name token into a full database name
// ("<acctId>.<db>") and a table name.
//
// pCxt        parse context; supplies the account id, the current database
//             and the message buffer.
// pStname     table-name token, either "table" or "db.table". It is validated
//             (and lower-cased in place) by parserValidateIdToken().
// fullDbName  out: receives "<acctId>.<db>". At most TSDB_DB_FNAME_LEN bytes
//             are written, always NUL-terminated.
// tableName   out: receives the table part, NUL-terminated. The caller must
//             provide room for the table part plus the terminator.
//
// Returns TSDB_CODE_SUCCESS, or the code from buildSyntaxErrMsg() when the
// token is not a valid identifier.
//
// Both outputs are always NUL-terminated and the database name is bounded by
// TSDB_DB_FNAME_LEN in both branches, so the results do not depend on the
// caller having pre-zeroed its buffers.
static int32_t buildName(SInsertParseContext* pCxt, SToken* pStname, char* fullDbName, char* tableName) {
  if (parserValidateIdToken(pStname) != TSDB_CODE_SUCCESS) {
    return buildSyntaxErrMsg(&pCxt->msg, "invalid table name", pStname->z);
  }

  char* p = strnchr(pStname->z, TS_PATH_DELIMITER[0], pStname->n, false);
  if (NULL != p) {  // db.table
    int32_t dbLen = (int32_t)(p - pStname->z);
    int32_t tbLen = (int32_t)pStname->n - dbLen - 1;

    snprintf(fullDbName, TSDB_DB_FNAME_LEN, "%d.%.*s", pCxt->pComCxt->acctId, (int)dbLen, pStname->z);
    strncpy(tableName, p + 1, tbLen);
    tableName[tbLen] = '\0';
  } else {  // bare table name: use the connection's current database
    snprintf(fullDbName, TSDB_DB_FNAME_LEN, "%d.%s", pCxt->pComCxt->acctId, pCxt->pComCxt->db);
    strncpy(tableName, pStname->z, pStname->n);
    tableName[pStname->n] = '\0';
  }

  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
203
// Fill `pName` from a table-name token.
//
// pName       out: receives account id, database name and table name.
// pTableName  token holding either "table" or "db.table"; parts may be quoted.
// acctId      account id stored with the database name.
// dbName      current database, used when the token has no "db." prefix; may
//             be NULL, in which case such a token is rejected.
// pMsgBuf     receives the error text on failure.
//
// Returns TSDB_CODE_SUCCESS or the code from buildInvalidOperationMsg().
//
// The raw length of the database part (dbLen) is kept separate from its
// de-quoted length (actualDbLen): the table part starts right after the raw
// database text, so its length must be computed from dbLen. Both parts are
// also checked against the size of the stack buffers they are copied into.
static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, const char* dbName, SMsgBuf* pMsgBuf) {
  const char* msg1 = "name too long";
  const char* msg2 = "invalid database name";
  const char* msg3 = "db is not specified";
  const char* msg4 = "invalid table name";

  int32_t code = TSDB_CODE_SUCCESS;
  char*   p = strnchr(pTableName->z, TS_PATH_DELIMITER[0], pTableName->n, true);

  if (p != NULL) {  // db has been specified in sql string so we ignore current db path
    assert(*p == TS_PATH_DELIMITER[0]);

    int32_t dbLen = (int32_t)(p - pTableName->z);
    if (dbLen <= 0) {
      return buildInvalidOperationMsg(pMsgBuf, msg2);
    }
    if (dbLen >= TSDB_DB_FNAME_LEN) {  // would not fit in `name` with its terminator
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }

    char name[TSDB_DB_FNAME_LEN] = {0};
    strncpy(name, pTableName->z, dbLen);
    int32_t actualDbLen = strdequote(name);

    code = tNameSetDbName(pName, acctId, name, actualDbLen);
    if (code != TSDB_CODE_SUCCESS) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }

    // Length of the text following "<db>." inside the token.
    int32_t tbLen = (int32_t)pTableName->n - dbLen - 1;
    if (tbLen <= 0) {
      return buildInvalidOperationMsg(pMsgBuf, msg4);
    }
    if (tbLen >= TSDB_TABLE_FNAME_LEN) {  // would not fit in `tbname` with its terminator
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }

    char tbname[TSDB_TABLE_FNAME_LEN] = {0};
    strncpy(tbname, p + 1, tbLen);
    /*tbLen = */ strdequote(tbname);

    code = tNameFromString(pName, tbname, T_NAME_TABLE);
    if (code != 0) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }
  } else {  // get current DB name first, and then set it into path
    if (pTableName->n >= TSDB_TABLE_NAME_LEN) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }

    assert(pTableName->n < TSDB_TABLE_FNAME_LEN);

    char name[TSDB_TABLE_FNAME_LEN] = {0};
    strncpy(name, pTableName->z, pTableName->n);
    strdequote(name);

    if (dbName == NULL) {
      return buildInvalidOperationMsg(pMsgBuf, msg3);
    }

    code = tNameSetDbName(pName, acctId, dbName, strlen(dbName));
    if (code != TSDB_CODE_SUCCESS) {
      code = buildInvalidOperationMsg(pMsgBuf, msg2);
      return code;
    }

    code = tNameFromString(pName, name, T_NAME_TABLE);
    if (code != 0) {
      code = buildInvalidOperationMsg(pMsgBuf, msg1);
    }
  }

  return code;
}

268 269
// Ask whether the current user has AUTH_TYPE_WRITE permission on `pDbFname`.
//
// pCxt      parse context (user, catalog handle, connection details, cache).
// pDbFname  full database name to check.
// pPass     out: set by the lookup to the authorisation result.
//
// When the parse context is marked async the answer comes from the metadata
// cache; otherwise the catalog is queried with connection details taken from
// the parse context. Returns the code of whichever lookup was used.
static int32_t checkAuth(SInsertParseContext* pCxt, char* pDbFname, bool* pPass) {
  SParseContext* pParse = pCxt->pComCxt;

  if (!pParse->async) {
    SRequestConnInfo connInfo = {.pTrans = pParse->pTransporter,
                                 .requestId = pParse->requestId,
                                 .requestObjRefId = pParse->requestRid,
                                 .mgmtEps = pParse->mgmtEpSet};
    return catalogChkAuth(pParse->pCatalog, &connInfo, pParse->pUser, pDbFname, AUTH_TYPE_WRITE, pPass);
  }

  return getUserAuthFromCache(pCxt->pMetaCache, pParse->pUser, pDbFname, AUTH_TYPE_WRITE, pPass);
}

// Fetch the table meta for `pTbName` into `*pTableMeta`.
//
// pCxt        parse context (catalog handle, connection details, cache).
// pTbName     name of the table to look up.
// isStb       selects catalogGetSTableMeta() instead of catalogGetTableMeta()
//             on the catalog path; it is not consulted on the cache path.
// pTableMeta  out: receives the table meta.
//
// Async parse contexts are served from the metadata cache; otherwise the
// catalog is queried. Returns the code of the lookup that was used.
static int32_t getTableSchema(SInsertParseContext* pCxt, SName* pTbName, bool isStb, STableMeta** pTableMeta) {
  SParseContext* pParse = pCxt->pComCxt;

  if (pParse->async) {
    return getTableMetaFromCache(pCxt->pMetaCache, pTbName, pTableMeta);
  }

  SRequestConnInfo connInfo = {.pTrans = pParse->pTransporter,
                               .requestId = pParse->requestId,
                               .requestObjRefId = pParse->requestRid,
                               .mgmtEps = pParse->mgmtEpSet};

  return isStb ? catalogGetSTableMeta(pParse->pCatalog, &connInfo, pTbName, pTableMeta)
               : catalogGetTableMeta(pParse->pCatalog, &connInfo, pTbName, pTableMeta);
}

// Look up the vgroup that `pTbName` belongs to and store it in `*pVg`.
//
// Async parse contexts are served from the metadata cache; otherwise
// catalogGetTableHashVgroup() is called with connection details taken from
// the parse context. Returns the code of the lookup that was used.
static int32_t getTableVgroup(SInsertParseContext* pCxt, SName* pTbName, SVgroupInfo* pVg) {
  SParseContext* pParse = pCxt->pComCxt;

  if (!pParse->async) {
    SRequestConnInfo connInfo = {.pTrans = pParse->pTransporter,
                                 .requestId = pParse->requestId,
                                 .requestObjRefId = pParse->requestRid,
                                 .mgmtEps = pParse->mgmtEpSet};
    return catalogGetTableHashVgroup(pParse->pCatalog, &connInfo, pTbName, pVg);
  }

  return getTableVgroupFromCache(pCxt->pMetaCache, pTbName, pVg);
}
D
dapan 已提交
308

309
// Check write permission on `dbFname`, load the meta of table `name` into
// pCxt->pTableMeta and, for a non-super table, record the table's vgroup in
// pCxt->pVgroupsHashObj keyed by vgId.
//
// Returns TSDB_CODE_PAR_PERMISSION_DENIED when the authorisation check says
// no, the first non-success code from any step otherwise, or
// TSDB_CODE_SUCCESS.
static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SName* name, char* dbFname, bool isStb) {
  bool    authorized = false;
  int32_t code = checkAuth(pCxt, dbFname, &authorized);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }
  if (!authorized) {
    return TSDB_CODE_PAR_PERMISSION_DENIED;
  }

  code = getTableSchema(pCxt, name, isStb, &pCxt->pTableMeta);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }
  if (isStb) {
    // A super table needs no vgroup entry here.
    return TSDB_CODE_SUCCESS;
  }

  SVgroupInfo vgInfo;  // filled in by getTableVgroup()
  code = getTableVgroup(pCxt, name, &vgInfo);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  code = taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo,
                     sizeof(vgInfo));
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
325 326 327
// Load the meta of table `name` as a non-super table: getTableMetaImpl() with
// isStb == false, which also records the table's vgroup.
static int32_t getTableMeta(SInsertParseContext* pCxt, SName* name, char* dbFname) {
  return getTableMetaImpl(pCxt, name, dbFname, false);
}
D
stmt  
dapan1121 已提交
328

X
Xiaoyu Wang 已提交
329 330 331
// Load the meta of super table `name`: getTableMetaImpl() with isStb == true,
// which skips the vgroup lookup.
static int32_t getSTableMeta(SInsertParseContext* pCxt, SName* name, char* dbFname) {
  return getTableMetaImpl(pCxt, name, dbFname, true);
}
D
stmt  
dapan1121 已提交
332

X
Xiaoyu Wang 已提交
333 334 335 336 337
// Fetch the configuration of database `pDbFName` into `*pInfo`.
//
// Async parse contexts are served from the metadata cache; otherwise the
// catalog is queried with connection details taken from the parse context.
// Returns the code of the lookup that was used.
static int32_t getDBCfg(SInsertParseContext* pCxt, const char* pDbFName, SDbCfgInfo* pInfo) {
  SParseContext* pParse = pCxt->pComCxt;
  int32_t        code = TSDB_CODE_SUCCESS;

  if (pParse->async) {
    code = getDbCfgFromCache(pCxt->pMetaCache, pDbFName, pInfo);
  } else {
    SRequestConnInfo connInfo = {.pTrans = pParse->pTransporter,
                                 .requestId = pParse->requestId,
                                 .requestObjRefId = pParse->requestRid,
                                 .mgmtEps = pParse->mgmtEpSet};
    code = catalogGetDBCfg(pParse->pCatalog, &connInfo, pDbFName, pInfo);
  }

  return code;
}

347 348 349 350 351 352 353 354 355 356
// Search pSchema[start .. end) for a column whose name equals the token text.
//
// pColname  token holding the column name (pColname->n characters at ->z).
// start     first schema index to examine.
// end       one past the last schema index to examine.
// pSchema   schema array.
//
// Returns the index of the first exact (case-sensitive) match, or -1.
static int32_t findCol(SToken* pColname, int32_t start, int32_t end, SSchema* pSchema) {
  for (int32_t idx = start; idx < end; ++idx) {
    const char* colName = pSchema[idx].name;
    if (strlen(colName) != pColname->n) {
      continue;  // different length, cannot be equal
    }
    if (0 == strncmp(pColname->z, colName, pColname->n)) {
      return idx;
    }
  }
  return -1;
}

D
dapan1121 已提交
357
// Convert the submit request stored in blocks->pData to its on-wire form:
// the request header and every SSubmitBlk header are byte-swapped with
// htonl()/htons()/htobe64().
//
// src     not used by this function.
// blocks  supplies vgId, size and numOfTables; its pData buffer is rewritten
//         in place.
static void buildMsgHeader(STableDataBlocks* src, SVgDataBlocks* blocks) {
  SSubmitReq* pReq = (SSubmitReq*)blocks->pData;
  pReq->header.vgId = htonl(blocks->vg.vgId);
  pReq->header.contLen = htonl(blocks->size);
  pReq->length = pReq->header.contLen;  // copies the already converted value
  pReq->numOfBlocks = htonl(blocks->numOfTables);

  // The first block header follows the request header directly.
  SSubmitBlk* pBlk = (SSubmitBlk*)(pReq + 1);
  for (int32_t remaining = blocks->numOfTables; remaining != 0; --remaining) {
    // Capture the lengths before conversion; they locate the next block.
    int32_t hostDataLen = pBlk->dataLen;
    int32_t hostSchemaLen = pBlk->schemaLen;

    pBlk->uid = htobe64(pBlk->uid);
    pBlk->suid = htobe64(pBlk->suid);
    pBlk->padding = htonl(pBlk->padding);
    pBlk->sversion = htonl(pBlk->sversion);
    pBlk->dataLen = htonl(pBlk->dataLen);
    pBlk->schemaLen = htonl(pBlk->schemaLen);
    pBlk->numOfRows = htons(pBlk->numOfRows);

    pBlk = (SSubmitBlk*)(pBlk->data + hostSchemaLen + hostDataLen);
  }
}

// Build pCxt->pOutput->pDataBlocks: one SVgDataBlocks per entry of
// pCxt->pVgDataBlocks, each taking over the source entry's data buffer and
// having its headers converted by buildMsgHeader().
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the output
// array or one of its entries cannot be allocated.
//
// NOTE(review): the results of taosHashGetDup() and taosArrayPush() are not
// checked; a failed push would leave the freshly allocated entry unreferenced.
static int32_t buildOutput(SInsertParseContext* pCxt) {
  const size_t vgCount = taosArrayGetSize(pCxt->pVgDataBlocks);

  pCxt->pOutput->pDataBlocks = taosArrayInit(vgCount, POINTER_BYTES);
  if (NULL == pCxt->pOutput->pDataBlocks) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  size_t idx = 0;
  while (idx < vgCount) {
    STableDataBlocks* pSrc = taosArrayGetP(pCxt->pVgDataBlocks, idx);
    SVgDataBlocks*    pDst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
    if (NULL == pDst) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    // Copy the vgroup info recorded for this vgId.
    taosHashGetDup(pCxt->pVgroupsHashObj, (const char*)&pSrc->vgId, sizeof(pSrc->vgId), &pDst->vg);
    pDst->numOfTables = pSrc->numOfTables;
    pDst->size = pSrc->size;
    // Exchange the buffers: pDst gets the payload, pSrc gets pDst's zeroed pointer.
    TSWAP(pDst->pData, pSrc->pData);
    buildMsgHeader(pSrc, pDst);
    taosArrayPush(pCxt->pOutput->pDataBlocks, &pDst);

    ++idx;
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
401
// Track whether the rows appended to `pDataBlocks` are still in strictly
// increasing timestamp order.
//
// pDataBlocks  in/out: `ordered` and `prevTS` are updated.
// start        points at the TSKEY of the row just written.
//
// Once a block has been marked unordered the previous timestamp is no longer
// maintained. Always returns TSDB_CODE_SUCCESS.
int32_t checkTimestamp(STableDataBlocks* pDataBlocks, const char* start) {
  if (pDataBlocks->ordered) {
    TSKEY rowTs = *(TSKEY*)start;
    if (rowTs <= pDataBlocks->prevTS) {
      pDataBlocks->ordered = false;
    }
    pDataBlocks->prevTS = rowTs;
  }

  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
416 417 418 419 420 421
// Convert a timestamp token into an integer timestamp of precision `timePrec`.
//
// end       in/out: cursor into the SQL text just after `pToken`; advanced
//           when a trailing "()" or a "+/- duration" suffix is consumed.
// pToken    value token: NOW, TODAY, an integer literal, or a date/time string.
// timePrec  timestamp precision, passed to the time helpers.
// time      out: resulting timestamp.
// pMsgBuf   receives the error text on failure.
//
// Returns TSDB_CODE_SUCCESS, the code from buildSyntaxErrMsg(), or
// TSDB_CODE_TSC_INVALID_OPERATION when the duration suffix cannot be parsed.
//
// An integer literal that toInteger() cannot convert is reported as a syntax
// error instead of being stored with whatever value was left in `ts`.
static int parseTime(char** end, SToken* pToken, int16_t timePrec, int64_t* time, SMsgBuf* pMsgBuf) {
  int32_t index = 0;
  SToken  sToken;
  int64_t interval = 0;
  int64_t ts = 0;
  char*   pTokenEnd = *end;

  if (pToken->type == TK_NOW) {
    ts = taosGetTimestamp(timePrec);
  } else if (pToken->type == TK_TODAY) {
    ts = taosGetTimestampToday(timePrec);
  } else if (pToken->type == TK_NK_INTEGER) {
    if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &ts)) {
      return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
    }
  } else {  // parse the RFC-3339/ISO-8601 timestamp format string
    if (taosParseTime(pToken->z, time, pToken->n, timePrec, tsDaylight) != TSDB_CODE_SUCCESS) {
      return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
    }

    return TSDB_CODE_SUCCESS;
  }

  // Look at what follows the token: skip blanks, swallow an optional "()"
  // (NOW()/TODAY()), and finish immediately when the value ends at a comma.
  for (int k = pToken->n; pToken->z[k] != '\0'; k++) {
    if (pToken->z[k] == ' ' || pToken->z[k] == '\t') continue;
    if (pToken->z[k] == '(' && pToken->z[k + 1] == ')') {  // for insert NOW()/TODAY()
      *end = pTokenEnd = &pToken->z[k + 2];
      k++;
      continue;
    }
    if (pToken->z[k] == ',') {
      *end = pTokenEnd;
      *time = ts;
      return TSDB_CODE_SUCCESS;
    }

    break;
  }

  /*
   * time expression:
   * e.g., now+12a, now-5h
   */
  SToken valueToken;
  index = 0;
  sToken = tStrGetToken(pTokenEnd, &index, false);
  pTokenEnd += index;

  if (sToken.type == TK_NK_MINUS || sToken.type == TK_NK_PLUS) {
    index = 0;
    valueToken = tStrGetToken(pTokenEnd, &index, false);
    pTokenEnd += index;

    // A duration needs at least one digit and a unit character.
    if (valueToken.n < 2) {
      return buildSyntaxErrMsg(pMsgBuf, "value expected in timestamp", sToken.z);
    }

    char unit = 0;
    if (parseAbsoluteDuration(valueToken.z, valueToken.n, &interval, &unit, timePrec) != TSDB_CODE_SUCCESS) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }

    if (sToken.type == TK_NK_PLUS) {
      ts += interval;
    } else {
      ts = ts - interval;
    }

    *end = pTokenEnd;
  }

  *time = ts;
  return TSDB_CODE_SUCCESS;
}
488

wmmhello's avatar
wmmhello 已提交
489
// Verify that `pToken` is a literal usable as an inserted value and, for a
// string literal, strip its quotes.
//
// pToken       in/out: for TK_NK_STRING, z/n are redirected to the trimmed
//              copy in `tmpTokenBuf`.
// tmpTokenBuf  scratch buffer that trimString() is told holds
//              TSDB_MAX_BYTES_PER_ROW bytes.
// pMsgBuf      receives the error text on failure.
//
// Returns TSDB_CODE_SUCCESS or the code from buildSyntaxErrMsg().
static FORCE_INLINE int32_t checkAndTrimValue(SToken* pToken, char* tmpTokenBuf, SMsgBuf* pMsgBuf) {
  bool isValueLiteral = false;
  switch (pToken->type) {
    case TK_NOW:
    case TK_TODAY:
    case TK_NK_INTEGER:
    case TK_NK_STRING:
    case TK_NK_FLOAT:
    case TK_NK_BOOL:
    case TK_NULL:
    case TK_NK_HEX:
    case TK_NK_OCT:
    case TK_NK_BIN:
      isValueLiteral = true;
      break;
    default:
      break;
  }

  if (!isValueLiteral || (0 == pToken->n) || (TK_NK_RP == pToken->type)) {
    return buildSyntaxErrMsg(pMsgBuf, "invalid data or symbol", pToken->z);
  }

  if (TK_NK_STRING != pToken->type) {
    return TSDB_CODE_SUCCESS;
  }

  // String literal: make sure it fits, then remove the quotation marks.
  if (pToken->n >= TSDB_MAX_BYTES_PER_ROW) {
    return buildSyntaxErrMsg(pMsgBuf, "too long string", pToken->z);
  }

  int32_t trimmedLen = trimString(pToken->z, pToken->n, tmpTokenBuf, TSDB_MAX_BYTES_PER_ROW);
  pToken->z = tmpTokenBuf;
  pToken->n = trimmedLen;

  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
512
// Tell whether a value token denotes SQL NULL: either the NULL keyword or a
// string literal whose whole text equals TSDB_DATA_NULL_STR_L ignoring case.
//
// The string length must match exactly; comparing only the first pToken->n
// characters would also accept proper prefixes of the NULL string.
static bool isNullStr(SToken* pToken) {
  if (pToken->type == TK_NULL) {
    return true;
  }
  if (pToken->type != TK_NK_STRING) {
    return false;
  }
  return (strlen(TSDB_DATA_NULL_STR_L) == pToken->n) &&
         (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0);
}

H
refact  
Hongze Cheng 已提交
517
// Convert the token text to a double with taosStr2Double().
//
// pToken  token to convert.
// value   out: converted number.
// endPtr  out: set by taosStr2Double().
//
// errno is cleared before the conversion so the caller can test it afterwards.
// Returns pToken->type when the conversion stopped exactly at the end of the
// token, TK_NK_ILLEGAL otherwise.
static FORCE_INLINE int32_t toDouble(SToken* pToken, double* value, char** endPtr) {
  errno = 0;
  *value = taosStr2Double(pToken->z, endPtr);

  // The whole token must have been consumed; anything else is not a number.
  if ((*endPtr - pToken->z) == pToken->n) {
    return pToken->type;
  }

  return TK_NK_ILLEGAL;
}

H
refact  
Hongze Cheng 已提交
529 530
// Convert one value token to the binary form required by column `pSchema`
// and hand it to `func`.
//
// end          in/out: SQL cursor after the token; only parseTime() may move it.
// pToken       in/out: value token; string literals are trimmed into
//              `tmpTokenBuf` by checkAndTrimValue().
// pSchema      schema of the target column or tag (type, bytes, colId, name).
// timePrec     timestamp precision for TIMESTAMP values.
// tmpTokenBuf  scratch buffer for trimmed string literals.
// func, param  sink for the converted value; called with (NULL, 0) for NULL.
// pMsgBuf      receives the error text on failure.
//
// Returns the result of `func`, a syntax-error code, or TSDB_CODE_FAILED for
// a column type that is not handled here.
//
// Boolean text must be exactly "true" or "false"; a length-limited compare
// alone would also accept prefixes such as "t" or "fa".
static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t timePrec, char* tmpTokenBuf,
                               _row_append_fn_t func, void* param, SMsgBuf* pMsgBuf) {
  int64_t  iv;
  uint64_t uv;
  char*    endptr = NULL;

  int32_t code = checkAndTrimValue(pToken, tmpTokenBuf, pMsgBuf);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  if (isNullStr(pToken)) {
    if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
      return buildSyntaxErrMsg(pMsgBuf, "primary timestamp should not be null", pToken->z);
    }

    return func(pMsgBuf, NULL, 0, param);
  }

  switch (pSchema->type) {
    case TSDB_DATA_TYPE_BOOL: {
      if ((pToken->type == TK_NK_BOOL || pToken->type == TK_NK_STRING) && (pToken->n != 0)) {
        if (pToken->n == strlen("true") && strncmp(pToken->z, "true", pToken->n) == 0) {
          return func(pMsgBuf, &TRUE_VALUE, pSchema->bytes, param);
        } else if (pToken->n == strlen("false") && strncmp(pToken->z, "false", pToken->n) == 0) {
          return func(pMsgBuf, &FALSE_VALUE, pSchema->bytes, param);
        } else {
          return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
        }
      } else if (pToken->type == TK_NK_INTEGER) {
        // Any non-zero integer is true.
        return func(pMsgBuf, ((taosStr2Int64(pToken->z, NULL, 10) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes,
                    param);
      } else if (pToken->type == TK_NK_FLOAT) {
        // Any non-zero float is true.
        return func(pMsgBuf, ((taosStr2Double(pToken->z, NULL) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes,
                    param);
      } else {
        return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
      }
    }

    case TSDB_DATA_TYPE_TINYINT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid tinyint data", pToken->z);
      } else if (!IS_VALID_TINYINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "tinyint data overflow", pToken->z);
      }

      int8_t tmpVal = (int8_t)iv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_UTINYINT: {
      if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned tinyint data", pToken->z);
      } else if (!IS_VALID_UTINYINT(uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "unsigned tinyint data overflow", pToken->z);
      }
      uint8_t tmpVal = (uint8_t)uv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_SMALLINT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid smallint data", pToken->z);
      } else if (!IS_VALID_SMALLINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "smallint data overflow", pToken->z);
      }
      int16_t tmpVal = (int16_t)iv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_USMALLINT: {
      if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned smallint data", pToken->z);
      } else if (!IS_VALID_USMALLINT(uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "unsigned smallint data overflow", pToken->z);
      }
      uint16_t tmpVal = (uint16_t)uv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_INT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid int data", pToken->z);
      } else if (!IS_VALID_INT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "int data overflow", pToken->z);
      }
      int32_t tmpVal = (int32_t)iv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_UINT: {
      if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned int data", pToken->z);
      } else if (!IS_VALID_UINT(uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "unsigned int data overflow", pToken->z);
      }
      uint32_t tmpVal = (uint32_t)uv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_BIGINT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid bigint data", pToken->z);
      } else if (!IS_VALID_BIGINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "bigint data overflow", pToken->z);
      }
      return func(pMsgBuf, &iv, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_UBIGINT: {
      if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned bigint data", pToken->z);
      } else if (!IS_VALID_UBIGINT(uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "unsigned bigint data overflow", pToken->z);
      }
      return func(pMsgBuf, &uv, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_FLOAT: {
      double dv;
      if (TK_NK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
        return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
      }
      // Reject overflow reported by the conversion and anything outside the float range.
      if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) ||
          isnan(dv)) {
        return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
      }
      float tmpVal = (float)dv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_DOUBLE: {
      double dv;
      if (TK_NK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
        return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
      }
      if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) {
        return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
      }
      return func(pMsgBuf, &dv, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_BINARY: {
      // Too long values will raise the invalid sql error message
      if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) {
        return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
      }

      return func(pMsgBuf, pToken->z, pToken->n, param);
    }

    case TSDB_DATA_TYPE_NCHAR: {
      // No length check is made here; the raw text goes straight to the callback.
      return func(pMsgBuf, pToken->z, pToken->n, param);
    }

    case TSDB_DATA_TYPE_JSON: {
      if (pToken->n > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
        return buildSyntaxErrMsg(pMsgBuf, "json string too long than 4095", pToken->z);
      }
      return func(pMsgBuf, pToken->z, pToken->n, param);
    }

    case TSDB_DATA_TYPE_TIMESTAMP: {
      int64_t tmpVal;
      if (parseTime(end, pToken, timePrec, &tmpVal, pMsgBuf) != TSDB_CODE_SUCCESS) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp", pToken->z);
      }

      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    default:
      break;
  }

  return TSDB_CODE_FAILED;
}

X
Xiaoyu Wang 已提交
703
/*
 * Value-append callback: stores one parsed cell into the row under construction.
 *
 * pMsgBuf  receives the error message when the conversion fails
 * value    parsed cell data; NULL means the cell is a SQL NULL
 * len      length of `value` in bytes (used for BINARY / NCHAR only)
 * param    SMemParam carrying the row builder, the column schema and the append position
 *
 * Returns TSDB_CODE_SUCCESS, or the code produced by the error-message builders.
 */
static FORCE_INLINE int32_t MemRowAppend(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param) {
  SMemParam*   pParam = (SMemParam*)param;
  SRowBuilder* pBuilder = pParam->rb;
  const void*  pColData = value;  // fixed-length types are appended straight from `value`

  if (NULL == value) {
    // NULL cell: nothing to convert
    tdAppendColValToRow(pBuilder, pParam->schema->colId, pParam->schema->type, TD_VTYPE_NULL, value, false,
                        pParam->toffset, pParam->colIdx);
    return TSDB_CODE_SUCCESS;
  }

  switch (pParam->schema->type) {
    case TSDB_DATA_TYPE_BINARY: {
      // stage the var-string at the current row end, then append from there
      const char* pVarBuf = tdRowEnd(pBuilder->pBuf);
      STR_WITH_SIZE_TO_VARSTR(pVarBuf, value, len);
      pColData = pVarBuf;
      break;
    }
    case TSDB_DATA_TYPE_NCHAR: {
      // convert multi-byte input to UCS-4; the output is limited to the column's payload size
      const char* pVarBuf = tdRowEnd(pBuilder->pBuf);
      int32_t     ucs4Len = 0;
      if (!taosMbsToUcs4(value, len, (TdUcs4*)varDataVal(pVarBuf), pParam->schema->bytes - VARSTR_HEADER_SIZE,
                         &ucs4Len)) {
        if (E2BIG != errno) {
          char errText[512] = {0};
          snprintf(errText, tListLen(errText), "%s", strerror(errno));
          return buildSyntaxErrMsg(pMsgBuf, errText, value);
        }
        return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pParam->schema->name);
      }
      varDataSetLen(pVarBuf, ucs4Len);
      pColData = pVarBuf;
      break;
    }
    default:
      break;
  }

  tdAppendColValToRow(pBuilder, pParam->schema->colId, pParam->schema->type, TD_VTYPE_NORM, pColData, false,
                      pParam->toffset, pParam->colIdx);
  return TSDB_CODE_SUCCESS;
}
736 737 738

// pSql -> tag1_name, ...)
/*
 * Parse an explicit column (or tag) name list up to the closing ')'.
 *
 * Resets the bound state of pColList, then for every name:
 *   - looks it up in pSchema (first after the previous match, then from the start; the
 *     second lookup marks the list as disordered),
 *   - rejects unknown and duplicated names,
 *   - records the schema index in boundColumns[] and accumulates boundNullLen.
 * For a disordered list, colIdxInfo is allocated and filled with the bound/final index mapping.
 *
 * Returns TSDB_CODE_SUCCESS, a syntax error code, or TSDB_CODE_TSC_OUT_OF_MEMORY.
 */
static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo* pColList, SSchema* pSchema) {
  col_id_t nCols = pColList->numOfCols;

  pColList->numOfBound = 0;
  pColList->boundNullLen = 0;
  memset(pColList->boundColumns, 0, sizeof(col_id_t) * nCols);
  for (col_id_t i = 0; i < nCols; ++i) {
    pColList->cols[i].valStat = VAL_STAT_NONE;
  }

  SToken   sToken;
  bool     isOrdered = true;
  col_id_t lastColIdx = -1;  // last column found
  while (1) {
    NEXT_TOKEN(pCxt->pSql, sToken);

    if (TK_NK_RP == sToken.type) {
      break;
    }

    // search forward from the previous match first; fall back to the skipped prefix
    col_id_t t = lastColIdx + 1;
    col_id_t index = findCol(&sToken, t, nCols, pSchema);
    if (index < 0 && t > 0) {
      index = findCol(&sToken, 0, t, pSchema);
      isOrdered = false;
    }
    if (index < 0) {
      return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMN, sToken.z);
    }
    if (pColList->cols[index].valStat == VAL_STAT_HAS) {
      return buildSyntaxErrMsg(&pCxt->msg, "duplicated column name", sToken.z);
    }
    lastColIdx = index;
    pColList->cols[index].valStat = VAL_STAT_HAS;
    pColList->boundColumns[pColList->numOfBound] = index;
    ++pColList->numOfBound;
    // Size the NULL placeholder by the column that was actually matched. Using the search
    // start `t` here picks the wrong column whenever columns are skipped or reordered, and
    // reads pSchema[nCols] when the previous match was the last column.
    switch (pSchema[index].type) {
      case TSDB_DATA_TYPE_BINARY:
        pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + CHAR_BYTES);
        break;
      case TSDB_DATA_TYPE_NCHAR:
        pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
        break;
      default:
        pColList->boundNullLen += TYPE_BYTES[pSchema[index].type];
        break;
    }
  }

  pColList->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED;

  if (!isOrdered) {
    pColList->colIdxInfo = taosMemoryCalloc(pColList->numOfBound, sizeof(SBoundIdxInfo));
    if (NULL == pColList->colIdxInfo) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    SBoundIdxInfo* pColIdx = pColList->colIdxInfo;
    for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
      pColIdx[i].schemaColIdx = pColList->boundColumns[i];
      pColIdx[i].boundIdx = i;
    }
    // sort by schema position to assign finalIdx, then sort back into bound order
    taosSort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), schemaIdxCompar);
    for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
      pColIdx[i].finalIdx = i;
    }
    taosSort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), boundIdxCompar);
  }

  // clear the unused tail of boundColumns[]
  if (pColList->numOfCols > pColList->numOfBound) {
    memset(&pColList->boundColumns[pColList->numOfBound], 0,
           sizeof(col_id_t) * (pColList->numOfCols - pColList->numOfBound));
  }

  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
814 815 816 817 818
/*
 * Fill a create-child-table request.
 *
 * pTbReq  request to populate
 * tname   table name; duplicated with strdup() into pTbReq->name
 * pTag    encoded tag blob; the pointer itself is stored in pTbReq->ctb.pTag
 * suid    uid of the super table
 */
static void buildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid) {
  pTbReq->commentLen = -1;
  pTbReq->ctb.pTag = (uint8_t*)pTag;
  pTbReq->ctb.suid = suid;
  pTbReq->name = strdup(tname);
  pTbReq->type = TD_CHILD_TABLE;
}
823

824 825
/*
 * Convert one tag value token into an STagVal according to the tag's schema.
 *
 * end       forwarded to parseTime() for TIMESTAMP tags
 * pToken    token holding the literal
 * pSchema   schema of the tag being parsed
 * timePrec  time precision forwarded to parseTime()
 * val       output; left untouched when the token is a NULL literal
 * pMsgBuf   receives the error message on failure
 *
 * For BINARY the result points at pToken->z (no copy is made); for NCHAR a buffer is
 * allocated here and handed to the caller through val->pData.
 *
 * Returns TSDB_CODE_SUCCESS, a syntax error code, TSDB_CODE_OUT_OF_MEMORY, or
 * TSDB_CODE_FAILED for a schema type this function does not handle.
 */
static int32_t parseTagToken(char** end, SToken* pToken, SSchema* pSchema, int16_t timePrec, STagVal* val,
                             SMsgBuf* pMsgBuf) {
  int64_t  iv;
  uint64_t uv;
  char*    endptr = NULL;

  if (isNullStr(pToken)) {
    if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
      return buildSyntaxErrMsg(pMsgBuf, "primary timestamp should not be null", pToken->z);
    }

    return TSDB_CODE_SUCCESS;
  }

  val->cid = pSchema->colId;
  val->type = pSchema->type;

  switch (pSchema->type) {
    case TSDB_DATA_TYPE_BOOL: {
      if ((pToken->type == TK_NK_BOOL || pToken->type == TK_NK_STRING) && (pToken->n != 0)) {
        if (strncmp(pToken->z, "true", pToken->n) == 0) {
          *(int8_t*)(&val->i64) = TRUE_VALUE;
        } else if (strncmp(pToken->z, "false", pToken->n) == 0) {
          *(int8_t*)(&val->i64) = FALSE_VALUE;
        } else {
          return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
        }
      } else if (pToken->type == TK_NK_INTEGER) {
        *(int8_t*)(&val->i64) = ((taosStr2Int64(pToken->z, NULL, 10) == 0) ? FALSE_VALUE : TRUE_VALUE);
      } else if (pToken->type == TK_NK_FLOAT) {
        *(int8_t*)(&val->i64) = ((taosStr2Double(pToken->z, NULL) == 0) ? FALSE_VALUE : TRUE_VALUE);
      } else {
        return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
      }
      break;
    }

    case TSDB_DATA_TYPE_TINYINT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid tinyint data", pToken->z);
      } else if (!IS_VALID_TINYINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "tinyint data overflow", pToken->z);
      }

      *(int8_t*)(&val->i64) = iv;
      break;
    }

    case TSDB_DATA_TYPE_UTINYINT: {
      if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned tinyint data", pToken->z);
      } else if (!IS_VALID_UTINYINT(uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "unsigned tinyint data overflow", pToken->z);
      }
      *(uint8_t*)(&val->i64) = uv;
      break;
    }

    case TSDB_DATA_TYPE_SMALLINT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid smallint data", pToken->z);
      } else if (!IS_VALID_SMALLINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "smallint data overflow", pToken->z);
      }
      *(int16_t*)(&val->i64) = iv;
      break;
    }

    case TSDB_DATA_TYPE_USMALLINT: {
      if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned smallint data", pToken->z);
      } else if (!IS_VALID_USMALLINT(uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "unsigned smallint data overflow", pToken->z);
      }
      *(uint16_t*)(&val->i64) = uv;
      break;
    }

    case TSDB_DATA_TYPE_INT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid int data", pToken->z);
      } else if (!IS_VALID_INT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "int data overflow", pToken->z);
      }
      *(int32_t*)(&val->i64) = iv;
      break;
    }

    case TSDB_DATA_TYPE_UINT: {
      if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned int data", pToken->z);
      } else if (!IS_VALID_UINT(uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "unsigned int data overflow", pToken->z);
      }
      *(uint32_t*)(&val->i64) = uv;
      break;
    }

    case TSDB_DATA_TYPE_BIGINT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid bigint data", pToken->z);
      } else if (!IS_VALID_BIGINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "bigint data overflow", pToken->z);
      }

      val->i64 = iv;
      break;
    }

    case TSDB_DATA_TYPE_UBIGINT: {
      if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned bigint data", pToken->z);
      } else if (!IS_VALID_UBIGINT(uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "unsigned bigint data overflow", pToken->z);
      }
      *(uint64_t*)(&val->i64) = uv;
      break;
    }

    case TSDB_DATA_TYPE_FLOAT: {
      double dv;
      if (TK_NK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
        return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
      }
      if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) ||
          isnan(dv)) {
        return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
      }
      *(float*)(&val->i64) = dv;
      break;
    }

    case TSDB_DATA_TYPE_DOUBLE: {
      double dv;
      if (TK_NK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
        return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
      }
      if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) {
        return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
      }

      *(double*)(&val->i64) = dv;
      break;
    }

    case TSDB_DATA_TYPE_BINARY: {
      // Too long values will raise the invalid sql error message
      if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) {
        return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
      }
      // no copy: the value aliases the token text
      val->pData = pToken->z;
      val->nData = pToken->n;
      break;
    }

    case TSDB_DATA_TYPE_NCHAR: {
      int32_t output = 0;
      int32_t ucs4Cap = pToken->n * TSDB_NCHAR_SIZE;
      // Request at least one byte: a zero-size allocation may legally return NULL, which
      // would be misreported as out-of-memory for an empty string.
      void* p = taosMemoryCalloc(1, ucs4Cap > 0 ? ucs4Cap : 1);
      if (p == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      if (!taosMbsToUcs4(pToken->z, pToken->n, (TdUcs4*)(p), ucs4Cap, &output)) {
        if (errno == E2BIG) {
          taosMemoryFree(p);
          return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
        }
        char buf[512] = {0};
        snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(errno));
        taosMemoryFree(p);
        return buildSyntaxErrMsg(pMsgBuf, buf, pToken->z);
      }
      // the converted buffer is handed over to the caller
      val->pData = p;
      val->nData = output;
      break;
    }

    case TSDB_DATA_TYPE_TIMESTAMP: {
      if (parseTime(end, pToken, timePrec, &iv, pMsgBuf) != TSDB_CODE_SUCCESS) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp", pToken->z);
      }

      val->i64 = iv;
      break;
    }

    default:
      // Unhandled schema type: fail like parseValueToken does instead of silently
      // returning an empty value as success.
      return TSDB_CODE_FAILED;
  }

  return TSDB_CODE_SUCCESS;
}

1012
// pSql -> tag1_value, ...)
wmmhello's avatar
wmmhello 已提交
1013
static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint8_t precision, const char* tName) {
wmmhello's avatar
wmmhello 已提交
1014
  int32_t code = TSDB_CODE_SUCCESS;
1015 1016 1017 1018 1019
  SArray* pTagVals = taosArrayInit(pCxt->tags.numOfBound, sizeof(STagVal));
  SToken  sToken;
  bool    isParseBindParam = false;
  bool    isJson = false;
  STag*   pTag = NULL;
1020
  for (int i = 0; i < pCxt->tags.numOfBound; ++i) {
X
Xiaoyu Wang 已提交
1021
    NEXT_TOKEN_WITH_PREV(pCxt->pSql, sToken);
D
stmt  
dapan1121 已提交
1022 1023 1024 1025

    if (sToken.type == TK_NK_QUESTION) {
      isParseBindParam = true;
      if (NULL == pCxt->pStmtCb) {
wmmhello's avatar
wmmhello 已提交
1026 1027
        code = buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", sToken.z);
        goto end;
D
stmt  
dapan1121 已提交
1028 1029 1030 1031 1032 1033
      }

      continue;
    }

    if (isParseBindParam) {
wmmhello's avatar
wmmhello 已提交
1034 1035
      code = buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and tag values");
      goto end;
D
stmt  
dapan1121 已提交
1036
    }
X
Xiaoyu Wang 已提交
1037

X
Xiaoyu Wang 已提交
1038
    SSchema* pTagSchema = &pSchema[pCxt->tags.boundColumns[i]];
X
Xiaoyu Wang 已提交
1039
    char*    tmpTokenBuf = taosMemoryCalloc(1, sToken.n);  // todo this can be optimize with parse column
wmmhello's avatar
wmmhello 已提交
1040 1041 1042 1043 1044
    code = checkAndTrimValue(&sToken, tmpTokenBuf, &pCxt->msg);
    if (code != TSDB_CODE_SUCCESS) {
      taosMemoryFree(tmpTokenBuf);
      goto end;
    }
1045
    if (pTagSchema->type == TSDB_DATA_TYPE_JSON) {
wmmhello's avatar
wmmhello 已提交
1046 1047 1048 1049 1050
      if (sToken.n > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
        code = buildSyntaxErrMsg(&pCxt->msg, "json string too long than 4095", sToken.z);
        taosMemoryFree(tmpTokenBuf);
        goto end;
      }
X
Xiaoyu Wang 已提交
1051
      if (isNullStr(&sToken)) {
1052 1053 1054 1055
        code = tTagNew(pTagVals, 1, true, &pTag);
      } else {
        code = parseJsontoTagData(sToken.z, pTagVals, &pTag, &pCxt->msg);
      }
wmmhello's avatar
wmmhello 已提交
1056
      taosMemoryFree(tmpTokenBuf);
1057
      if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
1058 1059
        goto end;
      }
wmmhello's avatar
wmmhello 已提交
1060
      isJson = true;
1061
    } else {
wmmhello's avatar
wmmhello 已提交
1062
      STagVal val = {0};
wmmhello's avatar
wmmhello 已提交
1063
      code = parseTagToken(&pCxt->pSql, &sToken, pTagSchema, precision, &val, &pCxt->msg);
wmmhello's avatar
wmmhello 已提交
1064 1065 1066 1067
      if (TSDB_CODE_SUCCESS != code) {
        taosMemoryFree(tmpTokenBuf);
        goto end;
      }
1068
      if (pTagSchema->type != TSDB_DATA_TYPE_BINARY) {
wmmhello's avatar
wmmhello 已提交
1069 1070 1071 1072
        taosMemoryFree(tmpTokenBuf);
      }
      taosArrayPush(pTagVals, &val);
    }
1073 1074
  }

D
stmt  
dapan1121 已提交
1075
  if (isParseBindParam) {
wmmhello's avatar
wmmhello 已提交
1076 1077
    code = TSDB_CODE_SUCCESS;
    goto end;
D
stmt  
dapan1121 已提交
1078 1079
  }

1080
  if (!isJson && (code = tTagNew(pTagVals, 1, false, &pTag)) != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
1081
    goto end;
1082 1083
  }

wmmhello's avatar
wmmhello 已提交
1084 1085 1086 1087
  buildCreateTbReq(&pCxt->createTblReq, tName, pTag, pCxt->pTableMeta->suid);

end:
  for (int i = 0; i < taosArrayGetSize(pTagVals); ++i) {
1088 1089
    STagVal* p = (STagVal*)taosArrayGet(pTagVals, i);
    if (IS_VAR_DATA_TYPE(p->type)) {
wmmhello's avatar
wmmhello 已提交
1090 1091 1092 1093 1094
      taosMemoryFree(p->pData);
    }
  }
  taosArrayDestroy(pTagVals);
  return code;
X
Xiaoyu Wang 已提交
1095
}
1096

X
Xiaoyu Wang 已提交
1097 1098
/*
 * Register the vgroup of a table and cache a copy of its meta under `pName`.
 *
 * pCxt        parse context; the vgroup is stored in pCxt->pVgroupsHashObj
 * pHash       hash that receives a pointer to the cloned meta
 * pTableName  table whose vgroup is looked up
 * pName/len   hash key
 * pMeta       meta to cache; its uid, vgId and tableType are overwritten before cloning
 *
 * Returns TSDB_CODE_SUCCESS, the failing helper's code, or TSDB_CODE_TSC_OUT_OF_MEMORY.
 */
static int32_t storeTableMeta(SInsertParseContext* pCxt, SHashObj* pHash, SName* pTableName, const char* pName,
                              int32_t len, STableMeta* pMeta) {
  SVgroupInfo vg;
  CHECK_CODE(getTableVgroup(pCxt, pTableName, &vg));
  CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));

  pMeta->uid = 0;
  pMeta->vgId = vg.vgId;
  pMeta->tableType = TSDB_CHILD_TABLE;

  STableMeta* pBackup = NULL;
  if (TSDB_CODE_SUCCESS != cloneTableMeta(pMeta, &pBackup)) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  int32_t code = taosHashPut(pHash, pName, len, &pBackup, POINTER_BYTES);
  if (TSDB_CODE_SUCCESS != code) {
    // the hash did not take the pointer, so nobody else will release the clone
    taosMemoryFree(pBackup);
  }
  return code;
}

1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139
/*
 * Consume tokens until the parenthesis that was already opened is closed.
 * Nested "(" ... ")" pairs are balanced. Running out of input (an empty token)
 * before the closing ")" yields the ") expected" syntax error.
 */
static int32_t skipParentheses(SInsertParseSyntaxCxt* pCxt) {
  SToken  token;
  int32_t depth = 1;  // one "(" has been consumed by the caller

  for (;;) {
    NEXT_TOKEN(pCxt->pSql, token);

    if (TK_NK_LP == token.type) {
      ++depth;
    } else if (TK_NK_RP == token.type) {
      if (0 == --depth) {
        return TSDB_CODE_SUCCESS;
      }
    }

    if (0 == token.n) {
      return buildSyntaxErrMsg(&pCxt->msg, ") expected", NULL);
    }
  }
}

/* Skip a bound column list; it is handled as a single parenthesized group. */
static int32_t skipBoundColumns(SInsertParseSyntaxCxt* pCxt) {
  int32_t code = skipParentheses(pCxt);
  return code;
}

/*
 * Skip a bound column list without interpreting it.
 * Runs the syntax-only skipper on a temporary context built from pCxt, then
 * writes the advanced SQL cursor back to pCxt.
 */
static int32_t ignoreBoundColumns(SInsertParseContext* pCxt) {
  SInsertParseSyntaxCxt syntaxCxt = {
      .pComCxt = pCxt->pComCxt,
      .pSql = pCxt->pSql,
      .msg = pCxt->msg,
      .pMetaCache = NULL,
  };

  int32_t result = skipBoundColumns(&syntaxCxt);
  pCxt->pSql = syntaxCxt.pSql;  // written back on failure too
  return result;
}

1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151
static int32_t skipUsingClause(SInsertParseSyntaxCxt* pCxt);

// pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
/*
 * Skip an auto-create clause without interpreting it.
 * Consumes the super table name, then lets the syntax-only skipper consume the
 * rest, and writes the advanced SQL cursor back to pCxt.
 */
static int32_t ignoreAutoCreateTableClause(SInsertParseContext* pCxt) {
  SToken stbNameToken;
  NEXT_TOKEN(pCxt->pSql, stbNameToken);  // stb_name, value not needed

  SInsertParseSyntaxCxt syntaxCxt = {
      .pComCxt = pCxt->pComCxt,
      .pSql = pCxt->pSql,
      .msg = pCxt->msg,
      .pMetaCache = NULL,
  };

  int32_t result = skipUsingClause(&syntaxCxt);
  pCxt->pSql = syntaxCxt.pSql;  // written back on failure too
  return result;
}

1152
// pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
D
dapan 已提交
1153
static int32_t parseUsingClause(SInsertParseContext* pCxt, SName* name, char* tbFName) {
H
refact  
Hongze Cheng 已提交
1154
  int32_t      len = strlen(tbFName);
X
Xiaoyu Wang 已提交
1155 1156
  STableMeta** pMeta = taosHashGet(pCxt->pSubTableHashObj, tbFName, len);
  if (NULL != pMeta) {
1157
    CHECK_CODE(ignoreAutoCreateTableClause(pCxt));
X
Xiaoyu Wang 已提交
1158 1159
    return cloneTableMeta(*pMeta, &pCxt->pTableMeta);
  }
1160

X
Xiaoyu Wang 已提交
1161
  SToken sToken;
1162 1163
  // pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
  NEXT_TOKEN(pCxt->pSql, sToken);
D
dapan 已提交
1164 1165 1166

  SName sname;
  createSName(&sname, &sToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg);
X
Xiaoyu Wang 已提交
1167 1168
  char dbFName[TSDB_DB_FNAME_LEN];
  tNameGetFullDbName(&sname, dbFName);
X
Xiaoyu Wang 已提交
1169

X
Xiaoyu Wang 已提交
1170
  CHECK_CODE(getSTableMeta(pCxt, &sname, dbFName));
1171 1172 1173
  if (TSDB_SUPER_TABLE != pCxt->pTableMeta->tableType) {
    return buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed");
  }
D
dapan 已提交
1174
  CHECK_CODE(storeTableMeta(pCxt, pCxt->pSubTableHashObj, name, tbFName, len, pCxt->pTableMeta));
1175 1176

  SSchema* pTagsSchema = getTableTagSchema(pCxt->pTableMeta);
1177
  setBoundColumnInfo(&pCxt->tags, pTagsSchema, getNumOfTags(pCxt->pTableMeta));
1178 1179 1180

  // pSql -> [(tag1_name, ...)] TAGS (tag1_value, ...)
  NEXT_TOKEN(pCxt->pSql, sToken);
1181
  if (TK_NK_LP == sToken.type) {
1182
    CHECK_CODE(parseBoundColumns(pCxt, &pCxt->tags, pTagsSchema));
1183 1184 1185 1186 1187 1188 1189 1190
    NEXT_TOKEN(pCxt->pSql, sToken);
  }

  if (TK_TAGS != sToken.type) {
    return buildSyntaxErrMsg(&pCxt->msg, "TAGS is expected", sToken.z);
  }
  // pSql -> (tag1_value, ...)
  NEXT_TOKEN(pCxt->pSql, sToken);
1191
  if (TK_NK_LP != sToken.type) {
1192 1193
    return buildSyntaxErrMsg(&pCxt->msg, "( is expected", sToken.z);
  }
X
Xiaoyu Wang 已提交
1194
  CHECK_CODE(parseTagsClause(pCxt, pTagsSchema, getTableInfo(pCxt->pTableMeta).precision, name->tname));
1195 1196 1197 1198
  NEXT_VALID_TOKEN(pCxt->pSql, sToken);
  if (TK_NK_COMMA == sToken.type) {
    return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_TAGS_NOT_MATCHED);
  } else if (TK_NK_RP != sToken.type) {
X
Xiaoyu Wang 已提交
1199 1200
    return buildSyntaxErrMsg(&pCxt->msg, ") is expected", sToken.z);
  }
1201 1202 1203 1204

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1205 1206
/*
 * Parse the values of one row and append them to the data block's row buffer.
 *
 * pCxt         parse context; pCxt->pSql is advanced past the consumed tokens
 * pDataBlocks  target block; the row is built at pData + size
 * timePrec     time precision forwarded to parseValueToken()
 * gotRow       set to true when a row of literal values was built (not for '?' rows)
 * tmpTokenBuf  scratch buffer forwarded to parseValueToken()
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, int16_t timePrec, bool* gotRow,
                       char* tmpTokenBuf) {
  SParsedDataColInfo* pBound = &pDataBlocks->boundColumnInfo;
  SRowBuilder*        pRowBuilder = &pDataBlocks->rowBuilder;
  STSRow*             pRow = (STSRow*)(pDataBlocks->pData + pDataBlocks->size);  // skip the SSubmitBlk header

  tdSRowResetBuf(pRowBuilder, pRow);

  SSchema*  pColSchemas = getTableColumnSchema(pDataBlocks->pTableMeta);
  SMemParam appendParam = {.rb = pRowBuilder};
  SToken    valToken = {0};
  bool      sawPlaceholder = false;

  // step 1: one value token per bound column
  for (int boundIdx = 0; boundIdx < pBound->numOfBound; ++boundIdx) {
    NEXT_TOKEN_WITH_PREV(pCxt->pSql, valToken);
    SSchema* pColSchema = &pColSchemas[pBound->boundColumns[boundIdx]];

    if (TK_NK_QUESTION == valToken.type) {
      if (NULL == pCxt->pStmtCb) {
        return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", valToken.z);
      }
      sawPlaceholder = true;
      continue;
    }

    if (TK_NK_RP == valToken.type) {
      // the row closed before every bound column received a value
      return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM);
    }

    if (sawPlaceholder) {
      return buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and values");
    }

    appendParam.schema = pColSchema;
    getSTSRowAppendInfo(pRowBuilder->rowType, pBound, boundIdx, &appendParam.toffset, &appendParam.colIdx);
    CHECK_CODE(parseValueToken(&pCxt->pSql, &valToken, pColSchema, timePrec, tmpTokenBuf, MemRowAppend, &appendParam,
                               &pCxt->msg));

    if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
      TSKEY rowKey = TD_ROW_KEY(pRow);
      checkTimestamp(pDataBlocks, (const char*)&rowKey);
    }
  }

  // rows made of '?' placeholders are not materialized here
  if (sawPlaceholder) {
    return TSDB_CODE_SUCCESS;
  }

  // step 2: columns without a value get a "none" entry
  if ((pBound->numOfBound < pBound->numOfCols) && TD_IS_TP_ROW(pRow)) {
    for (int32_t colIdx = 0; colIdx < pBound->numOfCols; ++colIdx) {
      if (VAL_STAT_NONE != pBound->cols[colIdx].valStat) {  // the primary TS key is not VAL_STAT_NONE
        continue;
      }
      tdAppendColValToTpRow(pRowBuilder, TD_VTYPE_NONE, getNullValue(pColSchemas[colIdx].type), true,
                            pColSchemas[colIdx].type, colIdx, pBound->cols[colIdx].toffset);
    }
  }

  *gotRow = true;
#ifdef TD_DEBUG_PRINT_ROW
  STSchema* pSTSchema = tdGetSTSChemaFromSSChema(&pColSchemas, pBound->numOfCols);
  tdSRowPrint(pRow, pSTSchema, __func__);
  taosMemoryFree(pSTSchema);
#endif

  return TSDB_CODE_SUCCESS;
}

// pSql -> (field1_value, ...) [(field1_value2, ...) ...]
1273
static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlock, int maxRows, int32_t* numOfRows) {
1274
  STableComInfo tinfo = getTableInfo(pDataBlock->pTableMeta);
H
refact  
Hongze Cheng 已提交
1275
  int32_t       extendedRowSize = getExtendedRowSize(pDataBlock);
C
Cary Xu 已提交
1276
  CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));
1277 1278

  (*numOfRows) = 0;
H
refact  
Hongze Cheng 已提交
1279
  char   tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0};  // used for deleting Escape character: \\, \', \"
1280 1281
  SToken sToken;
  while (1) {
1282 1283
    int32_t index = 0;
    NEXT_TOKEN_KEEP_SQL(pCxt->pSql, sToken, index);
1284
    if (TK_NK_LP != sToken.type) {
1285 1286
      break;
    }
1287
    pCxt->pSql += index;
1288 1289 1290 1291 1292 1293 1294 1295

    if ((*numOfRows) >= maxRows || pDataBlock->size + extendedRowSize >= pDataBlock->nAllocSize) {
      int32_t tSize;
      CHECK_CODE(allocateMemIfNeed(pDataBlock, extendedRowSize, &tSize));
      ASSERT(tSize >= maxRows);
      maxRows = tSize;
    }

D
stmt  
dapan1121 已提交
1296 1297 1298
    bool gotRow = false;
    CHECK_CODE(parseOneRow(pCxt, pDataBlock, tinfo.precision, &gotRow, tmpTokenBuf));
    if (gotRow) {
X
Xiaoyu Wang 已提交
1299
      pDataBlock->size += extendedRowSize;  // len;
D
stmt  
dapan1121 已提交
1300
    }
1301

1302 1303 1304 1305
    NEXT_VALID_TOKEN(pCxt->pSql, sToken);
    if (TK_NK_COMMA == sToken.type) {
      return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM);
    } else if (TK_NK_RP != sToken.type) {
1306 1307 1308
      return buildSyntaxErrMsg(&pCxt->msg, ") expected", sToken.z);
    }

D
stmt  
dapan1121 已提交
1309 1310 1311
    if (gotRow) {
      (*numOfRows)++;
    }
1312 1313
  }

D
stmt  
dapan1121 已提交
1314
  if (0 == (*numOfRows) && (!TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT))) {
X
Xiaoyu Wang 已提交
1315
    return buildSyntaxErrMsg(&pCxt->msg, "no any data points", NULL);
1316 1317 1318 1319
  }
  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
1320
/*
 * Parse a VALUES clause into dataBuf and finalize its submit block header.
 * On success dataBuf->numOfTables is set to 1 and the parsed row count is
 * added to pCxt->totalNum.
 */
static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* dataBuf) {
  int32_t rowCapacity;
  int32_t code = allocateMemIfNeed(dataBuf, getExtendedRowSize(dataBuf), &rowCapacity);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  int32_t rowCount = 0;
  code = parseValues(pCxt, dataBuf, rowCapacity, &rowCount);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  SSubmitBlk* pSubmitBlk = (SSubmitBlk*)(dataBuf->pData);
  code = setBlockInfo(pSubmitBlk, dataBuf, rowCount);
  if (TSDB_CODE_SUCCESS != code) {
    return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than 32767");
  }

  pCxt->totalNum += rowCount;
  dataBuf->numOfTables = 1;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404
/*
 * Parse rows from an opened CSV file, one row per non-empty line, into pDataBlock.
 *
 * pCxt        parse context; pCxt->pSql is pointed at each line while it is parsed and
 *             restored afterwards, on failure as well
 * fp          file to read
 * pDataBlock  destination block, grown through allocateMemIfNeed() when needed
 * maxRows     current row capacity of the block
 * numOfRows   out: number of rows parsed
 *
 * Each line is lower-cased in place before parsing. Zero rows is an error unless the
 * insert is a prepared-statement insert.
 */
static int32_t parseCsvFile(SInsertParseContext* pCxt, TdFilePtr fp, STableDataBlocks* pDataBlock, int maxRows,
                            int32_t* numOfRows) {
  STableComInfo tinfo = getTableInfo(pDataBlock->pTableMeta);
  int32_t       extendedRowSize = getExtendedRowSize(pDataBlock);
  CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));

  (*numOfRows) = 0;
  char    tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0};  // used for deleting Escape character: \\, \', \"
  char*   pLine = NULL;
  int64_t readLen = 0;
  int32_t code = TSDB_CODE_SUCCESS;
  while ((readLen = taosGetLineFile(fp, &pLine)) != -1) {
    // strip the whole line terminator ("\n", "\r" or "\r\n") without indexing before the buffer
    while (readLen > 0 && (('\r' == pLine[readLen - 1]) || ('\n' == pLine[readLen - 1]))) {
      pLine[--readLen] = '\0';
    }

    if (readLen <= 0) {
      continue;
    }

    if ((*numOfRows) >= maxRows || pDataBlock->size + extendedRowSize >= pDataBlock->nAllocSize) {
      int32_t tSize;
      code = allocateMemIfNeed(pDataBlock, extendedRowSize, &tSize);
      if (TSDB_CODE_SUCCESS != code) {
        break;
      }
      ASSERT(tSize >= maxRows);
      maxRows = tSize;
    }

    strtolower(pLine, pLine);
    char* pRawSql = pCxt->pSql;
    pCxt->pSql = pLine;
    bool gotRow = false;
    code = parseOneRow(pCxt, pDataBlock, tinfo.precision, &gotRow, tmpTokenBuf);
    // restore before any exit so the context never keeps a pointer into the line buffer
    pCxt->pSql = pRawSql;
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
    if (gotRow) {
      pDataBlock->size += extendedRowSize;  // len;
      (*numOfRows)++;
    }
  }

  // NOTE(review): assumes taosGetLineFile hands a heap buffer to the caller (getline-style
  // ownership); confirm against its implementation.
  taosMemoryFree(pLine);

  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  if (0 == (*numOfRows) && (!TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT))) {
    return buildSyntaxErrMsg(&pCxt->msg, "no any data points", NULL);
  }
  return TSDB_CODE_SUCCESS;
}

/*
 * Load rows from the CSV file named by `filePath` into dataBuf and finalize its
 * submit block header.
 *
 * pCxt      parse context; pCxt->totalNum is increased by the number of rows loaded
 * filePath  token holding the file path (filePath.n bytes at filePath.z)
 * dataBuf   destination block
 *
 * The file is closed on every path after it has been opened.
 *
 * Returns TSDB_CODE_SUCCESS, a system error when the file cannot be opened, or the
 * first parse error.
 */
static int32_t parseDataFromFile(SInsertParseContext* pCxt, SToken filePath, STableDataBlocks* dataBuf) {
  char filePathStr[TSDB_FILENAME_LEN] = {0};
  // reject paths that do not fit, terminator included, instead of overrunning the buffer
  if (filePath.n >= sizeof(filePathStr)) {
    return buildSyntaxErrMsg(&pCxt->msg, "file path is too long", filePath.z);
  }
  memcpy(filePathStr, filePath.z, filePath.n);

  TdFilePtr fp = taosOpenFile(filePathStr, TD_FILE_READ | TD_FILE_STREAM);
  if (NULL == fp) {
    return TAOS_SYSTEM_ERROR(errno);
  }

  int32_t maxNumOfRows;
  int32_t numOfRows = 0;
  int32_t code = allocateMemIfNeed(dataBuf, getExtendedRowSize(dataBuf), &maxNumOfRows);
  if (TSDB_CODE_SUCCESS == code) {
    code = parseCsvFile(pCxt, fp, dataBuf, maxNumOfRows, &numOfRows);
  }

  // the file is only needed while reading rows
  taosCloseFile(&fp);

  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  SSubmitBlk* pBlocks = (SSubmitBlk*)(dataBuf->pData);
  if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, dataBuf, numOfRows)) {
    return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than 32767");
  }

  dataBuf->numOfTables = 1;
  pCxt->totalNum += numOfRows;
  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
1405
/* Free the name and the tag blob held by a create-sub-table request. */
void destroyCreateSubTbReq(SVCreateTbReq* pReq) {
  taosMemoryFreeClear(pReq->ctb.pTag);
  taosMemoryFreeClear(pReq->name);
}

X
Xiaoyu Wang 已提交
1410
/*
 * Release the per-table state of the parse context: the table meta, the
 * pending create-table request and the bound tag column info.
 */
static void destroyInsertParseContextForTable(SInsertParseContext* pCxt) {
  taosMemoryFreeClear(pCxt->pTableMeta);
  destroyCreateSubTbReq(&pCxt->createTblReq);
  destroyBoundColumnInfo(&pCxt->tags);
}

1416 1417
static void destroySubTableHashElem(void* p) { taosMemoryFree(*(STableMeta**)p); }

X
Xiaoyu Wang 已提交
1418 1419 1420
/*
 * Release everything owned by the insert parse context: the per-table state,
 * the lookup hashes, and the data block containers.
 */
static void destroyInsertParseContext(SInsertParseContext* pCxt) {
  // per-table state first
  destroyInsertParseContextForTable(pCxt);

  // lookup hashes
  taosHashCleanup(pCxt->pVgroupsHashObj);
  taosHashCleanup(pCxt->pSubTableHashObj);
  taosHashCleanup(pCxt->pTableNameHashObj);
  taosHashCleanup(pCxt->pDbFNameHashObj);

  // data blocks
  destroyBlockHashmap(pCxt->pTableBlockHashObj);
  destroyBlockArrayList(pCxt->pVgDataBlocks);
}

1429 1430 1431 1432 1433 1434
//   tb_name
//       [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
//       [(field1_name, ...)]
//       VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
//   [...];
//
// Parses every table clause that follows INSERT INTO. For each table it records the table and db
// names in the context hashes, resolves the table meta (directly or through a USING clause),
// fetches the table's data block, parses the optional bound column list and finally the VALUES or
// FILE payload.
//
// When the output is a stmt insert, the collected state (table meta, a heap copy of the bound
// tags, and the vgroup/block hashes) is handed to the statement through setInfoFn and detached
// from the context. Otherwise the per-table blocks are merged by vgroup and the output is built.
//
// Returns TSDB_CODE_SUCCESS or the first error reported by a helper / syntax check.
static int32_t parseInsertBody(SInsertParseContext* pCxt) {
  int32_t tbNum = 0;
  // Zero-initialised: in stmt mode the loop may end before any table name is parsed, and the
  // buffer is still passed to setInfoFn below.
  char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
  bool autoCreateTbl = false;

  // for each table
  while (1) {
    SToken sToken;
    char*  tbName = NULL;

    // pSql -> tb_name ...
    NEXT_TOKEN(pCxt->pSql, sToken);

    // no data in the sql string anymore.
    if (sToken.n == 0) {
      if (sToken.type && pCxt->pSql[0]) {
        return buildSyntaxErrMsg(&pCxt->msg, "invalid charactor in SQL", sToken.z);
      }

      if (0 == pCxt->totalNum && (!TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT))) {
        return buildInvalidOperationMsg(&pCxt->msg, "no data in sql");
      }
      break;
    }

    if (TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT) && tbNum > 0) {
      return buildInvalidOperationMsg(&pCxt->msg, "single table allowed in one stmt");
    }

    // Discard the state left over from the previous table before parsing this one.
    destroyInsertParseContextForTable(pCxt);

    // A "?" placeholder stands for a table name supplied by the statement handle.
    if (TK_NK_QUESTION == sToken.type) {
      if (pCxt->pStmtCb) {
        CHECK_CODE((*pCxt->pStmtCb->getTbNameFn)(pCxt->pStmtCb->pStmt, &tbName));

        sToken.z = tbName;
        sToken.n = strlen(tbName);
      } else {
        return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", sToken.z);
      }
    }

    SToken tbnameToken = sToken;
    NEXT_TOKEN(pCxt->pSql, sToken);

    SName name;
    CHECK_CODE(createSName(&name, &tbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));

    tNameExtractFullName(&name, tbFName);
    CHECK_CODE(taosHashPut(pCxt->pTableNameHashObj, tbFName, strlen(tbFName), &name, sizeof(SName)));
    char dbFName[TSDB_DB_FNAME_LEN];
    tNameGetFullDbName(&name, dbFName);
    CHECK_CODE(taosHashPut(pCxt->pDbFNameHashObj, dbFName, strlen(dbFName), dbFName, sizeof(dbFName)));

    bool existedUsing = false;
    // USING clause
    if (TK_USING == sToken.type) {
      existedUsing = true;
      CHECK_CODE(parseUsingClause(pCxt, &name, tbFName));
      NEXT_TOKEN(pCxt->pSql, sToken);
      autoCreateTbl = true;
    }

    char* pBoundColsStart = NULL;
    if (TK_NK_LP == sToken.type) {
      // pSql -> field1_name, ...)
      // Only remember where the list starts and skip over it here; it is parsed further down,
      // once the data block that receives the bound column info is available.
      pBoundColsStart = pCxt->pSql;
      CHECK_CODE(ignoreBoundColumns(pCxt));
      NEXT_TOKEN(pCxt->pSql, sToken);
    }

    // USING may also follow the bound column list.
    if (TK_USING == sToken.type) {
      CHECK_CODE(parseUsingClause(pCxt, &name, tbFName));
      NEXT_TOKEN(pCxt->pSql, sToken);
      autoCreateTbl = true;
    } else if (!existedUsing) {
      CHECK_CODE(getTableMeta(pCxt, &name, dbFName));
    }

    STableDataBlocks* dataBuf = NULL;
    CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, tbFName, strlen(tbFName), TSDB_DEFAULT_PAYLOAD_SIZE,
                                    sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta,
                                    &dataBuf, NULL, &pCxt->createTblReq));

    if (NULL != pBoundColsStart) {
      // Rewind to the saved column list, parse it into the block, then restore the cursor.
      char* pCurrPos = pCxt->pSql;
      pCxt->pSql = pBoundColsStart;
      CHECK_CODE(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(pCxt->pTableMeta)));
      pCxt->pSql = pCurrPos;
    }

    if (TK_VALUES == sToken.type) {
      // pSql -> (field1_value, ...) [(field1_value2, ...) ...]
      CHECK_CODE(parseValuesClause(pCxt, dataBuf));
      TSDB_QUERY_SET_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_INSERT);

      tbNum++;
      continue;
    }

    // FILE csv_file_path
    if (TK_FILE == sToken.type) {
      // pSql -> csv_file_path
      NEXT_TOKEN(pCxt->pSql, sToken);
      if (0 == sToken.n || (TK_NK_STRING != sToken.type && TK_NK_ID != sToken.type)) {
        return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", sToken.z);
      }
      CHECK_CODE(parseDataFromFile(pCxt, sToken, dataBuf));
      pCxt->pOutput->insertType = TSDB_QUERY_TYPE_FILE_INSERT;

      tbNum++;
      continue;
    }

    return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", sToken.z);
  }

  if (TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT)) {
    // The statement receives its own heap copy of the bound tag info.
    SParsedDataColInfo* tags = taosMemoryMalloc(sizeof(pCxt->tags));
    if (NULL == tags) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    memcpy(tags, &pCxt->tags, sizeof(pCxt->tags));
    (*pCxt->pStmtCb->setInfoFn)(pCxt->pStmtCb->pStmt, pCxt->pTableMeta, tags, tbFName, autoCreateTbl,
                                pCxt->pVgroupsHashObj, pCxt->pTableBlockHashObj);

    // Detach what was handed over so that the context teardown does not release it.
    memset(&pCxt->tags, 0, sizeof(pCxt->tags));
    pCxt->pVgroupsHashObj = NULL;
    pCxt->pTableBlockHashObj = NULL;
    pCxt->pTableMeta = NULL;

    return TSDB_CODE_SUCCESS;
  }

  // merge according to vgId
  if (taosHashGetSize(pCxt->pTableBlockHashObj) > 0) {
    CHECK_CODE(mergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->payloadType, &pCxt->pVgDataBlocks));
  }
  return buildOutput(pCxt);
}

// INSERT INTO
//   tb_name
//       [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
//       [(field1_name, ...)]
//       VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
//   [...];
1582
int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery, SParseMetaCache* pMetaCache) {
1583
  SInsertParseContext context = {
X
Xiaoyu Wang 已提交
1584 1585 1586 1587
      .pComCxt = pContext,
      .pSql = (char*)pContext->pSql,
      .msg = {.buf = pContext->pMsg, .len = pContext->msgLen},
      .pTableMeta = NULL,
X
Xiaoyu Wang 已提交
1588 1589
      .pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK),
      .pTableNameHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK),
D
dapan1121 已提交
1590
      .pDbFNameHashObj = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK),
X
Xiaoyu Wang 已提交
1591 1592
      .totalNum = 0,
      .pOutput = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT),
1593 1594
      .pStmtCb = pContext->pStmtCb,
      .pMetaCache = pMetaCache};
1595

D
stmt  
dapan1121 已提交
1596
  if (pContext->pStmtCb && *pQuery) {
X
Xiaoyu Wang 已提交
1597 1598
    (*pContext->pStmtCb->getExecInfoFn)(pContext->pStmtCb->pStmt, &context.pVgroupsHashObj,
                                        &context.pTableBlockHashObj);
D
stmt  
dapan1121 已提交
1599
  } else {
X
Xiaoyu Wang 已提交
1600 1601 1602
    context.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
    context.pTableBlockHashObj =
        taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
D
stmt  
dapan1121 已提交
1603
  }
X
Xiaoyu Wang 已提交
1604 1605

  if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pSubTableHashObj ||
D
dapan1121 已提交
1606
      NULL == context.pTableNameHashObj || NULL == context.pDbFNameHashObj || NULL == context.pOutput) {
X
Xiaoyu Wang 已提交
1607
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
1608
  }
1609
  taosHashSetFreeFp(context.pSubTableHashObj, destroySubTableHashElem);
1610

D
stmt  
dapan1121 已提交
1611 1612 1613 1614
  if (pContext->pStmtCb) {
    TSDB_QUERY_SET_TYPE(context.pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT);
  }

1615
  if (NULL == *pQuery) {
1616
    *pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
D
stmt  
dapan1121 已提交
1617 1618 1619
    if (NULL == *pQuery) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
1620
  }
1621 1622 1623 1624
  (*pQuery)->execMode = QUERY_EXEC_MODE_SCHEDULE;
  (*pQuery)->haveResultSet = false;
  (*pQuery)->msgType = TDMT_VND_SUBMIT;
  (*pQuery)->pRoot = (SNode*)context.pOutput;
X
Xiaoyu Wang 已提交
1625

D
dapan1121 已提交
1626 1627 1628 1629 1630 1631
  if (NULL == (*pQuery)->pTableList) {
    (*pQuery)->pTableList = taosArrayInit(taosHashGetSize(context.pTableNameHashObj), sizeof(SName));
    if (NULL == (*pQuery)->pTableList) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }
1632

D
dapan1121 已提交
1633 1634 1635 1636 1637 1638 1639
  if (NULL == (*pQuery)->pDbList) {
    (*pQuery)->pDbList = taosArrayInit(taosHashGetSize(context.pDbFNameHashObj), TSDB_DB_FNAME_LEN);
    if (NULL == (*pQuery)->pDbList) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

1640
  context.pOutput->payloadType = PAYLOAD_TYPE_KV;
1641

1642
  int32_t code = skipInsertInto(&context.pSql, &context.msg);
1643 1644 1645
  if (TSDB_CODE_SUCCESS == code) {
    code = parseInsertBody(&context);
  }
1646
  if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) {
X
Xiaoyu Wang 已提交
1647 1648 1649 1650 1651
    SName* pTable = taosHashIterate(context.pTableNameHashObj, NULL);
    while (NULL != pTable) {
      taosArrayPush((*pQuery)->pTableList, pTable);
      pTable = taosHashIterate(context.pTableNameHashObj, pTable);
    }
D
dapan1121 已提交
1652 1653 1654 1655 1656 1657

    char* pDb = taosHashIterate(context.pDbFNameHashObj, NULL);
    while (NULL != pDb) {
      taosArrayPush((*pQuery)->pDbList, pDb);
      pDb = taosHashIterate(context.pDbFNameHashObj, pDb);
    }
X
Xiaoyu Wang 已提交
1658
  }
1659
  destroyInsertParseContext(&context);
X
Xiaoyu Wang 已提交
1660
  return code;
1661
}
D
stmt  
dapan1121 已提交
1662

1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710
// pSql -> (field1_value, ...) [(field1_value2, ...) ...]
// Skips over every parenthesised row that follows VALUES without interpreting the values.
// Fails with a syntax error when not a single row is present.
static int32_t skipValuesClause(SInsertParseSyntaxCxt* pCxt) {
  int32_t rowCount = 0;

  for (;;) {
    // Peek at the next token; the cursor only moves once the token is known to be "(".
    int32_t tokenLen = 0;
    SToken  peeked = tStrGetToken(pCxt->pSql, &tokenLen, false);
    if (TK_NK_LP != peeked.type) {
      break;
    }
    pCxt->pSql += tokenLen;

    CHECK_CODE(skipParentheses(pCxt));
    rowCount += 1;
  }

  if (rowCount > 0) {
    return TSDB_CODE_SUCCESS;
  }
  return buildSyntaxErrMsg(&pCxt->msg, "no any data points", NULL);
}

// Skips the tag value list of a TAGS clause; the work is delegated to skipParentheses().
static int32_t skipTagsClause(SInsertParseSyntaxCxt* pCxt) {
  int32_t code = skipParentheses(pCxt);
  return code;
}

// pSql -> [(tag1_name, ...)] TAGS (tag1_value, ...)
// Skips the remainder of a USING clause: an optional tag name list, the TAGS keyword and the
// parenthesised tag values. Reports a syntax error when TAGS or its "(" is missing.
static int32_t skipUsingClause(SInsertParseSyntaxCxt* pCxt) {
  SToken tok;

  NEXT_TOKEN(pCxt->pSql, tok);
  if (TK_NK_LP == tok.type) {
    // optional (tag1_name, ...)
    CHECK_CODE(skipBoundColumns(pCxt));
    NEXT_TOKEN(pCxt->pSql, tok);
  }

  if (TK_TAGS != tok.type) {
    return buildSyntaxErrMsg(&pCxt->msg, "TAGS is expected", tok.z);
  }

  // pSql -> (tag1_value, ...)
  NEXT_TOKEN(pCxt->pSql, tok);
  if (TK_NK_LP != tok.type) {
    return buildSyntaxErrMsg(&pCxt->msg, "( is expected", tok.z);
  }

  CHECK_CODE(skipTagsClause(pCxt));
  return TSDB_CODE_SUCCESS;
}

// Builds the table name from `pTbToken` and reserves, in the meta cache, the db config, the
// user's write authorisation, the table meta and the table vgroup for it.
// Returns the first failing helper's code, or TSDB_CODE_SUCCESS.
static int32_t collectTableMetaKey(SInsertParseSyntaxCxt* pCxt, SToken* pTbToken) {
  SName name;
  CHECK_CODE(createSName(&name, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
  CHECK_CODE(reserveDbCfgInCache(pCxt->pComCxt->acctId, name.dbname, pCxt->pMetaCache));
  CHECK_CODE(reserveUserAuthInCacheExt(pCxt->pComCxt->pUser, &name, AUTH_TYPE_WRITE, pCxt->pMetaCache));
  CHECK_CODE(reserveTableMetaInCacheExt(&name, pCxt->pMetaCache));
  CHECK_CODE(reserveTableVgroupInCacheExt(&name, pCxt->pMetaCache));
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1718 1719 1720 1721 1722 1723 1724
// For a table named in front of a USING clause: builds its name from `pTbToken` and reserves
// only its vgroup entry in the meta cache.
static int32_t collectAutoCreateTableMetaKey(SInsertParseSyntaxCxt* pCxt, SToken* pTbToken) {
  SName tbName;

  CHECK_CODE(createSName(&tbName, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));

  CHECK_CODE(reserveTableVgroupInCacheExt(&tbName, pCxt->pMetaCache));

  return TSDB_CODE_SUCCESS;
}

1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750
// Syntax-only pass over the body of an INSERT statement. Values are not interpreted: the pass
// walks the same grammar as parseInsertBody(), skips bound columns, tag and value lists, and
// reserves in the meta cache the keys of every table it encounters.
// Returns TSDB_CODE_SUCCESS or the first syntax / reservation error.
static int32_t parseInsertBodySyntax(SInsertParseSyntaxCxt* pCxt) {
  bool hasData = false;
  // for each table
  while (1) {
    SToken sToken;

    // pSql -> tb_name ...
    NEXT_TOKEN(pCxt->pSql, sToken);

    // no data in the sql string anymore.
    if (sToken.n == 0) {
      if (sToken.type && pCxt->pSql[0]) {
        return buildSyntaxErrMsg(&pCxt->msg, "invalid charactor in SQL", sToken.z);
      }

      if (!hasData) {
        return buildInvalidOperationMsg(&pCxt->msg, "no data in sql");
      }
      break;
    }

    // Each table clause has to bring its own VALUES or FILE part.
    hasData = false;

    SToken tbnameToken = sToken;
    NEXT_TOKEN(pCxt->pSql, sToken);

    bool existedUsing = false;
    // USING clause
    if (TK_USING == sToken.type) {
      existedUsing = true;
      CHECK_CODE(collectAutoCreateTableMetaKey(pCxt, &tbnameToken));
      // pSql -> stb_name ...
      NEXT_TOKEN(pCxt->pSql, sToken);
      CHECK_CODE(collectTableMetaKey(pCxt, &sToken));
      CHECK_CODE(skipUsingClause(pCxt));
      NEXT_TOKEN(pCxt->pSql, sToken);
    }

    if (TK_NK_LP == sToken.type) {
      // pSql -> field1_name, ...)
      CHECK_CODE(skipBoundColumns(pCxt));
      NEXT_TOKEN(pCxt->pSql, sToken);
    }

    // USING may also follow the bound column list.
    if (TK_USING == sToken.type && !existedUsing) {
      existedUsing = true;
      CHECK_CODE(collectAutoCreateTableMetaKey(pCxt, &tbnameToken));
      NEXT_TOKEN(pCxt->pSql, sToken);
      CHECK_CODE(collectTableMetaKey(pCxt, &sToken));
      CHECK_CODE(skipUsingClause(pCxt));
      NEXT_TOKEN(pCxt->pSql, sToken);
    } else {
      // NOTE(review): this branch also runs when a USING clause was already handled above, so the
      // table's own meta key is reserved as well. parseInsertBody() only fetches the table meta
      // when no USING clause was present ("else if (!existedUsing)") - confirm which is intended.
      CHECK_CODE(collectTableMetaKey(pCxt, &tbnameToken));
    }

    if (TK_VALUES == sToken.type) {
      // pSql -> (field1_value, ...) [(field1_value2, ...) ...]
      CHECK_CODE(skipValuesClause(pCxt));
      hasData = true;
      continue;
    }

    // FILE csv_file_path
    if (TK_FILE == sToken.type) {
      // pSql -> csv_file_path
      NEXT_TOKEN(pCxt->pSql, sToken);
      if (0 == sToken.n || (TK_NK_STRING != sToken.type && TK_NK_ID != sToken.type)) {
        return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", sToken.z);
      }
      hasData = true;
      continue;
    }

    return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", sToken.z);
  }

  return TSDB_CODE_SUCCESS;
}

1803
// Runs the syntax-only pass over an INSERT statement (skipInsertInto + parseInsertBodySyntax).
// On success *pQuery is set to a newly created QUERY_NODE_QUERY node.
// Returns the syntax pass' code, or TSDB_CODE_OUT_OF_MEMORY when the node cannot be created.
int32_t parseInsertSyntax(SParseContext* pContext, SQuery** pQuery, SParseMetaCache* pMetaCache) {
  SInsertParseSyntaxCxt context = {.pComCxt = pContext,
                                   .pSql = (char*)pContext->pSql,
                                   .msg = {.buf = pContext->pMsg, .len = pContext->msgLen},
                                   .pMetaCache = pMetaCache};
  int32_t               code = skipInsertInto(&context.pSql, &context.msg);
  if (TSDB_CODE_SUCCESS == code) {
    code = parseInsertBodySyntax(&context);
  }
  if (TSDB_CODE_SUCCESS == code) {
    *pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
    if (NULL == *pQuery) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }
  return code;
}

X
Xiaoyu Wang 已提交
1821 1822 1823 1824
// Builds `pName` from the textual table name `pTableName`.
//
// The first token of `pTableName` is turned into an SName via createSName(), using `acctId` and
// `dbName`. The text must contain exactly one token: an empty name or anything following the
// first token is rejected with an invalid-operation message written to msgBuf.
//
// Returns TSDB_CODE_SUCCESS, the message-building error, or createSName()'s error code.
int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf,
                     int32_t msgBufLen) {
  SMsgBuf msg = {.buf = msgBuf, .len = msgBufLen};
  SToken  sToken;
  int32_t code = 0;

  NEXT_TOKEN(pTableName, sToken);

  if (sToken.n == 0) {
    return buildInvalidOperationMsg(&msg, "empty table name");
  }

  code = createSName(pName, &sToken, acctId, dbName, &msg);
  if (code) {
    return code;
  }

  // Nothing may follow the table name.
  NEXT_TOKEN(pTableName, sToken);

  if (sToken.n > 0) {
    return buildInvalidOperationMsg(&msg, "table name format is wrong");
  }

  return TSDB_CODE_SUCCESS;
}

// Builds the submit output of a stmt insert from the statement's hashes.
//
// pQuery     - query whose root is the SVnodeModifOpStmt that receives the output.
// pVgHash    - vgroup hash used by the output builder.
// pBlockHash - per-table data blocks; merged by vgroup when it is not empty.
//
// The temporary per-vgroup block list is released on every path, including when building the
// output fails. Returns TSDB_CODE_SUCCESS or the code of the failing step.
int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) {
  SVnodeModifOpStmt*  modifyNode = (SVnodeModifOpStmt*)pQuery->pRoot;
  int32_t             code = TSDB_CODE_SUCCESS;
  SInsertParseContext insertCtx = {
      .pVgroupsHashObj = pVgHash,
      .pTableBlockHashObj = pBlockHash,
      .pOutput = (SVnodeModifOpStmt*)pQuery->pRoot,
  };

  // merge according to vgId
  if (taosHashGetSize(insertCtx.pTableBlockHashObj) > 0) {
    code = mergeTableDataBlocks(insertCtx.pTableBlockHashObj, modifyNode->payloadType, &insertCtx.pVgDataBlocks);
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = buildOutput(&insertCtx);
  }

  destroyBlockArrayList(insertCtx.pVgDataBlocks);
  return code;
}

X
Xiaoyu Wang 已提交
1868 1869 1870 1871
// Converts the tag values bound to a statement into a tag blob and attaches a create-table
// request for `tName` (child of super table `suid`) to the data block.
//
// pBlock    - STableDataBlocks that receives the create-table message.
// boundTags - SParsedDataColInfo describing which tags are bound; must not be NULL.
// bind      - one bind entry per bound tag; entries flagged NULL are skipped.
// msgBuf    - buffer (of msgBufLen bytes) for error messages.
//
// JSON tags are parsed by parseJsontoTagData(); NCHAR tags are converted with taosMbsToUcs4()
// into a temporary buffer that is released before returning; other types are copied as-is.
// Returns TSDB_CODE_SUCCESS or an error code.
int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, char* tName, TAOS_MULTI_BIND* bind,
                           char* msgBuf, int32_t msgBufLen) {
  STableDataBlocks*   pDataBlock = (STableDataBlocks*)pBlock;
  SMsgBuf             pBuf = {.buf = msgBuf, .len = msgBufLen};
  SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
  if (NULL == tags) {
    return TSDB_CODE_QRY_APP_ERROR;
  }

  SArray* pTagArray = taosArrayInit(tags->numOfBound, sizeof(STagVal));
  if (!pTagArray) {
    return buildInvalidOperationMsg(&pBuf, "out of memory");
  }

  int32_t  code = TSDB_CODE_SUCCESS;
  SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta);

  bool  isJson = false;
  STag* pTag = NULL;

  for (int c = 0; c < tags->numOfBound; ++c) {
    if (bind[c].is_null && bind[c].is_null[0]) {
      continue;
    }

    SSchema* pTagSchema = &pSchema[tags->boundColumns[c]];
    int32_t  colLen = pTagSchema->bytes;
    if (IS_VAR_DATA_TYPE(pTagSchema->type)) {
      colLen = bind[c].length[0];
    }
    if (pTagSchema->type == TSDB_DATA_TYPE_JSON) {
      if (colLen > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
        code = buildSyntaxErrMsg(&pBuf, "json string too long than 4095", bind[c].buffer);
        goto end;
      }

      isJson = true;
      // NUL-terminated copy of the bound JSON text for the parser.
      char* tmp = taosMemoryCalloc(1, colLen + 1);
      if (NULL == tmp) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto end;
      }
      memcpy(tmp, bind[c].buffer, colLen);
      code = parseJsontoTagData(tmp, pTagArray, &pTag, &pBuf);
      taosMemoryFree(tmp);
      if (code != TSDB_CODE_SUCCESS) {
        goto end;
      }
    } else {
      STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type};
      if (pTagSchema->type == TSDB_DATA_TYPE_BINARY) {
        // Points straight into the caller's bind buffer; nothing to free for this entry.
        val.pData = (uint8_t*)bind[c].buffer;
        val.nData = colLen;
      } else if (pTagSchema->type == TSDB_DATA_TYPE_NCHAR) {
        int32_t output = 0;
        void*   p = taosMemoryCalloc(1, colLen * TSDB_NCHAR_SIZE);
        if (p == NULL) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          goto end;
        }
        if (!taosMbsToUcs4(bind[c].buffer, colLen, (TdUcs4*)(p), colLen * TSDB_NCHAR_SIZE, &output)) {
          if (errno == E2BIG) {
            taosMemoryFree(p);
            code = generateSyntaxErrMsg(&pBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pTagSchema->name);
            goto end;
          }
          char buf[512] = {0};
          snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(errno));
          taosMemoryFree(p);
          code = buildSyntaxErrMsg(&pBuf, buf, bind[c].buffer);
          goto end;
        }
        // Ownership of p moves into the array entry; it is released in the clean-up loop below.
        val.pData = p;
        val.nData = output;
      } else {
        memcpy(&val.i64, bind[c].buffer, colLen);
      }
      taosArrayPush(pTagArray, &val);
    }
  }

  // For JSON the tag blob was already produced by parseJsontoTagData().
  if (!isJson && (code = tTagNew(pTagArray, 1, false, &pTag)) != TSDB_CODE_SUCCESS) {
    goto end;
  }

  SVCreateTbReq tbReq = {0};
  buildCreateTbReq(&tbReq, tName, pTag, suid);
  code = buildCreateTbMsg(pDataBlock, &tbReq);
  destroyCreateSubTbReq(&tbReq);

end:
  // Release the buffers held by NCHAR-typed entries of the tag array.
  for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) {
    STagVal* p = (STagVal*)taosArrayGet(pTagArray, i);
    if (p->type == TSDB_DATA_TYPE_NCHAR) {
      taosMemoryFree(p->pData);
    }
  }
  taosArrayDestroy(pTagArray);

  return code;
}

X
Xiaoyu Wang 已提交
1966 1967 1968 1969
// Appends bind->num rows to the data block from a full set of column binds.
//
// pBlock - STableDataBlocks to append to.
// bind   - array with one entry per bound column; every entry must carry the same number of rows
//          as the first one, and a non-NULL value's buffer_type must equal the column type.
// msgBuf - buffer (of msgBufLen bytes) for error messages.
//
// A NULL value is rejected for the primary timestamp column. After all rows are appended,
// setBlockInfo() is applied to the SSubmitBlk at the start of the block's payload.
// Returns TSDB_CODE_SUCCESS or an error code.
int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) {
  STableDataBlocks*   pDataBlock = (STableDataBlocks*)pBlock;
  SSchema*            pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
  int32_t             extendedRowSize = getExtendedRowSize(pDataBlock);
  SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
  SRowBuilder*        pBuilder = &pDataBlock->rowBuilder;
  SMemParam           param = {.rb = pBuilder};
  SMsgBuf             pBuf = {.buf = msgBuf, .len = msgBufLen};
  int32_t             rowNum = bind->num;

  CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));

  CHECK_CODE(allocateMemForSize(pDataBlock, extendedRowSize * bind->num));

  for (int32_t r = 0; r < bind->num; ++r) {
    STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size);  // skip the SSubmitBlk header
    tdSRowResetBuf(pBuilder, row);

    for (int c = 0; c < spd->numOfBound; ++c) {
      SSchema* pColSchema = &pSchema[spd->boundColumns[c]];

      if (bind[c].num != rowNum) {
        return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
      }

      param.schema = pColSchema;
      getSTSRowAppendInfo(pBuilder->rowType, spd, c, &param.toffset, &param.colIdx);

      if (bind[c].is_null && bind[c].is_null[r]) {
        if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
          return buildInvalidOperationMsg(&pBuf, "primary timestamp should not be NULL");
        }

        CHECK_CODE(MemRowAppend(&pBuf, NULL, 0, &param));
      } else {
        if (bind[c].buffer_type != pColSchema->type) {
          return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
        }

        // Variable-length columns take their length from the bind; others use the schema width.
        int32_t colLen = pColSchema->bytes;
        if (IS_VAR_DATA_TYPE(pColSchema->type)) {
          colLen = bind[c].length[r];
        }

        // Row r of this column lives at a fixed stride of buffer_length inside the bind buffer.
        CHECK_CODE(MemRowAppend(&pBuf, (char*)bind[c].buffer + bind[c].buffer_length * r, colLen, &param));
      }

      if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
        TSKEY tsKey = TD_ROW_KEY(row);
        checkTimestamp(pDataBlock, (const char*)&tsKey);
      }
    }
    // set the null value for the columns that do not assign values
    if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
      for (int32_t i = 0; i < spd->numOfCols; ++i) {
        if (spd->cols[i].valStat == VAL_STAT_NONE) {  // the primary TS key is not VAL_STAT_NONE
          tdAppendColValToTpRow(pBuilder, TD_VTYPE_NONE, getNullValue(pSchema[i].type), true, pSchema[i].type, i,
                                spd->cols[i].toffset);
        }
      }
    }
#ifdef TD_DEBUG_PRINT_ROW
    STSchema* pSTSchema = tdGetSTSChemaFromSSChema(&pSchema, spd->numOfCols);
    tdSRowPrint(row, pSTSchema, __func__);
    taosMemoryFree(pSTSchema);
#endif
    pDataBlock->size += extendedRowSize;
  }

  SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
  if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, bind->num)) {
    return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767");
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
2043 2044 2045 2046 2047
// Binds the values of ONE bound column (index `colIdx`) for a batch of bind->num rows.
//
// pBlock - STableDataBlocks to write into.
// bind   - the bind entry of this single column.
// msgBuf - buffer (of msgBufLen bytes) for error messages.
// colIdx - index of the column within the bound column list.
// rowNum - expected number of rows; bind->num must equal it.
//
// For the first column (colIdx == 0) the row builder is initialised and memory for the batch is
// reserved. For the last bound column the unassigned columns are filled, the block size is
// advanced by the whole batch and setBlockInfo() is applied.
// Returns TSDB_CODE_SUCCESS or an error code.
int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx,
                                int32_t rowNum) {
  STableDataBlocks*   pDataBlock = (STableDataBlocks*)pBlock;
  SSchema*            pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
  int32_t             extendedRowSize = getExtendedRowSize(pDataBlock);
  SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
  SRowBuilder*        pBuilder = &pDataBlock->rowBuilder;
  SMemParam           param = {.rb = pBuilder};
  SMsgBuf             pBuf = {.buf = msgBuf, .len = msgBufLen};
  bool                rowStart = (0 == colIdx);
  bool                rowEnd = ((colIdx + 1) == spd->numOfBound);

  if (rowStart) {
    CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));
    CHECK_CODE(allocateMemForSize(pDataBlock, extendedRowSize * bind->num));
  }

  for (int32_t r = 0; r < bind->num; ++r) {
    // pDataBlock->size is only advanced after the last column, so row r of the batch is addressed
    // relative to the current size.
    STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size + extendedRowSize * r);  // skip the SSubmitBlk header
    if (rowStart) {
      tdSRowResetBuf(pBuilder, row);
    } else {
      tdSRowGetBuf(pBuilder, row);
    }

    SSchema* pColSchema = &pSchema[spd->boundColumns[colIdx]];

    if (bind->num != rowNum) {
      return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
    }

    param.schema = pColSchema;
    getSTSRowAppendInfo(pBuilder->rowType, spd, colIdx, &param.toffset, &param.colIdx);

    if (bind->is_null && bind->is_null[r]) {
      if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
        return buildInvalidOperationMsg(&pBuf, "primary timestamp should not be NULL");
      }

      CHECK_CODE(MemRowAppend(&pBuf, NULL, 0, &param));
    } else {
      if (bind->buffer_type != pColSchema->type) {
        return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
      }

      // Variable-length columns take their length from the bind; others use the schema width.
      int32_t colLen = pColSchema->bytes;
      if (IS_VAR_DATA_TYPE(pColSchema->type)) {
        colLen = bind->length[r];
      }

      CHECK_CODE(MemRowAppend(&pBuf, (char*)bind->buffer + bind->buffer_length * r, colLen, &param));
    }

    if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
      TSKEY tsKey = TD_ROW_KEY(row);
      checkTimestamp(pDataBlock, (const char*)&tsKey);
    }

    // set the null value for the columns that do not assign values
    if (rowEnd && (spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
      for (int32_t i = 0; i < spd->numOfCols; ++i) {
        if (spd->cols[i].valStat == VAL_STAT_NONE) {  // the primary TS key is not VAL_STAT_NONE
          tdAppendColValToTpRow(pBuilder, TD_VTYPE_NONE, getNullValue(pSchema[i].type), true, pSchema[i].type, i,
                                spd->cols[i].toffset);
        }
      }
    }

#ifdef TD_DEBUG_PRINT_ROW
    if (rowEnd) {
      STSchema* pSTSchema = tdGetSTSChemaFromSSChema(&pSchema, spd->numOfCols);
      tdSRowPrint(row, pSTSchema, __func__);
      taosMemoryFree(pSTSchema);
    }
#endif
  }

  if (rowEnd) {
    pDataBlock->size += extendedRowSize * bind->num;

    SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
    if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, bind->num)) {
      return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767");
    }
  }

  return TSDB_CODE_SUCCESS;
}

2132 2133
/**
 * @brief Describe the bound columns of a statement as an array of field descriptors.
 *
 * @param boundInfo bound column information; boundColumns[i] is used as an index into pSchema
 * @param pSchema   schema array that the bound column indexes refer to
 * @param fieldNum  output, receives boundInfo->numOfBound
 * @param fields    optional output; when not NULL it receives a newly allocated array with one
 *                  entry per bound column (NULL when nothing is bound). The array is allocated
 *                  here and handed to the caller.
 * @param timePrec  precision stored in the first entry when the first bound column is a timestamp
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the array cannot be allocated
 */
int32_t buildBoundFields(SParsedDataColInfo* boundInfo, SSchema* pSchema, int32_t* fieldNum, TAOS_FIELD_E** fields,
                         uint8_t timePrec) {
  if (fields) {
    if (boundInfo->numOfBound <= 0) {
      // nothing is bound: do not allocate and do not touch element 0
      *fields = NULL;
    } else {
      // size the array by the element type that is really stored in it (TAOS_FIELD_E)
      *fields = taosMemoryCalloc(boundInfo->numOfBound, sizeof(**fields));
      if (NULL == *fields) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      // only the first bound column may carry the time precision
      SSchema* schema = &pSchema[boundInfo->boundColumns[0]];
      if (TSDB_DATA_TYPE_TIMESTAMP == schema->type) {
        (*fields)[0].precision = timePrec;
      }

      for (int32_t i = 0; i < boundInfo->numOfBound; ++i) {
        schema = &pSchema[boundInfo->boundColumns[i]];
        strcpy((*fields)[i].name, schema->name);
        (*fields)[i].type = schema->type;
        (*fields)[i].bytes = schema->bytes;
      }
    }
  }

  *fieldNum = boundInfo->numOfBound;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
2158
/**
 * @brief Build the field descriptors of the tags bound in a prepared statement.
 *
 * @param pBlock    STableDataBlocks of the target table, passed as an opaque pointer
 * @param boundTags SParsedDataColInfo of the bound tags, passed as an opaque pointer
 * @param fieldNum  output, number of bound tags
 * @param fields    optional output, array describing the bound tags (set to NULL when no tag is bound)
 * @return TSDB_CODE_SUCCESS on success;
 *         TSDB_CODE_QRY_APP_ERROR when boundTags is NULL;
 *         TSDB_CODE_TSC_STMT_API_ERROR when the table is neither a super table nor a child table;
 *         otherwise the error reported by buildBoundFields
 */
int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD_E** fields) {
  STableDataBlocks*   pDataBlock = (STableDataBlocks*)pBlock;
  SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
  if (NULL == tags) {
    return TSDB_CODE_QRY_APP_ERROR;
  }

  if (pDataBlock->pTableMeta->tableType != TSDB_SUPER_TABLE && pDataBlock->pTableMeta->tableType != TSDB_CHILD_TABLE) {
    return TSDB_CODE_TSC_STMT_API_ERROR;
  }

  SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta);
  if (tags->numOfBound <= 0) {
    *fieldNum = 0;
    // fields is optional, same as in qBuildStmtColFields and buildBoundFields
    if (fields) {
      *fields = NULL;
    }

    return TSDB_CODE_SUCCESS;
  }

  CHECK_CODE(buildBoundFields(tags, pSchema, fieldNum, fields, 0));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
2182
/**
 * @brief Build the field descriptors of the columns bound in a prepared statement.
 *
 * @param pBlock   STableDataBlocks of the target table, passed as an opaque pointer
 * @param fieldNum output, number of bound columns
 * @param fields   optional output, array describing the bound columns (set to NULL when none is bound)
 * @return TSDB_CODE_SUCCESS on success, otherwise the error reported by buildBoundFields
 */
int32_t qBuildStmtColFields(void* pBlock, int32_t* fieldNum, TAOS_FIELD_E** fields) {
  STableDataBlocks* pTableBlocks = (STableDataBlocks*)pBlock;
  SSchema*          pColSchemas = getTableColumnSchema(pTableBlocks->pTableMeta);

  if (pTableBlocks->boundColumnInfo.numOfBound > 0) {
    // the table precision is forwarded so it can be recorded for a leading timestamp column
    CHECK_CODE(buildBoundFields(&pTableBlocks->boundColumnInfo, pColSchemas, fieldNum, fields,
                                pTableBlocks->pTableMeta->tableInfo.precision));
    return TSDB_CODE_SUCCESS;
  }

  // no column is bound: report an empty result
  *fieldNum = 0;
  if (NULL != fields) {
    *fields = NULL;
  }
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
2200
// schemaless logic start
D
stmt  
dapan1121 已提交
2201

wmmhello's avatar
wmmhello 已提交
2202
typedef struct SmlExecTableHandle {
X
Xiaoyu Wang 已提交
2203 2204
  SParsedDataColInfo tags;          // each table
  SVCreateTbReq      createTblReq;  // each table
wmmhello's avatar
wmmhello 已提交
2205
} SmlExecTableHandle;
wmmhello's avatar
wmmhello 已提交
2206

wmmhello's avatar
wmmhello 已提交
2207
typedef struct SmlExecHandle {
2208 2209 2210
  SHashObj*          pBlockHash;
  SmlExecTableHandle tableExecHandle;
  SQuery*            pQuery;
wmmhello's avatar
wmmhello 已提交
2211
} SSmlExecHandle;
wmmhello's avatar
wmmhello 已提交
2212

wmmhello's avatar
wmmhello 已提交
2213 2214 2215 2216 2217 2218
/**
 * @brief Release the per-table members of a schemaless handle.
 *
 * The SmlExecTableHandle itself is not freed; only its bound tag information and its
 * create-table request are destroyed.
 *
 * @param pHandle pointer to a SmlExecTableHandle
 */
static void smlDestroyTableHandle(void* pHandle) {
  SmlExecTableHandle* pTableHandle = pHandle;

  destroyBoundColumnInfo(&pTableHandle->tags);
  destroyCreateSubTbReq(&pTableHandle->createTblReq);
}

X
Xiaoyu Wang 已提交
2219
/**
 * @brief Bind the keys of a schemaless key/value list to the columns of a schema.
 *
 * Resets the bound state of pColList, then looks up every key of cols in pSchema and records
 * the matching schema index in pColList->boundColumns in input order. When the keys do not
 * follow the schema order, pColList->colIdxInfo is built so that bound positions can be mapped
 * to schema order.
 *
 * @param cols     array of SSmlKv*; only key and keyLen are used here
 * @param pColList bound column information to fill; numOfCols must already be set
 * @param pSchema  schema array with pColList->numOfCols entries
 * @return TSDB_CODE_SUCCESS;
 *         TSDB_CODE_SML_INVALID_DATA when a key is unknown or appears twice;
 *         TSDB_CODE_TSC_OUT_OF_MEMORY when colIdxInfo cannot be allocated
 */
static int32_t smlBoundColumnData(SArray* cols, SParsedDataColInfo* pColList, SSchema* pSchema) {
  col_id_t nCols = pColList->numOfCols;

  pColList->numOfBound = 0;
  pColList->boundNullLen = 0;
  memset(pColList->boundColumns, 0, sizeof(col_id_t) * nCols);
  for (col_id_t i = 0; i < nCols; ++i) {
    pColList->cols[i].valStat = VAL_STAT_NONE;
  }

  bool     isOrdered = true;
  col_id_t lastColIdx = -1;  // last column found
  int32_t  numOfInput = (int32_t)taosArrayGetSize(cols);
  for (int32_t i = 0; i < numOfInput; ++i) {
    SSmlKv*  kv = taosArrayGetP(cols, i);
    SToken   sToken = {.n = kv->keyLen, .z = (char*)kv->key};
    // search behind the previous match first, then wrap around to the columns before it
    col_id_t t = lastColIdx + 1;
    col_id_t index = findCol(&sToken, t, nCols, pSchema);
    if (index < 0 && t > 0) {
      index = findCol(&sToken, 0, t, pSchema);
      isOrdered = false;
    }
    if (index < 0) {
      uError("smlBoundColumnData. index:%d", index);
      return TSDB_CODE_SML_INVALID_DATA;
    }
    if (pColList->cols[index].valStat == VAL_STAT_HAS) {
      uError("smlBoundColumnData. already set. index:%d", index);
      return TSDB_CODE_SML_INVALID_DATA;
    }
    lastColIdx = index;
    pColList->cols[index].valStat = VAL_STAT_HAS;
    pColList->boundColumns[pColList->numOfBound] = index;
    ++pColList->numOfBound;
    // account for the column that was matched; the search start 't' may name a different
    // column and equals nCols (out of range) once the last column has been matched
    switch (pSchema[index].type) {
      case TSDB_DATA_TYPE_BINARY:
        pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + CHAR_BYTES);
        break;
      case TSDB_DATA_TYPE_NCHAR:
        pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
        break;
      default:
        pColList->boundNullLen += TYPE_BYTES[pSchema[index].type];
        break;
    }
  }

  pColList->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED;

  if (!isOrdered) {
    pColList->colIdxInfo = taosMemoryCalloc(pColList->numOfBound, sizeof(SBoundIdxInfo));
    if (NULL == pColList->colIdxInfo) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    SBoundIdxInfo* pColIdx = pColList->colIdxInfo;
    for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
      pColIdx[i].schemaColIdx = pColList->boundColumns[i];
      pColIdx[i].boundIdx = i;
    }
    // sort with the schema comparator to assign finalIdx, then restore the bound order
    taosSort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), schemaIdxCompar);
    for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
      pColIdx[i].finalIdx = i;
    }
    taosSort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), boundIdxCompar);
  }

  if (pColList->numOfCols > pColList->numOfBound) {
    memset(&pColList->boundColumns[pColList->numOfBound], 0,
           sizeof(col_id_t) * (pColList->numOfCols - pColList->numOfBound));
  }

  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
2292 2293 2294 2295 2296 2297 2298 2299 2300 2301 2302
/**
 * @brief Build the tag row of a schemaless child table. No json tag for schemaless.
 *
 * @param cols    array of SSmlKv*, in the same order as tags->boundColumns
 * @param tags    bound tag information
 * @param pSchema tag schema of the table
 * @param ppTag   output, receives the tag created by tTagNew
 * @param msg     buffer that receives the error message when a value cannot be converted
 * @return TSDB_CODE_SUCCESS, an out-of-memory code, or the code produced while reporting a
 *         failed NCHAR conversion
 */
static int32_t smlBuildTagRow(SArray* cols, SParsedDataColInfo* tags, SSchema* pSchema, STag** ppTag, SMsgBuf* msg) {
  SArray* pTagArray = taosArrayInit(tags->numOfBound, sizeof(STagVal));
  if (!pTagArray) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  for (int i = 0; i < tags->numOfBound; ++i) {
    SSchema* pTagSchema = &pSchema[tags->boundColumns[i]];
    SSmlKv*  kv = taosArrayGetP(cols, i);

    STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type};
    if (pTagSchema->type == TSDB_DATA_TYPE_BINARY) {
      // binary values are referenced, not copied
      val.pData = (uint8_t*)kv->value;
      val.nData = kv->length;
    } else if (pTagSchema->type == TSDB_DATA_TYPE_NCHAR) {
      // NCHAR values are converted into a temporary buffer that is released at 'end'
      int32_t output = 0;
      void*   p = taosMemoryCalloc(1, kv->length * TSDB_NCHAR_SIZE);
      if (p == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto end;
      }
      if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)(p), kv->length * TSDB_NCHAR_SIZE, &output)) {
        if (errno == E2BIG) {
          taosMemoryFree(p);
          code = generateSyntaxErrMsg(msg, TSDB_CODE_PAR_VALUE_TOO_LONG, pTagSchema->name);
          goto end;
        }
        char buf[512] = {0};
        snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(errno));
        taosMemoryFree(p);
        code = buildSyntaxErrMsg(msg, buf, kv->value);
        goto end;
      }
      val.pData = p;
      val.nData = output;
    } else {
      memcpy(&val.i64, &(kv->value), kv->length);
    }

    if (NULL == taosArrayPush(pTagArray, &val)) {
      // the value never reached the array, so the cleanup at 'end' cannot see its buffer
      if (TSDB_DATA_TYPE_NCHAR == val.type) {
        taosMemoryFree(val.pData);
      }
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto end;
    }
  }

  code = tTagNew(pTagArray, 1, false, ppTag);
end:
  // release the conversion buffers owned by the array entries
  for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) {
    STagVal* p = (STagVal*)taosArrayGet(pTagArray, i);
    if (p->type == TSDB_DATA_TYPE_NCHAR) {
      taosMemoryFree(p->pData);
    }
  }
  taosArrayDestroy(pTagArray);
  return code;
}

2356 2357
/**
 * @brief Bind the tags and rows of one schemaless table to its submit data block.
 *
 * The per-table state kept in the handle is released and rebuilt, the create-table request is
 * prepared from the tags, and every row of cols is appended to the data block that the handle's
 * block hash holds for pTableMeta->uid.
 *
 * @param handle     SSmlExecHandle created by smlInitHandle
 * @param tags       array of SSmlKv* with the tag values
 * @param colsSchema array of SSmlKv* whose keys name the columns to bind
 * @param cols       array of rows; each row is an array of SSmlKv* when format is true,
 *                   otherwise a hash that maps a column name to a SSmlKv*
 * @param format     selects how a row of cols is read, see cols
 * @param pTableMeta meta of the target table
 * @param tableName  name used for the create-table request
 * @param msgBuf     buffer that receives the error message
 * @param msgBufLen  size of msgBuf
 * @return TSDB_CODE_SUCCESS or the error of the step that failed
 *
 * @note Timestamp values are converted from nanosecond precision to the table precision in
 *       place, i.e. the SSmlKv entries of cols are modified.
 */
int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta,
                    char* tableName, char* msgBuf, int16_t msgBufLen) {
  SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};

  SSmlExecHandle* smlHandle = (SSmlExecHandle*)handle;
  smlDestroyTableHandle(&smlHandle->tableExecHandle);  // free for each table
  SSchema* pTagsSchema = getTableTagSchema(pTableMeta);
  setBoundColumnInfo(&smlHandle->tableExecHandle.tags, pTagsSchema, getNumOfTags(pTableMeta));
  int ret = smlBoundColumnData(tags, &smlHandle->tableExecHandle.tags, pTagsSchema);
  if (ret != TSDB_CODE_SUCCESS) {
    buildInvalidOperationMsg(&pBuf, "bound tags error");
    return ret;
  }
  STag* pTag = NULL;
  ret = smlBuildTagRow(tags, &smlHandle->tableExecHandle.tags, pTagsSchema, &pTag, &pBuf);
  if (ret != TSDB_CODE_SUCCESS) {
    return ret;
  }

  buildCreateTbReq(&smlHandle->tableExecHandle.createTblReq, tableName, pTag, pTableMeta->suid);

  STableDataBlocks* pDataBlock = NULL;
  ret = getDataBlockFromList(smlHandle->pBlockHash, &pTableMeta->uid, sizeof(pTableMeta->uid),
                             TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk), getTableInfo(pTableMeta).rowSize,
                             pTableMeta, &pDataBlock, NULL, &smlHandle->tableExecHandle.createTblReq);
  if (ret != TSDB_CODE_SUCCESS) {
    buildInvalidOperationMsg(&pBuf, "create data block error");
    return ret;
  }

  SSchema* pSchema = getTableColumnSchema(pTableMeta);

  ret = smlBoundColumnData(colsSchema, &pDataBlock->boundColumnInfo, pSchema);
  if (ret != TSDB_CODE_SUCCESS) {
    buildInvalidOperationMsg(&pBuf, "bound cols error");
    return ret;
  }
  int32_t             extendedRowSize = getExtendedRowSize(pDataBlock);
  SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
  SRowBuilder*        pBuilder = &pDataBlock->rowBuilder;
  SMemParam           param = {.rb = pBuilder};

  initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo);

  int32_t rowNum = taosArrayGetSize(cols);
  if (rowNum <= 0) {
    return buildInvalidOperationMsg(&pBuf, "cols size <= 0");
  }
  ret = allocateMemForSize(pDataBlock, extendedRowSize * rowNum);
  if (ret != TSDB_CODE_SUCCESS) {
    buildInvalidOperationMsg(&pBuf, "allocate memory error");
    return ret;
  }
  for (int32_t r = 0; r < rowNum; ++r) {
    STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size);  // skip the SSubmitBlk header
    tdSRowResetBuf(pBuilder, row);
    void*  rowData = taosArrayGetP(cols, r);
    size_t rowDataSize = 0;
    if (format) {
      rowDataSize = taosArrayGetSize(rowData);
    }

    // 1. set the parsed value from sql string
    for (int c = 0, j = 0; c < spd->numOfBound; ++c) {
      SSchema* pColSchema = &pSchema[spd->boundColumns[c]];

      param.schema = pColSchema;
      getSTSRowAppendInfo(pBuilder->rowType, spd, c, &param.toffset, &param.colIdx);

      SSmlKv* kv = NULL;
      if (format) {
        // array rows: entry j is consumed only when it belongs to the current bound column;
        // a row that has one entry per bound column is taken positionally
        if (j < rowDataSize) {
          kv = taosArrayGetP(rowData, j);
          if (rowDataSize != spd->numOfBound &&
              (kv->keyLen != strlen(pColSchema->name) || strncmp(kv->key, pColSchema->name, kv->keyLen) != 0)) {
            kv = NULL;
          } else {
            j++;
          }
        }
      } else {
        // hash rows: look the value up by column name
        void** p = taosHashGet(rowData, pColSchema->name, strlen(pColSchema->name));
        if (p) kv = *p;
      }

      // a failed append must stop the bind, the row would be incomplete otherwise
      if (!kv || kv->length == 0) {
        CHECK_CODE(MemRowAppend(&pBuf, NULL, 0, &param));
      } else {
        int32_t colLen = kv->length;
        if (pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP) {
          kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision);
        }

        if (IS_VAR_DATA_TYPE(kv->type)) {
          CHECK_CODE(MemRowAppend(&pBuf, kv->value, colLen, &param));
        } else {
          CHECK_CODE(MemRowAppend(&pBuf, &(kv->value), colLen, &param));
        }
      }

      if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
        TSKEY tsKey = TD_ROW_KEY(row);
        checkTimestamp(pDataBlock, (const char*)&tsKey);
      }
    }

    // set the null value for the columns that do not assign values
    if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
      for (int32_t i = 0; i < spd->numOfCols; ++i) {
        if (spd->cols[i].valStat == VAL_STAT_NONE) {  // the primary TS key is not VAL_STAT_NONE
          tdAppendColValToTpRow(pBuilder, TD_VTYPE_NONE, getNullValue(pSchema[i].type), true, pSchema[i].type, i,
                                spd->cols[i].toffset);
        }
      }
    }

    pDataBlock->size += extendedRowSize;
  }

  SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
  if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, rowNum)) {
    return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767");
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
2483 2484 2485
/**
 * @brief Create the execution handle of a schemaless insert.
 *
 * @param pQuery query object stored in the handle
 * @return the new handle, to be released with smlDestroyHandle, or NULL when the handle or its
 *         block hash cannot be allocated
 */
void* smlInitHandle(SQuery* pQuery) {
  SSmlExecHandle* handle = taosMemoryCalloc(1, sizeof(SSmlExecHandle));
  if (!handle) return NULL;
  handle->pBlockHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
  if (NULL == handle->pBlockHash) {
    // never hand out a handle whose block hash is missing, every later bind relies on it
    taosMemoryFree(handle);
    return NULL;
  }
  handle->pQuery = pQuery;

  return handle;
}

X
Xiaoyu Wang 已提交
2492 2493 2494
void smlDestroyHandle(void* pHandle) {
  if (!pHandle) return;
  SSmlExecHandle* handle = (SSmlExecHandle*)pHandle;
wmmhello's avatar
wmmhello 已提交
2495
  destroyBlockHashmap(handle->pBlockHash);
wmmhello's avatar
wmmhello 已提交
2496
  smlDestroyTableHandle(&handle->tableExecHandle);
wmmhello's avatar
wmmhello 已提交
2497 2498 2499 2500
  taosMemoryFree(handle);
}

/**
 * @brief Build the output of a schemaless insert from the blocks collected in the handle.
 *
 * @param handle  SSmlExecHandle created by smlInitHandle
 * @param pVgHash vgroup hash forwarded to qBuildStmtOutput
 * @return the result of qBuildStmtOutput
 */
int32_t smlBuildOutput(void* handle, SHashObj* pVgHash) {
  SSmlExecHandle* pSml = (SSmlExecHandle*)handle;
  SQuery*         pQuery = pSml->pQuery;
  SHashObj*       pBlocks = pSml->pBlockHash;

  return qBuildStmtOutput(pQuery, pVgHash, pBlocks);
}
wmmhello's avatar
wmmhello 已提交
2504
// schemaless logic end