parInsert.c 65.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

wafwerar's avatar
wafwerar 已提交
16
#include "os.h"
X
Xiaoyu Wang 已提交
17 18 19
#include "parInsertData.h"
#include "parInt.h"
#include "parToken.h"
H
refact  
Hongze Cheng 已提交
20
#include "parUtil.h"
21 22 23 24
#include "tglobal.h"
#include "ttime.h"
#include "ttypes.h"

H
refact  
Hongze Cheng 已提交
25 26 27
/*
 * Token-scanning helpers. Each takes a `char*` cursor (pSql) and an SToken
 * lvalue (sToken). All of them except NEXT_TOKEN_KEEP_SQL advance the cursor
 * past whatever the tokenizer consumed.
 */

/* Shared body of NEXT_TOKEN / NEXT_TOKEN_WITH_PREV: fetch one token with
 * tStrGetToken (prevFlag is forwarded as its third argument) and move pSql
 * forward by the number of characters it reports. */
#define NEXT_TOKEN_IMPL_(pSql, sToken, prevFlag)                 \
  do {                                                           \
    int32_t nextTokenIdx = 0;                                    \
    (sToken) = tStrGetToken((pSql), &nextTokenIdx, (prevFlag));  \
    (pSql) += nextTokenIdx;                                      \
  } while (0)

#define NEXT_TOKEN(pSql, sToken) NEXT_TOKEN_IMPL_(pSql, sToken, false)

#define NEXT_TOKEN_WITH_PREV(pSql, sToken) NEXT_TOKEN_IMPL_(pSql, sToken, true)

/* Peek at the next token without moving pSql. `index` is passed by address to
 * tStrGetToken; the caller owns it and must initialize it (NOTE(review): the
 * other helpers zero it first — confirm callers do the same). */
#define NEXT_TOKEN_KEEP_SQL(pSql, sToken, index)          \
  do {                                                    \
    (sToken) = tStrGetToken((pSql), &(index), false);     \
  } while (0)

/* Read raw tokens with tGetToken, repeating while the token read is
 * TK_NK_SPACE. On exit sToken describes the first non-space token and pSql
 * points just past it. */
#define NEXT_VALID_TOKEN(pSql, sToken)                \
  do {                                                \
    (sToken).n = tGetToken((pSql), &(sToken).type);   \
    (sToken).z = (pSql);                              \
    (pSql) += (sToken).n;                             \
  } while (TK_NK_SPACE == (sToken).type)

51
typedef struct SInsertParseContext {
X
Xiaoyu Wang 已提交
52 53 54 55 56
  SParseContext*     pComCxt;             // input
  char*              pSql;                // input
  SMsgBuf            msg;                 // input
  STableMeta*        pTableMeta;          // each table
  SParsedDataColInfo tags;                // each table
C
Cary Xu 已提交
57
  STagVal*           pTagVals;            // each table
X
Xiaoyu Wang 已提交
58 59 60 61 62
  SVCreateTbReq      createTblReq;        // each table
  SHashObj*          pVgroupsHashObj;     // global
  SHashObj*          pTableBlockHashObj;  // global
  SHashObj*          pSubTableHashObj;    // global
  SArray*            pVgDataBlocks;       // global
X
Xiaoyu Wang 已提交
63
  SHashObj*          pTableNameHashObj;   // global
X
Xiaoyu Wang 已提交
64
  int32_t            totalNum;
X
Xiaoyu Wang 已提交
65
  SVnodeModifOpStmt* pOutput;
X
Xiaoyu Wang 已提交
66
  SStmtCallback*     pStmtCb;
67 68
} SInsertParseContext;

H
refact  
Hongze Cheng 已提交
69
typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param);
X
Xiaoyu Wang 已提交
70 71 72 73

static uint8_t TRUE_VALUE = (uint8_t)TSDB_TRUE;
static uint8_t FALSE_VALUE = (uint8_t)TSDB_FALSE;

D
stmt  
dapan1121 已提交
74
typedef struct SKvParam {
C
Cary Xu 已提交
75 76 77 78 79
  int16_t  nTag;
  int16_t  pos;
  STagVal* pTagVals;
  SSchema* schema;
  char     buf[TSDB_MAX_TAGS_LEN];
D
stmt  
dapan1121 已提交
80 81 82 83 84 85 86 87 88
} SKvParam;

typedef struct SMemParam {
  SRowBuilder* rb;
  SSchema*     schema;
  int32_t      toffset;
  col_id_t     colIdx;
} SMemParam;

X
Xiaoyu Wang 已提交
89 90 91
/*
 * Evaluate `expr` exactly once. If it is not TSDB_CODE_SUCCESS, return it from
 * the enclosing function.
 *
 * The temporary is deliberately NOT named `code`. With that name, an argument
 * such as CHECK_CODE(f(code)) would read the macro's own (uninitialized) local
 * instead of the caller's variable, and every use would shadow the caller's
 * `code`.
 */
#define CHECK_CODE(expr)                       \
  do {                                         \
    int32_t checkCodeRet_ = (expr);            \
    if (TSDB_CODE_SUCCESS != checkCodeRet_) {  \
      return checkCodeRet_;                    \
    }                                          \
  } while (0)

97 98 99 100 101 102 103 104 105 106 107 108 109
/*
 * Consume the leading "INSERT INTO" keywords from pCxt->pSql.
 * Returns TSDB_CODE_SUCCESS, or a syntax error (recorded in pCxt->msg and
 * pointing at the offending token) when either keyword is missing.
 */
static int32_t skipInsertInto(SInsertParseContext* pCxt) {
  SToken token;

  NEXT_TOKEN(pCxt->pSql, token);
  if (token.type != TK_INSERT) {
    return buildSyntaxErrMsg(&pCxt->msg, "keyword INSERT is expected", token.z);
  }

  NEXT_TOKEN(pCxt->pSql, token);
  return (token.type == TK_INTO) ? TSDB_CODE_SUCCESS
                                 : buildSyntaxErrMsg(&pCxt->msg, "keyword INTO is expected", token.z);
}

110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
/*
 * Validate an identifier token used as a table name and normalize it in place.
 *
 * Accepted forms:
 *   - a token that starts and ends with the escape char '`' (returned as is);
 *   - a single-part name for which isNumber() is false (lower-cased in place);
 *   - a two-part "first.second" name where each part is exactly one TK_NK_ID
 *     token (lower-cased in place; pToken is re-spanned over the whole name).
 *
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_TSC_INVALID_OPERATION.
 * NOTE: modifies the token text (case) and pToken->z / n / type.
 */
static int32_t parserValidateIdToken(SToken* pToken) {
  if (pToken == NULL || pToken->z == NULL || pToken->type != TK_NK_ID) {
    return TSDB_CODE_TSC_INVALID_OPERATION;
  }

  // An empty token has nothing to validate. Without this guard, the escape
  // check below would read pToken->z[-1].
  if (pToken->n == 0) {
    return TSDB_CODE_TSC_INVALID_OPERATION;
  }

  // it is a token quoted with escape char '`'
  if (pToken->z[0] == TS_ESCAPE_CHAR && pToken->z[pToken->n - 1] == TS_ESCAPE_CHAR) {
    return TSDB_CODE_SUCCESS;
  }

  char* sep = strnchr(pToken->z, TS_PATH_DELIMITER[0], pToken->n, true);
  if (sep == NULL) {  // single part token, not a complex type
    if (isNumber(pToken)) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }
    strntolower(pToken->z, pToken->z, pToken->n);
    return TSDB_CODE_SUCCESS;
  }

  // two part name
  int32_t oldLen = pToken->n;
  char*   pStr = pToken->z;

  // The first part must be a single identifier token immediately followed by
  // the path delimiter.
  pToken->n = tGetToken(pStr, &pToken->type);
  if (pStr[pToken->n] != TS_PATH_DELIMITER[0] || pToken->type != TK_NK_ID) {
    return TSDB_CODE_TSC_INVALID_OPERATION;
  }

  int32_t firstPartLen = pToken->n;

  // The second part: everything after the separator must be exactly one
  // identifier token.
  pToken->z = sep + 1;
  pToken->n = (uint32_t)(oldLen - (sep - pStr) - 1);
  int32_t len = tGetToken(pToken->z, &pToken->type);
  if (len != pToken->n || pToken->type != TK_NK_ID) {
    return TSDB_CODE_TSC_INVALID_OPERATION;
  }

  // pStr[firstPartLen] was verified above to be the delimiter, so the two parts
  // are already contiguous. Widen the token back over "first.second".
  pToken->n += (firstPartLen + sizeof(TS_PATH_DELIMITER[0]));
  pToken->z = pStr;

  strntolower(pToken->z, pToken->z, pToken->n);
  return TSDB_CODE_SUCCESS;
}

172
/*
 * Split a (validated) table-name token into "<acctId>.<db>" and "<table>".
 *
 * If the token contains the path delimiter, its first part is the db name.
 * Otherwise the session db (pCxt->pComCxt->db) is used.
 *
 * fullDbName must hold TSDB_DB_FNAME_LEN bytes and is always NUL-terminated.
 * tableName is filled with strncpy (no terminator is written), so the caller
 * must pass a zero-initialized buffer large enough for the name.
 *
 * Returns TSDB_CODE_SUCCESS, or a syntax error when the token is not a valid
 * identifier.
 */
static int32_t buildName(SInsertParseContext* pCxt, SToken* pStname, char* fullDbName, char* tableName) {
  if (parserValidateIdToken(pStname) != TSDB_CODE_SUCCESS) {
    return buildSyntaxErrMsg(&pCxt->msg, "invalid table name", pStname->z);
  }

  char* p = strnchr(pStname->z, TS_PATH_DELIMITER[0], pStname->n, false);
  if (NULL != p) {  // db.table
    // Bounded and NUL-terminated, like the session-db branch below. The former
    // sprintf + strncpy pair had no bound and wrote no terminator.
    int dbLen = (int)(p - pStname->z);
    snprintf(fullDbName, TSDB_DB_FNAME_LEN, "%d.%.*s", pCxt->pComCxt->acctId, dbLen, pStname->z);
    strncpy(tableName, p + 1, pStname->n - (p - pStname->z) - 1);
  } else {
    snprintf(fullDbName, TSDB_DB_FNAME_LEN, "%d.%s", pCxt->pComCxt->acctId, pCxt->pComCxt->db);
    strncpy(tableName, pStname->z, pStname->n);
  }

  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
190
/*
 * Fill pName from a table-name token.
 *
 * "db.table": both parts are taken from the token; quotes are removed with
 * strdequote.
 * "table": dbName is used as the database. If dbName is NULL, a "db is not
 * specified" error is returned.
 *
 * Returns TSDB_CODE_SUCCESS, or the code returned by
 * buildInvalidOperationMsg after recording the reason in pMsgBuf.
 */
static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, const char* dbName, SMsgBuf* pMsgBuf) {
  const char* msg1 = "name too long";
  const char* msg2 = "invalid database name";
  const char* msg3 = "db is not specified";
  const char* msg4 = "invalid table name";

  int32_t code = TSDB_CODE_SUCCESS;
  char*   p = strnchr(pTableName->z, TS_PATH_DELIMITER[0], pTableName->n, true);

  if (p != NULL) {  // db has been specified in sql string so we ignore current db path
    assert(*p == TS_PATH_DELIMITER[0]);

    // Length of the db part exactly as written (quotes included). It must be
    // kept apart from the dequoted length: the table part starts right after
    // the delimiter in the raw text. Reusing the dequoted length used to copy
    // bytes past the end of the token.
    int32_t rawDbLen = (int32_t)(p - pTableName->z);
    if (rawDbLen >= TSDB_DB_FNAME_LEN) {  // would overflow `name`
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }

    char name[TSDB_DB_FNAME_LEN] = {0};
    strncpy(name, pTableName->z, rawDbLen);
    int32_t dbLen = strdequote(name);

    code = tNameSetDbName(pName, acctId, name, dbLen);
    if (code != TSDB_CODE_SUCCESS) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }

    int32_t tbLen = (int32_t)pTableName->n - rawDbLen - 1;
    if (tbLen <= 0) {
      return buildInvalidOperationMsg(pMsgBuf, msg4);
    }
    if (tbLen >= TSDB_TABLE_FNAME_LEN) {  // would overflow `tbname`
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }

    char tbname[TSDB_TABLE_FNAME_LEN] = {0};
    strncpy(tbname, p + 1, tbLen);
    /*tbLen = */ strdequote(tbname);

    code = tNameFromString(pName, tbname, T_NAME_TABLE);
    if (code != 0) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }
  } else {  // get current DB name first, and then set it into path
    if (pTableName->n >= TSDB_TABLE_NAME_LEN) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }

    assert(pTableName->n < TSDB_TABLE_FNAME_LEN);

    char name[TSDB_TABLE_FNAME_LEN] = {0};
    strncpy(name, pTableName->z, pTableName->n);
    strdequote(name);

    if (dbName == NULL) {
      return buildInvalidOperationMsg(pMsgBuf, msg3);
    }

    code = tNameSetDbName(pName, acctId, dbName, strlen(dbName));
    if (code != TSDB_CODE_SUCCESS) {
      code = buildInvalidOperationMsg(pMsgBuf, msg2);
      return code;
    }

    code = tNameFromString(pName, name, T_NAME_TABLE);
    if (code != 0) {
      code = buildInvalidOperationMsg(pMsgBuf, msg1);
    }
  }

  return code;
}

X
Xiaoyu Wang 已提交
255
/*
 * Verify write permission on dbFname, then fetch the meta of `name` into
 * pCxt->pTableMeta through the catalog.
 *
 * isStb == true:  super-table meta only.
 * isStb == false: table meta, plus the table's vgroup (looked up by hash),
 *                 which is stored in pCxt->pVgroupsHashObj keyed by vgId.
 *
 * Returns TSDB_CODE_PAR_PERMISSION_DENIED if the auth check does not pass,
 * otherwise the first non-success code from the catalog/hash calls.
 */
static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SName* name, char* dbFname, bool isStb) {
  SParseContext* pComCxt = pCxt->pComCxt;

  bool authorized = false;
  CHECK_CODE(catalogChkAuth(pComCxt->pCatalog, pComCxt->pTransporter, &pComCxt->mgmtEpSet, pComCxt->pUser, dbFname,
                            AUTH_TYPE_WRITE, &authorized));
  if (!authorized) {
    return TSDB_CODE_PAR_PERMISSION_DENIED;
  }

  if (isStb) {
    // Either the success code or the catalog's error code is what the caller gets.
    return catalogGetSTableMeta(pComCxt->pCatalog, pComCxt->pTransporter, &pComCxt->mgmtEpSet, name,
                                &pCxt->pTableMeta);
  }

  CHECK_CODE(
      catalogGetTableMeta(pComCxt->pCatalog, pComCxt->pTransporter, &pComCxt->mgmtEpSet, name, &pCxt->pTableMeta));
  ASSERT(pCxt->pTableMeta->tableInfo.rowSize > 0);

  SVgroupInfo vgInfo;
  CHECK_CODE(catalogGetTableHashVgroup(pComCxt->pCatalog, pComCxt->pTransporter, &pComCxt->mgmtEpSet, name, &vgInfo));
  CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo,
                         sizeof(vgInfo)));
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
279 280 281
/* Fetch the meta of a normal/child table; getTableMetaImpl also records its vgroup. */
static int32_t getTableMeta(SInsertParseContext* pCxt, SName* name, char* dbFname) {
  const bool isSuperTable = false;
  return getTableMetaImpl(pCxt, name, dbFname, isSuperTable);
}

/* Fetch the meta of a super table. */
static int32_t getSTableMeta(SInsertParseContext* pCxt, SName* name, char* dbFname) {
  const bool isSuperTable = true;
  return getTableMetaImpl(pCxt, name, dbFname, isSuperTable);
}
D
stmt  
dapan1121 已提交
286

287 288 289 290 291 292 293 294 295 296
/*
 * Search pSchema[start, end) for a column whose name matches pColname exactly
 * (same length, case-sensitive). Returns the schema index, or -1 if none.
 */
static int32_t findCol(SToken* pColname, int32_t start, int32_t end, SSchema* pSchema) {
  for (int32_t i = start; i < end; i++) {
    const char* colName = pSchema[i].name;
    if (strlen(colName) != pColname->n) {
      continue;
    }
    if (strncmp(pColname->z, colName, pColname->n) == 0) {
      return i;
    }
  }
  return -1;
}

D
dapan1121 已提交
297
/*
 * Prepare the SSubmitReq in blocks->pData for sending.
 *
 * The header is filled from `blocks`. Then the blocks->numOfTables SSubmitBlk
 * records that follow the request are converted, one by one, to network byte
 * order. `src` is unused.
 */
static void buildMsgHeader(STableDataBlocks* src, SVgDataBlocks* blocks) {
  SSubmitReq* pReq = (SSubmitReq*)blocks->pData;

  pReq->header.vgId = htonl(blocks->vg.vgId);
  pReq->header.contLen = htonl(blocks->size);
  // NOTE(review): `length` receives the already byte-swapped contLen; presumably intended — confirm.
  pReq->length = pReq->header.contLen;
  pReq->numOfBlocks = htonl(blocks->numOfTables);

  SSubmitBlk* pBlk = (SSubmitBlk*)(pReq + 1);
  for (int32_t left = blocks->numOfTables; left != 0; --left) {
    // Capture the host-order lengths before swapping; they locate the next block.
    const int32_t schemaLen = pBlk->schemaLen;
    const int32_t dataLen = pBlk->dataLen;

    pBlk->uid = htobe64(pBlk->uid);
    pBlk->suid = htobe64(pBlk->suid);
    pBlk->padding = htonl(pBlk->padding);
    pBlk->sversion = htonl(pBlk->sversion);
    pBlk->dataLen = htonl(pBlk->dataLen);
    pBlk->schemaLen = htonl(pBlk->schemaLen);
    pBlk->numOfRows = htons(pBlk->numOfRows);

    pBlk = (SSubmitBlk*)(pBlk->data + schemaLen + dataLen);
  }
}

/*
 * Move every per-vgroup data block of pCxt->pVgDataBlocks into a freshly
 * allocated SVgDataBlocks stored in pCxt->pOutput->pDataBlocks.
 *
 * For each block:
 *   - the payload buffer is swapped out of the source block;
 *   - the vgroup info is copied from pCxt->pVgroupsHashObj;
 *   - the submit header is finalized with buildMsgHeader.
 *
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_TSC_OUT_OF_MEMORY. Entries already
 * pushed stay in pOutput->pDataBlocks for the caller to release.
 */
static int32_t buildOutput(SInsertParseContext* pCxt) {
  size_t numOfVg = taosArrayGetSize(pCxt->pVgDataBlocks);
  pCxt->pOutput->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
  if (NULL == pCxt->pOutput->pDataBlocks) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  for (size_t i = 0; i < numOfVg; ++i) {
    STableDataBlocks* src = taosArrayGetP(pCxt->pVgDataBlocks, i);
    SVgDataBlocks*    dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
    if (NULL == dst) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    // Register dst before moving src's payload into it. The push result used to
    // be ignored, which leaked dst and silently dropped this vgroup's data. If
    // the push fails now, nothing has been transferred yet, so freeing dst is
    // enough.
    if (NULL == taosArrayPush(pCxt->pOutput->pDataBlocks, &dst)) {
      taosMemoryFree(dst);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    taosHashGetDup(pCxt->pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
    dst->numOfTables = src->numOfTables;
    dst->size = src->size;
    TSWAP(dst->pData, src->pData);
    buildMsgHeader(src, dst);
  }
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
341
/*
 * Track whether the rows appended to pDataBlocks arrive in strictly increasing
 * timestamp order. `start` points at the row's leading TSKEY.
 *
 * A timestamp that is not greater than the previous one marks the block as
 * unordered. After that, prevTS is no longer updated.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t checkTimestamp(STableDataBlocks* pDataBlocks, const char* start) {
  if (pDataBlocks->ordered) {
    TSKEY ts = *(const TSKEY*)start;
    if (ts <= pDataBlocks->prevTS) {
      pDataBlocks->ordered = false;
    }
    pDataBlocks->prevTS = ts;
  }
  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
356 357 358 359 360 361
/*
 * Convert a timestamp value token into *time, in precision timePrec.
 *
 * Accepted forms:
 *   - NOW / TODAY, optionally written with "()";
 *   - an integer literal;
 *   - an RFC-3339/ISO-8601 string (parsed by taosParseTime);
 *   - NOW / TODAY / integer followed by "+duration" or "-duration",
 *     e.g. now+12a, now-5h.
 *
 * *end is the SQL cursor. It is advanced past any "()" or duration that this
 * function consumes.
 *
 * Returns TSDB_CODE_SUCCESS (0), a syntax error built into pMsgBuf, or
 * TSDB_CODE_TSC_INVALID_OPERATION for a malformed duration.
 */
static int parseTime(char** end, SToken* pToken, int16_t timePrec, int64_t* time, SMsgBuf* pMsgBuf) {
  int32_t index = 0;
  SToken  sToken;
  int64_t interval = 0;
  int64_t ts = 0;
  char*   pTokenEnd = *end;

  if (pToken->type == TK_NOW) {
    ts = taosGetTimestamp(timePrec);
  } else if (pToken->type == TK_TODAY) {
    ts = taosGetTimestampToday(timePrec);
  } else if (pToken->type == TK_NK_INTEGER) {
    // The conversion result used to be ignored, so a literal that toInteger
    // rejects (presumably one that does not fit in int64) was still accepted.
    if (toInteger(pToken->z, pToken->n, 10, &ts) != TSDB_CODE_SUCCESS) {
      return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
    }
  } else {  // parse the RFC-3339/ISO-8601 timestamp format string
    if (taosParseTime(pToken->z, time, pToken->n, timePrec, tsDaylight) != TSDB_CODE_SUCCESS) {
      return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
    }

    return TSDB_CODE_SUCCESS;
  }

  // Skip blanks and an optional "()" after the value. A following ',' ends the
  // value immediately.
  for (int k = pToken->n; pToken->z[k] != '\0'; k++) {
    if (pToken->z[k] == ' ' || pToken->z[k] == '\t') continue;
    if (pToken->z[k] == '(' && pToken->z[k + 1] == ')') {  // for insert NOW()/TODAY()
      *end = pTokenEnd = &pToken->z[k + 2];
      k++;
      continue;
    }
    if (pToken->z[k] == ',') {
      *end = pTokenEnd;
      *time = ts;
      return 0;
    }

    break;
  }

  /*
   * time expression:
   * e.g., now+12a, now-5h
   */
  SToken valueToken;
  index = 0;
  sToken = tStrGetToken(pTokenEnd, &index, false);
  pTokenEnd += index;

  if (sToken.type == TK_NK_MINUS || sToken.type == TK_NK_PLUS) {
    index = 0;
    valueToken = tStrGetToken(pTokenEnd, &index, false);
    pTokenEnd += index;

    if (valueToken.n < 2) {
      return buildSyntaxErrMsg(pMsgBuf, "value expected in timestamp", sToken.z);
    }

    char unit = 0;
    if (parseAbsoluteDuration(valueToken.z, valueToken.n, &interval, &unit, timePrec) != TSDB_CODE_SUCCESS) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }

    if (sToken.type == TK_NK_PLUS) {
      ts += interval;
    } else {
      ts = ts - interval;
    }

    *end = pTokenEnd;
  }

  *time = ts;
  return TSDB_CODE_SUCCESS;
}
428

X
Xiaoyu Wang 已提交
429
/*
 * Reject tokens that cannot be a literal value.
 *
 * For string literals, trimString copies the text into tmpTokenBuf with the
 * quotation marks removed, and pToken is repointed at that copy. Strings of
 * TSDB_MAX_BYTES_PER_ROW bytes or more are rejected.
 *
 * `type` is not used.
 */
static FORCE_INLINE int32_t checkAndTrimValue(SToken* pToken, uint32_t type, char* tmpTokenBuf, SMsgBuf* pMsgBuf) {
  bool isLiteral = false;
  switch (pToken->type) {
    case TK_NOW:
    case TK_TODAY:
    case TK_NK_INTEGER:
    case TK_NK_STRING:
    case TK_NK_FLOAT:
    case TK_NK_BOOL:
    case TK_NULL:
    case TK_NK_HEX:
    case TK_NK_OCT:
    case TK_NK_BIN:
      isLiteral = true;
      break;
    default:
      break;
  }

  if (!isLiteral || pToken->n == 0 || pToken->type == TK_NK_RP) {
    return buildSyntaxErrMsg(pMsgBuf, "invalid data or symbol", pToken->z);
  }

  if (TK_NK_STRING != pToken->type) {
    return TSDB_CODE_SUCCESS;
  }

  // Remove quotation marks
  if (pToken->n >= TSDB_MAX_BYTES_PER_ROW) {
    return buildSyntaxErrMsg(pMsgBuf, "too long string", pToken->z);
  }
  int32_t trimmedLen = trimString(pToken->z, pToken->n, tmpTokenBuf, TSDB_MAX_BYTES_PER_ROW);
  pToken->z = tmpTokenBuf;
  pToken->n = trimmedLen;
  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
452
/*
 * True when the token denotes SQL NULL. That is either the NULL keyword, or a
 * string-literal token whose text (pToken->z, pToken->n) equals
 * TSDB_DATA_NULL_STR_L, ignoring case.
 */
static bool isNullStr(SToken* pToken) {
  if (pToken->type == TK_NULL) {
    return true;
  }
  // Compare the full length. strncasecmp over only pToken->n bytes also
  // matched prefixes such as 'n' or 'nu', turning those strings into NULLs.
  return (pToken->type == TK_NK_STRING) && (pToken->n == strlen(TSDB_DATA_NULL_STR_L)) &&
         (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0);
}

H
refact  
Hongze Cheng 已提交
457
/*
 * Parse the token text as a floating-point number into *value. *endPtr
 * receives the position where parsing stopped.
 *
 * errno is cleared first so that callers can test it (ERANGE) afterwards;
 * presumably taosStr2Double sets it the way strtod does.
 *
 * Returns TK_NK_ILLEGAL unless exactly pToken->n characters were consumed;
 * otherwise returns the token's own type.
 */
static FORCE_INLINE int32_t toDouble(SToken* pToken, double* value, char** endPtr) {
  errno = 0;
  *value = taosStr2Double(pToken->z, endPtr);

  const bool consumedWholeToken = ((*endPtr - pToken->z) == pToken->n);
  return consumedWholeToken ? pToken->type : TK_NK_ILLEGAL;
}

H
refact  
Hongze Cheng 已提交
469 470
/*
 * Convert one value token into the binary form of the column described by
 * pSchema, and hand it to `func` together with `param`.
 *
 *  end         - SQL cursor; only the TIMESTAMP path may advance it (via parseTime).
 *  pToken      - the value token; checkAndTrimValue may repoint string
 *                literals at tmpTokenBuf with their quotes removed.
 *  timePrec    - timestamp precision forwarded to parseTime.
 *  func, param - sink for the converted value; value == NULL means SQL NULL.
 *
 * Returns the sink's result, a syntax error built into pMsgBuf, or
 * TSDB_CODE_FAILED for a column type this function does not handle.
 */
static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t timePrec, char* tmpTokenBuf,
                               _row_append_fn_t func, void* param, SMsgBuf* pMsgBuf) {
  int64_t  iv;
  uint64_t uv;
  char*    endptr = NULL;

  int32_t code = checkAndTrimValue(pToken, pSchema->type, tmpTokenBuf, pMsgBuf);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  if (isNullStr(pToken)) {
    if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
      return buildSyntaxErrMsg(pMsgBuf, "primary timestamp should not be null", pToken->z);
    }

    return func(pMsgBuf, NULL, 0, param);
  }

  switch (pSchema->type) {
    case TSDB_DATA_TYPE_BOOL: {
      if ((pToken->type == TK_NK_BOOL || pToken->type == TK_NK_STRING) && (pToken->n != 0)) {
        // Match the whole token, ignoring case. The previous
        // strncmp(z, "true", n) accepted prefixes such as 't' / 'fa' and
        // rejected spellings like 'TRUE'.
        if (pToken->n == strlen("true") && strncasecmp(pToken->z, "true", pToken->n) == 0) {
          return func(pMsgBuf, &TRUE_VALUE, pSchema->bytes, param);
        } else if (pToken->n == strlen("false") && strncasecmp(pToken->z, "false", pToken->n) == 0) {
          return func(pMsgBuf, &FALSE_VALUE, pSchema->bytes, param);
        } else {
          return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
        }
      } else if (pToken->type == TK_NK_INTEGER) {
        return func(pMsgBuf, ((taosStr2Int64(pToken->z, NULL, 10) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes,
                    param);
      } else if (pToken->type == TK_NK_FLOAT) {
        return func(pMsgBuf, ((taosStr2Double(pToken->z, NULL) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes,
                    param);
      } else {
        return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
      }
    }

    case TSDB_DATA_TYPE_TINYINT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid tinyint data", pToken->z);
      } else if (!IS_VALID_TINYINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "tinyint data overflow", pToken->z);
      }

      int8_t tmpVal = (int8_t)iv;  // signed column: stage the value in a signed byte
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_UTINYINT: {
      if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned tinyint data", pToken->z);
      } else if (!IS_VALID_UTINYINT(uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "unsigned tinyint data overflow", pToken->z);
      }
      uint8_t tmpVal = (uint8_t)uv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_SMALLINT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid smallint data", pToken->z);
      } else if (!IS_VALID_SMALLINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "smallint data overflow", pToken->z);
      }
      int16_t tmpVal = (int16_t)iv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_USMALLINT: {
      if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned smallint data", pToken->z);
      } else if (!IS_VALID_USMALLINT(uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "unsigned smallint data overflow", pToken->z);
      }
      uint16_t tmpVal = (uint16_t)uv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_INT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid int data", pToken->z);
      } else if (!IS_VALID_INT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "int data overflow", pToken->z);
      }
      int32_t tmpVal = (int32_t)iv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_UINT: {
      if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned int data", pToken->z);
      } else if (!IS_VALID_UINT(uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "unsigned int data overflow", pToken->z);
      }
      uint32_t tmpVal = (uint32_t)uv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_BIGINT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid bigint data", pToken->z);
      } else if (!IS_VALID_BIGINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "bigint data overflow", pToken->z);
      }
      return func(pMsgBuf, &iv, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_UBIGINT: {
      if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned bigint data", pToken->z);
      } else if (!IS_VALID_UBIGINT(uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "unsigned bigint data overflow", pToken->z);
      }
      return func(pMsgBuf, &uv, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_FLOAT: {
      double dv;
      if (TK_NK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
        return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
      }
      if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) ||
          isnan(dv)) {
        return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
      }
      float tmpVal = (float)dv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_DOUBLE: {
      double dv;
      if (TK_NK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
        return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
      }
      if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) {
        return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
      }
      return func(pMsgBuf, &dv, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_BINARY: {
      // Too long values will raise the invalid sql error message
      if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) {
        return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
      }

      return func(pMsgBuf, pToken->z, pToken->n, param);
    }

    case TSDB_DATA_TYPE_NCHAR: {
      // length is enforced by the sink after conversion
      return func(pMsgBuf, pToken->z, pToken->n, param);
    }
#ifdef JSON_TAG_REFACTOR
    case TSDB_DATA_TYPE_JSON: {
      if (pToken->n > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
        return buildSyntaxErrMsg(pMsgBuf, "json string too long than 4095", pToken->z);
      }
      return func(pMsgBuf, pToken->z, pToken->n, param);
    }
#endif
    case TSDB_DATA_TYPE_TIMESTAMP: {
      int64_t tmpVal;
      if (parseTime(end, pToken, timePrec, &tmpVal, pMsgBuf) != TSDB_CODE_SUCCESS) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp", pToken->z);
      }

      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }
  }

  return TSDB_CODE_FAILED;
}

X
Xiaoyu Wang 已提交
645
/*
 * _row_append_fn_t sink that appends one column value to the row being built
 * in ((SMemParam*)param)->rb.
 *
 *  - value == NULL is appended as a NULL (TD_VTYPE_NULL).
 *  - BINARY is first written as a var-string at the current row end.
 *  - NCHAR is converted to UCS-4 at the current row end. E2BIG is reported as
 *    TSDB_CODE_PAR_VALUE_TOO_LONG; any other conversion failure is reported
 *    with strerror text.
 *  - Every other type is appended from `value` directly.
 */
static FORCE_INLINE int32_t MemRowAppend(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param) {
  SMemParam*   pa = (SMemParam*)param;
  SRowBuilder* rb = pa->rb;
  SSchema*     pCol = pa->schema;

  if (NULL == value) {  // it is a null data
    tdAppendColValToRow(rb, pCol->colId, pCol->type, TD_VTYPE_NULL, value, false, pa->toffset, pa->colIdx);
    return TSDB_CODE_SUCCESS;
  }

  const void* cell = value;
  switch (pCol->type) {
    case TSDB_DATA_TYPE_BINARY: {
      const char* rowEnd = tdRowEnd(rb->pBuf);
      STR_WITH_SIZE_TO_VARSTR(rowEnd, value, len);
      cell = rowEnd;
      break;
    }
    case TSDB_DATA_TYPE_NCHAR: {
      // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
      int32_t     output = 0;
      const char* rowEnd = tdRowEnd(rb->pBuf);
      if (!taosMbsToUcs4(value, len, (TdUcs4*)varDataVal(rowEnd), pCol->bytes - VARSTR_HEADER_SIZE, &output)) {
        if (errno == E2BIG) {
          return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pCol->name);
        }
        char buf[512] = {0};
        snprintf(buf, tListLen(buf), "%s", strerror(errno));
        return buildSyntaxErrMsg(pMsgBuf, buf, value);
      }
      varDataSetLen(rowEnd, output);
      cell = rowEnd;
      break;
    }
    default:
      break;
  }

  tdAppendColValToRow(rb, pCol->colId, pCol->type, TD_VTYPE_NORM, cell, false, pa->toffset, pa->colIdx);
  return TSDB_CODE_SUCCESS;
}
678 679 680

// pSql -> tag1_name, ...)
/*
 * Parse a column list of the form "c1, c2, ...)" from pCxt->pSql. Per the
 * preceding comment, the cursor sits just after the opening parenthesis.
 * Each name is resolved against pSchema[0, pColList->numOfCols).
 *
 * On success pColList holds:
 *   boundColumns[i]  schema index of the i-th listed column (unused tail zeroed);
 *   numOfBound       number of listed columns;
 *   boundNullLen     accumulated null-placeholder size of the bound columns;
 *   cols[i].valStat  VAL_STAT_HAS for bound columns, VAL_STAT_NONE otherwise;
 *   orderStatus      whether the list follows schema order. When it does not,
 *                    colIdxInfo is allocated with the bound/schema index mapping.
 *
 * Returns a syntax error for unknown or duplicated columns, or
 * TSDB_CODE_TSC_OUT_OF_MEMORY.
 */
static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo* pColList, SSchema* pSchema) {
  col_id_t nCols = pColList->numOfCols;

  pColList->numOfBound = 0;
  pColList->boundNullLen = 0;
  memset(pColList->boundColumns, 0, sizeof(col_id_t) * nCols);
  for (col_id_t i = 0; i < nCols; ++i) {
    pColList->cols[i].valStat = VAL_STAT_NONE;
  }

  SToken   sToken;
  bool     isOrdered = true;
  col_id_t lastColIdx = -1;  // last column found
  while (1) {
    NEXT_TOKEN(pCxt->pSql, sToken);

    if (TK_NK_RP == sToken.type) {
      break;
    }

    // Search forward from the previous match first; fall back to the columns
    // before it (which means the list is not in schema order).
    col_id_t t = lastColIdx + 1;
    col_id_t index = findCol(&sToken, t, nCols, pSchema);
    if (index < 0 && t > 0) {
      index = findCol(&sToken, 0, t, pSchema);
      isOrdered = false;
    }
    if (index < 0) {
      return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMN, sToken.z);
    }
    if (pColList->cols[index].valStat == VAL_STAT_HAS) {
      return buildSyntaxErrMsg(&pCxt->msg, "duplicated column name", sToken.z);
    }
    lastColIdx = index;
    pColList->cols[index].valStat = VAL_STAT_HAS;
    pColList->boundColumns[pColList->numOfBound] = index;
    ++pColList->numOfBound;

    // Size the placeholder by the column that was actually bound (`index`).
    // Using `t`, the search start, counted the wrong column's size, and when
    // lastColIdx was the last column, t == nCols read past the schema array.
    switch (pSchema[index].type) {
      case TSDB_DATA_TYPE_BINARY:
        pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + CHAR_BYTES);
        break;
      case TSDB_DATA_TYPE_NCHAR:
        pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
        break;
      default:
        pColList->boundNullLen += TYPE_BYTES[pSchema[index].type];
        break;
    }
  }

  pColList->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED;

  if (!isOrdered) {
    pColList->colIdxInfo = taosMemoryCalloc(pColList->numOfBound, sizeof(SBoundIdxInfo));
    if (NULL == pColList->colIdxInfo) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    SBoundIdxInfo* pColIdx = pColList->colIdxInfo;
    for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
      pColIdx[i].schemaColIdx = pColList->boundColumns[i];
      pColIdx[i].boundIdx = i;
    }
    // Sort by schema index to assign each bound column its final (schema-order)
    // position, then restore the original bound order.
    qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), schemaIdxCompar);
    for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
      pColIdx[i].finalIdx = i;
    }
    qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), boundIdxCompar);
  }

  if (pColList->numOfCols > pColList->numOfBound) {
    memset(&pColList->boundColumns[pColList->numOfBound], 0,
           sizeof(col_id_t) * (pColList->numOfCols - pColList->numOfBound));
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
756 757
// Value callback that appends one tag value to the SKvParam passed in `param` (used by parseValueToken for
// SQL literals and called directly by qBindStmtTagsValue).
//
// The value bytes are copied into pa->buf at pa->pos, a STagVal for that copy is recorded in
// pa->pTagVals[pa->nTag++], and pa->pos is advanced past the copy so the next tag is stored after it.
//
// value: raw value bytes, or NULL for a NULL tag (nothing is recorded in that case).
// len:   byte length of `value` for BINARY/NCHAR; fixed-size types use TYPE_BYTES[type] instead.
// Returns TSDB_CODE_SUCCESS, or an error (message written to pMsgBuf) when NCHAR conversion fails.
static int32_t KvRowAppend(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param) {
  SKvParam* pa = (SKvParam*)param;

  int8_t  type = pa->schema->type;
  int16_t colId = pa->schema->colId;

#ifdef JSON_TAG_REFACTOR
  if (TSDB_DATA_TYPE_JSON == type) {
    return parseJsontoTagData(value, pa->builder, pMsgBuf, colId);
  }
#endif

  if (value == NULL) {  // NULL tag: nothing is recorded for it
    return TSDB_CODE_SUCCESS;
  }

  if (TSDB_DATA_TYPE_BINARY == type) {
    memcpy(pa->buf + pa->pos, value, len);
    tTagValSet(pa->pTagVals + pa->nTag++, &colId, type, (uint8_t*)(pa->buf + pa->pos), len, false);
    pa->pos += len;
  } else if (TSDB_DATA_TYPE_NCHAR == type) {
    // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
    ASSERT((pa->pos + pa->schema->bytes - VARSTR_HEADER_SIZE) <= TSDB_MAX_TAGS_LEN);

    int32_t output = 0;
    if (!taosMbsToUcs4(value, len, (TdUcs4*)(pa->buf + pa->pos), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) {
      if (errno == E2BIG) {
        return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pa->schema->name);
      }
      char buf[512] = {0};
      snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(errno));
      return buildSyntaxErrMsg(pMsgBuf, buf, value);
    }
    tTagValSet(pa->pTagVals + pa->nTag++, &colId, type, (uint8_t*)(pa->buf + pa->pos), output, false);
    pa->pos += output;
  } else {
    memcpy(pa->buf + pa->pos, value, TYPE_BYTES[type]);
    tTagValSet(pa->pTagVals + pa->nTag++, &colId, type, (uint8_t*)(pa->buf + pa->pos), TYPE_BYTES[type], false);
    // Advance past the copied value; previously the sum was computed and discarded, so the next tag value was
    // written over this one.
    pa->pos += TYPE_BYTES[type];
  }
  ASSERT(pa->pos <= TSDB_MAX_TAGS_LEN);

  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
804
// Fills pTbReq as a child-table creation request for table `tname` under super table `suid`, carrying the
// serialized tags pTag. pTbReq takes ownership of pTag and of the duplicated name; both are released by
// destroyCreateSubTbReq.
// Returns TSDB_CODE_TSC_OUT_OF_MEMORY if the name cannot be duplicated. All fields are assigned first even in that
// case, so the caller's cleanup of pTbReq still releases pTag.
static int32_t buildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid) {
  pTbReq->type = TD_CHILD_TABLE;
  pTbReq->name = strdup(tname);
  pTbReq->ctb.suid = suid;
  pTbReq->ctb.pTag = (uint8_t*)pTag;

  if (NULL == pTbReq->name) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  return TSDB_CODE_SUCCESS;
}

813
// pSql -> tag1_value, ...)
wmmhello's avatar
wmmhello 已提交
814
// Parses the tag values of a USING clause and builds pCxt->createTblReq for child table tName.
//
// On entry pCxt->pSql points just past the opening '(' (pSql -> tag1_value, ...)). Values are matched, in order,
// to the bound tag columns in pCxt->tags using pSchema. A '?' placeholder (stmt only) means the tags are bound
// later, so no request is built. Mixing '?' with literal values is rejected. The scratch array pCxt->pTagVals
// is released on every exit path.
static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint8_t precision, const char* tName) {
  ASSERT(!pCxt->pTagVals);
  if (!(pCxt->pTagVals = taosMemoryCalloc(pCxt->tags.numOfBound, sizeof(STagVal)))) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SKvParam param = {.pTagVals = pCxt->pTagVals, .nTag = 0, .pos = 0};
  SToken   sToken;
  bool     isParseBindParam = false;
  char     tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0};  // used for deleting Escape character: \\, \', \"
  STag*    pTag = NULL;
  int32_t  code = TSDB_CODE_SUCCESS;

  // TODO: JSON_TAG_REFACTOR => here would have json tag?
  for (int i = 0; i < pCxt->tags.numOfBound; ++i) {
    NEXT_TOKEN_WITH_PREV(pCxt->pSql, sToken);

    if (sToken.type == TK_NK_QUESTION) {
      isParseBindParam = true;
      if (NULL == pCxt->pStmtCb) {
        code = buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", sToken.z);
        goto _end;
      }
      continue;
    }

    if (isParseBindParam) {
      code = buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and tag values");
      goto _end;
    }

    SSchema* pTagSchema = &pSchema[pCxt->tags.boundColumns[i]];
    param.schema = pTagSchema;
    code = parseValueToken(&pCxt->pSql, &sToken, pTagSchema, precision, tmpTokenBuf, KvRowAppend, &param, &pCxt->msg);
    if (TSDB_CODE_SUCCESS != code) {
      goto _end;
    }
  }

  if (isParseBindParam) {
    // tag values arrive later through stmt binding; nothing to build now
    goto _end;
  }

  // TODO: JSON_TAG_REFACTOR (would be JSON tag or normal tag)
  if (tTagNew(param.pTagVals, param.nTag, 1, false, &pTag) != 0) {
    code = buildInvalidOperationMsg(&pCxt->msg, "out of memory");
    goto _end;
  }

  code = buildCreateTbReq(&pCxt->createTblReq, tName, pTag, pCxt->pTableMeta->suid);

_end:
  taosMemoryFreeClear(pCxt->pTagVals);
  return code;
}
864

X
Xiaoyu Wang 已提交
865 866 867 868 869 870 871 872
// Makes a heap copy of pSrc (including its trailing schema, as sized by TABLE_META_SIZE) and stores it in *pDst.
// *pDst is NULL and TSDB_CODE_TSC_OUT_OF_MEMORY is returned if the allocation fails.
static int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
  size_t      metaSize = TABLE_META_SIZE(pSrc);
  STableMeta* pCopy = taosMemoryMalloc(metaSize);

  *pDst = pCopy;
  if (pCopy == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  memcpy(pCopy, pSrc, metaSize);
  return TSDB_CODE_SUCCESS;
}
873

X
Xiaoyu Wang 已提交
874 875 876 877 878 879 880 881
// Resolves the vgroup of pTableName and records it in pCxt->pVgroupsHashObj.
// Rewrites pMeta in place as child-table metadata for that vgroup: uid = 0, vgId set, tableType = TSDB_CHILD_TABLE.
// Stores a heap copy of the rewritten meta (as a pointer value) in pHash under key pName/len.
static int32_t storeTableMeta(SInsertParseContext* pCxt, SHashObj* pHash, SName* pTableName, const char* pName,
                              int32_t len, STableMeta* pMeta) {
  SVgroupInfo    vg;
  SParseContext* pBasicCtx = pCxt->pComCxt;
  CHECK_CODE(
      catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pTableName, &vg));
  CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));

  pMeta->uid = 0;
  pMeta->vgId = vg.vgId;
  pMeta->tableType = TSDB_CHILD_TABLE;

  STableMeta* pBackup = NULL;
  if (TSDB_CODE_SUCCESS != cloneTableMeta(pMeta, &pBackup)) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  int32_t code = taosHashPut(pHash, pName, len, &pBackup, POINTER_BYTES);
  if (TSDB_CODE_SUCCESS != code) {
    // the hash did not keep the pointer, so the copy would otherwise be unreachable
    taosMemoryFree(pBackup);
  }
  return code;
}

// pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
D
dapan 已提交
894
static int32_t parseUsingClause(SInsertParseContext* pCxt, SName* name, char* tbFName) {
H
refact  
Hongze Cheng 已提交
895
  int32_t      len = strlen(tbFName);
X
Xiaoyu Wang 已提交
896 897 898 899
  STableMeta** pMeta = taosHashGet(pCxt->pSubTableHashObj, tbFName, len);
  if (NULL != pMeta) {
    return cloneTableMeta(*pMeta, &pCxt->pTableMeta);
  }
900

X
Xiaoyu Wang 已提交
901
  SToken sToken;
902 903
  // pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
  NEXT_TOKEN(pCxt->pSql, sToken);
D
dapan 已提交
904 905 906 907 908

  SName sname;
  createSName(&sname, &sToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg);
  char stbFName[TSDB_TABLE_FNAME_LEN];
  tNameExtractFullName(&sname, stbFName);
X
Xiaoyu Wang 已提交
909

D
dapan 已提交
910
  CHECK_CODE(getSTableMeta(pCxt, &sname, stbFName));
911 912 913
  if (TSDB_SUPER_TABLE != pCxt->pTableMeta->tableType) {
    return buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed");
  }
D
dapan 已提交
914
  CHECK_CODE(storeTableMeta(pCxt, pCxt->pSubTableHashObj, name, tbFName, len, pCxt->pTableMeta));
915 916

  SSchema* pTagsSchema = getTableTagSchema(pCxt->pTableMeta);
917
  setBoundColumnInfo(&pCxt->tags, pTagsSchema, getNumOfTags(pCxt->pTableMeta));
918 919 920

  // pSql -> [(tag1_name, ...)] TAGS (tag1_value, ...)
  NEXT_TOKEN(pCxt->pSql, sToken);
921
  if (TK_NK_LP == sToken.type) {
922
    CHECK_CODE(parseBoundColumns(pCxt, &pCxt->tags, pTagsSchema));
923 924 925 926 927 928 929 930
    NEXT_TOKEN(pCxt->pSql, sToken);
  }

  if (TK_TAGS != sToken.type) {
    return buildSyntaxErrMsg(&pCxt->msg, "TAGS is expected", sToken.z);
  }
  // pSql -> (tag1_value, ...)
  NEXT_TOKEN(pCxt->pSql, sToken);
931
  if (TK_NK_LP != sToken.type) {
932 933
    return buildSyntaxErrMsg(&pCxt->msg, "( is expected", sToken.z);
  }
X
Xiaoyu Wang 已提交
934
  CHECK_CODE(parseTagsClause(pCxt, pTagsSchema, getTableInfo(pCxt->pTableMeta).precision, name->tname));
935 936 937 938
  NEXT_VALID_TOKEN(pCxt->pSql, sToken);
  if (TK_NK_COMMA == sToken.type) {
    return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_TAGS_NOT_MATCHED);
  } else if (TK_NK_RP != sToken.type) {
X
Xiaoyu Wang 已提交
939 940
    return buildSyntaxErrMsg(&pCxt->msg, ") is expected", sToken.z);
  }
941 942 943 944

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
945 946
// Parses the values of one VALUES row ("v1, v2, ...", the caller consumes the ')') into the row buffer at the
// current end of pDataBlocks (pData + size).
//
// - Values are matched, in order, to the bound columns.
// - In a tuple-format row, unbound columns are filled with NONE values.
// - A row of '?' placeholders (stmt only) produces no row; mixing '?' with values is rejected.
// *gotRow is set to true only when a concrete row was written.
static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, int16_t timePrec, bool* gotRow,
                       char* tmpTokenBuf) {
  SParsedDataColInfo* pColInfo = &pDataBlocks->boundColumnInfo;
  SRowBuilder*        pRowBuilder = &pDataBlocks->rowBuilder;
  STSRow*             pRow = (STSRow*)(pDataBlocks->pData + pDataBlocks->size);  // skip the SSubmitBlk header
  tdSRowResetBuf(pRowBuilder, pRow);

  SSchema*  pAllCols = getTableColumnSchema(pDataBlocks->pTableMeta);
  SMemParam appendParam = {.rb = pRowBuilder};
  SToken    token = {0};
  bool      hasPlaceholder = false;

  for (int bound = 0; bound < pColInfo->numOfBound; ++bound) {
    NEXT_TOKEN_WITH_PREV(pCxt->pSql, token);
    SSchema* pCol = &pAllCols[pColInfo->boundColumns[bound]];

    if (TK_NK_QUESTION == token.type) {
      if (NULL == pCxt->pStmtCb) {
        return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", token.z);
      }
      hasPlaceholder = true;
      continue;
    }

    if (hasPlaceholder) {
      return buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and values");
    }

    appendParam.schema = pCol;
    getSTSRowAppendInfo(pRowBuilder->rowType, pColInfo, bound, &appendParam.toffset, &appendParam.colIdx);
    int32_t code =
        parseValueToken(&pCxt->pSql, &token, pCol, timePrec, tmpTokenBuf, MemRowAppend, &appendParam, &pCxt->msg);
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }

    if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) {
      TSKEY tsKey = TD_ROW_KEY(pRow);
      checkTimestamp(pDataBlocks, (const char*)&tsKey);
    }
  }

  // placeholder rows are materialised later by stmt binding
  if (hasPlaceholder) {
    return TSDB_CODE_SUCCESS;
  }

  // columns the statement did not bind get NONE values (tuple-format rows only)
  if ((pColInfo->numOfBound < pColInfo->numOfCols) && TD_IS_TP_ROW(pRow)) {
    for (int32_t col = 0; col < pColInfo->numOfCols; ++col) {
      if (VAL_STAT_NONE != pColInfo->cols[col].valStat) {
        continue;  // bound column; the primary TS key is never VAL_STAT_NONE
      }
      tdAppendColValToTpRow(pRowBuilder, TD_VTYPE_NONE, getNullValue(pAllCols[col].type), true, pAllCols[col].type,
                            col, pColInfo->cols[col].toffset);
    }
  }

  *gotRow = true;
#ifdef TD_DEBUG_PRINT_ROW
  STSchema* pSTSchema = tdGetSTSChemaFromSSChema(&pAllCols, pColInfo->numOfCols);
  tdSRowPrint(pRow, pSTSchema, __func__);
  taosMemoryFree(pSTSchema);
#endif
  return TSDB_CODE_SUCCESS;
}

// pSql -> (field1_value, ...) [(field1_value2, ...) ...]
1009
// Parses "(v1, ...) [(v1, ...) ...]" after VALUES into pDataBlock, growing its buffer when the row capacity
// (maxRows) or the allocated size is reached. *numOfRows receives the number of concrete rows written; stmt
// placeholder rows are not counted. A non-stmt insert that yields no row is a syntax error.
static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlock, int maxRows, int32_t* numOfRows) {
  STableComInfo tableInfo = getTableInfo(pDataBlock->pTableMeta);
  int32_t       rowSize = getExtendedRowSize(pDataBlock);
  CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));

  *numOfRows = 0;
  char   unescapeBuf[TSDB_MAX_BYTES_PER_ROW] = {0};  // used for deleting Escape character: \\, \', \"
  SToken token;

  for (;;) {
    // peek first: the token is only consumed when it opens another row
    int32_t tokenLen = 0;
    NEXT_TOKEN_KEEP_SQL(pCxt->pSql, token, tokenLen);
    if (TK_NK_LP != token.type) {
      break;
    }
    pCxt->pSql += tokenLen;

    if (*numOfRows >= maxRows || pDataBlock->size + rowSize >= pDataBlock->nAllocSize) {
      int32_t newCapacity;
      CHECK_CODE(allocateMemIfNeed(pDataBlock, rowSize, &newCapacity));
      ASSERT(newCapacity >= maxRows);
      maxRows = newCapacity;
    }

    bool gotRow = false;
    CHECK_CODE(parseOneRow(pCxt, pDataBlock, tableInfo.precision, &gotRow, unescapeBuf));
    if (gotRow) {
      pDataBlock->size += rowSize;
    }

    NEXT_VALID_TOKEN(pCxt->pSql, token);
    if (TK_NK_COMMA == token.type) {
      return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM);
    }
    if (TK_NK_RP != token.type) {
      return buildSyntaxErrMsg(&pCxt->msg, ") expected", token.z);
    }

    if (gotRow) {
      ++(*numOfRows);
    }
  }

  if (0 == *numOfRows && !TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT)) {
    return buildSyntaxErrMsg(&pCxt->msg, "no any data points", NULL);
  }
  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
1056
// Handles everything after VALUES for one table. It parses all rows into dataBuf, writes the row count into the
// SSubmitBlk header at the start of dataBuf->pData, and adds the rows to pCxt->totalNum.
static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* dataBuf) {
  int32_t capacity = 0;
  CHECK_CODE(allocateMemIfNeed(dataBuf, getExtendedRowSize(dataBuf), &capacity));

  int32_t rows = 0;
  CHECK_CODE(parseValues(pCxt, dataBuf, capacity, &rows));

  if (TSDB_CODE_SUCCESS != setBlockInfo((SSubmitBlk*)dataBuf->pData, dataBuf, rows)) {
    return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than 32767");
  }

  dataBuf->numOfTables = 1;
  pCxt->totalNum += rows;
  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
1073
// Frees the heap members of a child-table creation request (serialized tags and table name) and clears the
// pointers. The request struct itself is not freed.
void destroyCreateSubTbReq(SVCreateTbReq* pReq) {
  taosMemoryFreeClear(pReq->ctb.pTag);
  taosMemoryFreeClear(pReq->name);
}

X
Xiaoyu Wang 已提交
1078
// Releases the per-table state held by the context: the create-table request, tag scratch values, bound tag info
// and table meta. This lets the next table in the statement start clean.
static void destroyInsertParseContextForTable(SInsertParseContext* pCxt) {
  destroyCreateSubTbReq(&pCxt->createTblReq);
  taosMemoryFreeClear(pCxt->pTagVals);
  destroyBoundColumnInfo(&pCxt->tags);
  taosMemoryFreeClear(pCxt->pTableMeta);
}

// Releases everything owned by an insert parse context.
// pSubTableHashObj holds heap-allocated STableMeta* copies made by storeTableMeta. parseInsertSql creates this hash
// without a free callback, so the copies are freed here before the hash is cleaned up. Each stored pointer is
// cleared after freeing, so a free callback installed elsewhere would only see NULL.
static void destroyInsertParseContext(SInsertParseContext* pCxt) {
  destroyInsertParseContextForTable(pCxt);
  taosHashCleanup(pCxt->pVgroupsHashObj);

  if (NULL != pCxt->pSubTableHashObj) {
    STableMeta** ppMeta = taosHashIterate(pCxt->pSubTableHashObj, NULL);
    while (NULL != ppMeta) {
      taosMemoryFreeClear(*ppMeta);
      ppMeta = taosHashIterate(pCxt->pSubTableHashObj, ppMeta);
    }
  }
  taosHashCleanup(pCxt->pSubTableHashObj);
  taosHashCleanup(pCxt->pTableNameHashObj);

  destroyBlockHashmap(pCxt->pTableBlockHashObj);
  destroyBlockArrayList(pCxt->pVgDataBlocks);
}

1095 1096 1097 1098 1099 1100
//   tb_name
//       [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
//       [(field1_name, ...)]
//       VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
//   [...];
// Parses the table list that follows INSERT INTO:
//   tb_name
//       [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
//       [(field1_name, ...)]
//       VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
//   [...];
//
// Regular insert: the per-table data blocks are merged by vgroup and turned into pCxt->pOutput via buildOutput.
// Stmt insert (a single table only):
//   - the table meta, bound tags, vgroup hash and block hash are handed to the stmt through setInfoFn;
//   - they are then detached from the context.
static int32_t parseInsertBody(SInsertParseContext* pCxt) {
  int32_t tbNum = 0;
  // Zero-initialised because a stmt statement that names no table still passes this buffer to setInfoFn below.
  char    tbFName[TSDB_TABLE_FNAME_LEN] = {0};
  bool    autoCreateTbl = false;

  // for each table
  while (1) {
    SToken sToken;
    char*  tbName = NULL;

    // pSql -> tb_name ...
    NEXT_TOKEN(pCxt->pSql, sToken);

    // no data in the sql string anymore.
    if (sToken.n == 0) {
      if (sToken.type && pCxt->pSql[0]) {
        return buildSyntaxErrMsg(&pCxt->msg, "invalid charactor in SQL", sToken.z);
      }

      if (0 == pCxt->totalNum && (!TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT))) {
        return buildInvalidOperationMsg(&pCxt->msg, "no data in sql");
      }
      break;
    }

    if (TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT) && tbNum > 0) {
      return buildInvalidOperationMsg(&pCxt->msg, "single table allowed in one stmt");
    }

    // drop the meta / tags / create-table request left over from the previous table
    destroyInsertParseContextForTable(pCxt);

    // '?' as table name: the name is obtained from the stmt callback
    if (TK_NK_QUESTION == sToken.type) {
      if (pCxt->pStmtCb) {
        CHECK_CODE((*pCxt->pStmtCb->getTbNameFn)(pCxt->pStmtCb->pStmt, &tbName));

        sToken.z = tbName;
        sToken.n = strlen(tbName);
      } else {
        return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", sToken.z);
      }
    }

    SToken tbnameToken = sToken;
    NEXT_TOKEN(pCxt->pSql, sToken);

    SName name;
    CHECK_CODE(createSName(&name, &tbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));

    tNameExtractFullName(&name, tbFName);
    CHECK_CODE(taosHashPut(pCxt->pTableNameHashObj, tbFName, strlen(tbFName), &name, sizeof(SName)));

    // USING clause
    if (TK_USING == sToken.type) {
      CHECK_CODE(parseUsingClause(pCxt, &name, tbFName));
      NEXT_TOKEN(pCxt->pSql, sToken);
      autoCreateTbl = true;
    } else {
      char dbFName[TSDB_DB_FNAME_LEN];
      tNameGetFullDbName(&name, dbFName);
      CHECK_CODE(getTableMeta(pCxt, &name, dbFName));
    }

    STableDataBlocks* dataBuf = NULL;
    CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, tbFName, strlen(tbFName), TSDB_DEFAULT_PAYLOAD_SIZE,
                                    sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta,
                                    &dataBuf, NULL, &pCxt->createTblReq));

    if (TK_NK_LP == sToken.type) {
      // pSql -> field1_name, ...)
      CHECK_CODE(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(pCxt->pTableMeta)));
      NEXT_TOKEN(pCxt->pSql, sToken);
    }

    if (TK_VALUES == sToken.type) {
      // pSql -> (field1_value, ...) [(field1_value2, ...) ...]
      CHECK_CODE(parseValuesClause(pCxt, dataBuf));
      TSDB_QUERY_SET_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_INSERT);

      tbNum++;
      continue;
    }

    // FILE csv_file_path
    if (TK_FILE == sToken.type) {
      // pSql -> csv_file_path
      NEXT_TOKEN(pCxt->pSql, sToken);
      if (0 == sToken.n || (TK_NK_STRING != sToken.type && TK_NK_ID != sToken.type)) {
        return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", sToken.z);
      }
      // todo
      pCxt->pOutput->insertType = TSDB_QUERY_TYPE_FILE_INSERT;

      tbNum++;
      continue;
    }

    return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", sToken.z);
  }

  if (TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT)) {
    SParsedDataColInfo* tags = taosMemoryMalloc(sizeof(pCxt->tags));
    if (NULL == tags) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    memcpy(tags, &pCxt->tags, sizeof(pCxt->tags));
    (*pCxt->pStmtCb->setInfoFn)(pCxt->pStmtCb->pStmt, pCxt->pTableMeta, tags, tbFName, autoCreateTbl,
                                pCxt->pVgroupsHashObj, pCxt->pTableBlockHashObj);

    // the stmt now holds these objects; detach them so destroyInsertParseContext does not release them
    memset(&pCxt->tags, 0, sizeof(pCxt->tags));
    pCxt->pVgroupsHashObj = NULL;
    pCxt->pTableBlockHashObj = NULL;
    pCxt->pTableMeta = NULL;

    return TSDB_CODE_SUCCESS;
  }

  // merge according to vgId
  if (taosHashGetSize(pCxt->pTableBlockHashObj) > 0) {
    CHECK_CODE(mergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->payloadType, &pCxt->pVgDataBlocks));
  }
  return buildOutput(pCxt);
}

// INSERT INTO
//   tb_name
//       [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
//       [(field1_name, ...)]
//       VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
//   [...];
X
Xiaoyu Wang 已提交
1229
// Entry point for INSERT statements.
//
// Hash objects:
//   - When a stmt callback is present and *pQuery already exists, the vgroup and data-block hashes are borrowed
//     from the stmt via getExecInfoFn.
//   - Otherwise they are created here, and a new SQuery (rooted at the new modify node) is allocated into *pQuery.
// Table names: all referenced names are appended to (*pQuery)->pTableList when parsing succeeds or fails with an
// error the client is expected to handle.
int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
  SInsertParseContext cxt = {
      .pComCxt = pContext,
      .pSql = (char*)pContext->pSql,
      .msg = {.buf = pContext->pMsg, .len = pContext->msgLen},
      .pTableMeta = NULL,
      .pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK),
      .pTableNameHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK),
      .totalNum = 0,
      .pTagVals = NULL,
      .pOutput = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT),
      .pStmtCb = pContext->pStmtCb};

  bool reuseStmtExecInfo = (NULL != pContext->pStmtCb) && (NULL != *pQuery);
  if (reuseStmtExecInfo) {
    (*pContext->pStmtCb->getExecInfoFn)(pContext->pStmtCb->pStmt, &cxt.pVgroupsHashObj, &cxt.pTableBlockHashObj);
  } else {
    cxt.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
    cxt.pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  }

  bool allocFailed = NULL == cxt.pVgroupsHashObj || NULL == cxt.pTableBlockHashObj ||
                     NULL == cxt.pSubTableHashObj || NULL == cxt.pTableNameHashObj || NULL == cxt.pOutput;
  if (allocFailed) {
    // NOTE(review): objects successfully created above are not released on this path.
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  if (NULL != pContext->pStmtCb) {
    TSDB_QUERY_SET_TYPE(cxt.pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT);
  }

  if (NULL == *pQuery) {
    SQuery* pNewQuery = taosMemoryCalloc(1, sizeof(SQuery));
    if (NULL == pNewQuery) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    pNewQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
    pNewQuery->haveResultSet = false;
    pNewQuery->msgType = TDMT_VND_SUBMIT;
    pNewQuery->pRoot = (SNode*)cxt.pOutput;
    *pQuery = pNewQuery;
  }

  if (NULL == (*pQuery)->pTableList) {
    (*pQuery)->pTableList = taosArrayInit(taosHashGetSize(cxt.pTableNameHashObj), sizeof(SName));
    if (NULL == (*pQuery)->pTableList) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  cxt.pOutput->payloadType = PAYLOAD_TYPE_KV;

  int32_t code = skipInsertInto(&cxt);
  if (TSDB_CODE_SUCCESS == code) {
    code = parseInsertBody(&cxt);
  }

  if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) {
    for (SName* pTable = taosHashIterate(cxt.pTableNameHashObj, NULL); NULL != pTable;
         pTable = taosHashIterate(cxt.pTableNameHashObj, pTable)) {
      taosArrayPush((*pQuery)->pTableList, pTable);
    }
  }

  destroyInsertParseContext(&cxt);
  return code;
}
D
stmt  
dapan1121 已提交
1295

X
Xiaoyu Wang 已提交
1296 1297 1298 1299
// Parses the table name in pTableName into pName, using acctId/dbName as passed to createSName. On failure an
// error message is written to msgBuf:
//   - the name is empty;
//   - anything follows the name.
int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf,
                     int32_t msgBufLen) {
  SMsgBuf msg = {.buf = msgBuf, .len = msgBufLen};
  SToken  token;

  NEXT_TOKEN(pTableName, token);
  if (0 == token.n) {
    return buildInvalidOperationMsg(&msg, "empty table name");
  }

  int32_t code = createSName(pName, &token, acctId, dbName, &msg);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  // the name must be the only token
  NEXT_TOKEN(pTableName, token);
  return (token.n > 0) ? buildInvalidOperationMsg(&msg, "table name format is wrong") : TSDB_CODE_SUCCESS;
}

// Builds the submit output of a stmt execution. The data blocks in pBlockHash are merged by vgroup, and
// buildOutput writes the result into pQuery->pRoot. The temporary per-vgroup block list is released on every path,
// including when merging or building fails.
int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) {
  SVnodeModifOpStmt*  modifyNode = (SVnodeModifOpStmt*)pQuery->pRoot;
  int32_t             code = TSDB_CODE_SUCCESS;
  SInsertParseContext insertCtx = {
      .pVgroupsHashObj = pVgHash,
      .pTableBlockHashObj = pBlockHash,
      .pOutput = modifyNode,
  };

  // merge according to vgId
  if (taosHashGetSize(insertCtx.pTableBlockHashObj) > 0) {
    code = mergeTableDataBlocks(insertCtx.pTableBlockHashObj, modifyNode->payloadType, &insertCtx.pVgDataBlocks);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = buildOutput(&insertCtx);
  }

  destroyBlockArrayList(insertCtx.pVgDataBlocks);
  return code;
}

X
Xiaoyu Wang 已提交
1343 1344 1345 1346
// Binds stmt tag values for a child table and attaches the resulting create-table request to pBlock.
//
// boundTags lists the bound tag columns in order. bind[c] supplies the value of the c-th bound tag (only row 0 is
// read); NULL tags are skipped. suid identifies the super table and tName names the child table.
// Returns TSDB_CODE_QRY_APP_ERROR when boundTags is NULL; otherwise a TSDB code, with details in msgBuf.
// The temporary tag array, serialized tags and request name are released on every path.
int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, char* tName, TAOS_MULTI_BIND* bind,
                           char* msgBuf, int32_t msgBufLen) {
  STableDataBlocks*   pDataBlock = (STableDataBlocks*)pBlock;
  SMsgBuf             pBuf = {.buf = msgBuf, .len = msgBufLen};
  SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
  if (NULL == tags) {
    return TSDB_CODE_QRY_APP_ERROR;
  }

  STagVal* pTagVals = taosMemoryCalloc(tags->numOfBound, sizeof(STagVal));
  if (!pTagVals) {
    return buildInvalidOperationMsg(&pBuf, "out of memory");
  }

  int32_t       code = TSDB_CODE_SUCCESS;
  STag*         pTag = NULL;
  SVCreateTbReq tbReq = {0};
  SSchema*      pSchema = pDataBlock->pTableMeta->schema;
  SKvParam      param = {.pTagVals = pTagVals, .nTag = 0, .pos = 0};

  for (int c = 0; c < tags->numOfBound; ++c) {
    SSchema* pTagSchema = &pSchema[tags->boundColumns[c]];
    // KvRowAppend reads param.schema before it checks for a NULL value, so set it first.
    param.schema = pTagSchema;

    if (bind[c].is_null && bind[c].is_null[0]) {
      KvRowAppend(&pBuf, NULL, 0, &param);
      continue;
    }

    int32_t colLen = pTagSchema->bytes;
    if (IS_VAR_DATA_TYPE(pTagSchema->type)) {
      colLen = bind[c].length[0];
    }

    code = KvRowAppend(&pBuf, (char*)bind[c].buffer, colLen, &param);
    if (TSDB_CODE_SUCCESS != code) {
      goto _end;
    }
  }

  // TODO: JSON_TAG_REFACTOR (if is json or not)?
  if (0 != tTagNew(pTagVals, param.nTag, 1, false, &pTag)) {
    code = buildInvalidOperationMsg(&pBuf, "out of memory");
    goto _end;
  }

  // tbReq takes ownership of pTag; destroyCreateSubTbReq below frees it together with the duplicated name.
  code = buildCreateTbReq(&tbReq, tName, pTag, suid);
  if (TSDB_CODE_SUCCESS == code) {
    code = buildCreateTbMsg(pDataBlock, &tbReq);
  }

_end:
  destroyCreateSubTbReq(&tbReq);
  taosMemoryFreeClear(pTagVals);
  return code;
}

X
Xiaoyu Wang 已提交
1394 1395 1396 1397
// Binds a batch of stmt column values into pBlock and updates the SSubmitBlk header row count.
//
// Layout: bind[c] holds bind->num rows for the c-th bound column; they are written as bind->num consecutive rows.
// Checks:
//   - every bound column must carry the same row count;
//   - each non-NULL value's buffer type must match the column type;
//   - a NULL primary timestamp is rejected.
// In tuple-format rows, unbound columns are filled with NONE values.
int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) {
  STableDataBlocks*   pDataBlock = (STableDataBlocks*)pBlock;
  SSchema*            pCols = getTableColumnSchema(pDataBlock->pTableMeta);
  int32_t             rowSize = getExtendedRowSize(pDataBlock);
  SParsedDataColInfo* pColInfo = &pDataBlock->boundColumnInfo;
  SRowBuilder*        pRowBuilder = &pDataBlock->rowBuilder;
  SMemParam           appendParam = {.rb = pRowBuilder};
  SMsgBuf             errBuf = {.buf = msgBuf, .len = msgBufLen};
  int32_t             expectedRows = bind->num;

  CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));
  CHECK_CODE(allocateMemForSize(pDataBlock, rowSize * bind->num));

  for (int32_t rowIdx = 0; rowIdx < bind->num; ++rowIdx) {
    STSRow* pRow = (STSRow*)(pDataBlock->pData + pDataBlock->size);  // skip the SSubmitBlk header
    tdSRowResetBuf(pRowBuilder, pRow);

    for (int col = 0; col < pColInfo->numOfBound; ++col) {
      TAOS_MULTI_BIND* pBind = &bind[col];
      SSchema*         pCol = &pCols[pColInfo->boundColumns[col]];

      if (pBind->num != expectedRows) {
        return buildInvalidOperationMsg(&errBuf, "row number in each bind param should be the same");
      }

      appendParam.schema = pCol;
      getSTSRowAppendInfo(pRowBuilder->rowType, pColInfo, col, &appendParam.toffset, &appendParam.colIdx);

      int32_t code;
      if (pBind->is_null && pBind->is_null[rowIdx]) {
        if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) {
          return buildInvalidOperationMsg(&errBuf, "primary timestamp should not be NULL");
        }
        code = MemRowAppend(&errBuf, NULL, 0, &appendParam);
      } else {
        if (pBind->buffer_type != pCol->type) {
          return buildInvalidOperationMsg(&errBuf, "column type mis-match with buffer type");
        }
        int32_t colLen = IS_VAR_DATA_TYPE(pCol->type) ? pBind->length[rowIdx] : pCol->bytes;
        code = MemRowAppend(&errBuf, (char*)pBind->buffer + pBind->buffer_length * rowIdx, colLen, &appendParam);
      }
      if (TSDB_CODE_SUCCESS != code) {
        return code;
      }

      if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) {
        TSKEY tsKey = TD_ROW_KEY(pRow);
        checkTimestamp(pDataBlock, (const char*)&tsKey);
      }
    }

    // columns the statement did not bind get NONE values (tuple-format rows only)
    if ((pColInfo->numOfBound < pColInfo->numOfCols) && TD_IS_TP_ROW(pRow)) {
      for (int32_t i = 0; i < pColInfo->numOfCols; ++i) {
        if (VAL_STAT_NONE == pColInfo->cols[i].valStat) {  // the primary TS key is not VAL_STAT_NONE
          tdAppendColValToTpRow(pRowBuilder, TD_VTYPE_NONE, getNullValue(pCols[i].type), true, pCols[i].type, i,
                                pColInfo->cols[i].toffset);
        }
      }
    }
#ifdef TD_DEBUG_PRINT_ROW
    STSchema* pSTSchema = tdGetSTSChemaFromSSChema(&pCols, pColInfo->numOfCols);
    tdSRowPrint(pRow, pSTSchema, __func__);
    taosMemoryFree(pSTSchema);
#endif
    pDataBlock->size += rowSize;
  }

  if (TSDB_CODE_SUCCESS != setBlockInfo((SSubmitBlk*)pDataBlock->pData, pDataBlock, bind->num)) {
    return buildInvalidOperationMsg(&errBuf, "too many rows in sql, total number of rows should be less than 32767");
  }
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1471 1472 1473 1474 1475
/*
 * Bind a single column (the colIdx-th bound column) of a parameter batch into a table data block.
 *
 * Columns of one batch are bound by successive calls, in bound-column order:
 *  - colIdx == 0 (rowStart) initializes the row builder and reserves room for bind->num rows;
 *  - the last bound column (rowEnd) fills unbound columns with NONE, advances the block size
 *    by bind->num rows and refreshes the SSubmitBlk header.
 *
 * @param pBlock     target STableDataBlocks
 * @param bind       bind descriptor of this column; holds bind->num rows
 * @param msgBuf     buffer that receives an error message on validation failure
 * @param msgBufLen  size of msgBuf
 * @param colIdx     index of the column among the bound columns
 * @param rowNum     row count that every column of this batch must have
 * @return TSDB_CODE_SUCCESS, or an error code
 */
int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx,
                                int32_t rowNum) {
  STableDataBlocks*   pDataBlock = (STableDataBlocks*)pBlock;
  SSchema*            pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
  int32_t             extendedRowSize = getExtendedRowSize(pDataBlock);
  SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
  SRowBuilder*        pBuilder = &pDataBlock->rowBuilder;
  SMemParam           param = {.rb = pBuilder};
  SMsgBuf             pBuf = {.buf = msgBuf, .len = msgBufLen};
  bool                rowStart = (0 == colIdx);
  bool                rowEnd = ((colIdx + 1) == spd->numOfBound);

  // Validate the row count before touching the builder or allocating memory. The check does not
  // depend on the row, and doing it here also rejects a mismatching bind that carries zero rows,
  // which a per-row check inside the loop below could never see.
  if (bind->num != rowNum) {
    return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
  }

  if (rowStart) {
    CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));
    CHECK_CODE(allocateMemForSize(pDataBlock, extendedRowSize * bind->num));
  }

  // The schema of the column being bound is the same for every row of this call.
  SSchema* pColSchema = &pSchema[spd->boundColumns[colIdx]];

  for (int32_t r = 0; r < bind->num; ++r) {
    // Rows follow the data already in the block; pDataBlock->size is only advanced after the last
    // bound column has been written (see rowEnd below), so every call addresses the same rows.
    STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size + extendedRowSize * r);  // skip the SSubmitBlk header
    if (rowStart) {
      tdSRowResetBuf(pBuilder, row);  // first column: reset the row buffer
    } else {
      tdSRowGetBuf(pBuilder, row);  // later columns: re-attach the builder to the row written so far
    }

    param.schema = pColSchema;
    getSTSRowAppendInfo(pBuilder->rowType, spd, colIdx, &param.toffset, &param.colIdx);

    if (bind->is_null && bind->is_null[r]) {
      if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
        return buildInvalidOperationMsg(&pBuf, "primary timestamp should not be NULL");
      }

      CHECK_CODE(MemRowAppend(&pBuf, NULL, 0, &param));
    } else {
      if (bind->buffer_type != pColSchema->type) {
        return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
      }

      // Fixed-size types use the schema width; variable-length types take the per-row length.
      int32_t colLen = pColSchema->bytes;
      if (IS_VAR_DATA_TYPE(pColSchema->type)) {
        colLen = bind->length[r];
      }

      CHECK_CODE(MemRowAppend(&pBuf, (char*)bind->buffer + bind->buffer_length * r, colLen, &param));
    }

    if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
      TSKEY tsKey = TD_ROW_KEY(row);
      checkTimestamp(pDataBlock, (const char*)&tsKey);
    }

    // set the null value for the columns that do not assign values
    if (rowEnd && (spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
      for (int32_t i = 0; i < spd->numOfCols; ++i) {
        if (spd->cols[i].valStat == VAL_STAT_NONE) {  // the primary TS key is not VAL_STAT_NONE
          tdAppendColValToTpRow(pBuilder, TD_VTYPE_NONE, getNullValue(pSchema[i].type), true, pSchema[i].type, i,
                                spd->cols[i].toffset);
        }
      }
    }

#ifdef TD_DEBUG_PRINT_ROW
    if (rowEnd) {
      STSchema* pSTSchema = tdGetSTSChemaFromSSChema(&pSchema, spd->numOfCols);
      tdSRowPrint(row, pSTSchema, __func__);
      taosMemoryFree(pSTSchema);
    }
#endif
  }

  if (rowEnd) {
    pDataBlock->size += extendedRowSize * bind->num;

    SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
    if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, bind->num)) {
      return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767");
    }
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1560
/*
 * Report the bound columns (or tags) described by boundInfo.
 *
 * The count is always written to *fieldNum. When fields is non-NULL, a newly allocated array of
 * numOfBound TAOS_FIELD entries (name/type/bytes copied from pSchema) is stored in *fields; the
 * caller owns it. On allocation failure *fields is left NULL and TSDB_CODE_OUT_OF_MEMORY is
 * returned.
 */
int32_t buildBoundFields(SParsedDataColInfo* boundInfo, SSchema* pSchema, int32_t* fieldNum, TAOS_FIELD** fields) {
  if (NULL != fields) {
    TAOS_FIELD* pOut = taosMemoryCalloc(boundInfo->numOfBound, sizeof(TAOS_FIELD));
    *fields = pOut;
    if (NULL == pOut) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    for (int32_t k = 0; k < boundInfo->numOfBound; ++k) {
      const SSchema* pSrc = pSchema + boundInfo->boundColumns[k];
      TAOS_FIELD*    pDst = pOut + k;

      strcpy(pDst->name, pSrc->name);
      pDst->type = pSrc->type;
      pDst->bytes = pSrc->bytes;
    }
  }

  *fieldNum = boundInfo->numOfBound;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1580 1581
/*
 * Describe the bound tags of a statement's target table.
 *
 * @param pBlock    STableDataBlocks of the target table (provides the tag schema)
 * @param boundTags SParsedDataColInfo describing the bound tags; must not be NULL
 * @param fieldNum  [out] number of bound tags
 * @param fields    [out, optional] allocated TAOS_FIELD array, or NULL when no tag is bound.
 *                  May be NULL when the caller only needs the count.
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_APP_ERROR when boundTags is NULL, or an allocation error
 */
int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD** fields) {
  STableDataBlocks*   pDataBlock = (STableDataBlocks*)pBlock;
  SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
  if (NULL == tags) {
    return TSDB_CODE_QRY_APP_ERROR;
  }

  SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta);
  if (tags->numOfBound <= 0) {
    *fieldNum = 0;
    // fields is optional (buildBoundFields and qBuildStmtColFields accept NULL); do not dereference it blindly.
    if (fields) {
      *fields = NULL;
    }

    return TSDB_CODE_SUCCESS;
  }

  CHECK_CODE(buildBoundFields(tags, pSchema, fieldNum, fields));

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1600 1601 1602
/*
 * Describe the bound data columns of a statement's target table.
 *
 * *fieldNum receives the number of bound columns. When fields is non-NULL it receives an allocated
 * TAOS_FIELD array (via buildBoundFields), or NULL when no column is bound.
 */
int32_t qBuildStmtColFields(void* pBlock, int32_t* fieldNum, TAOS_FIELD** fields) {
  STableDataBlocks*   pDataBlock = (STableDataBlocks*)pBlock;
  SSchema*            pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
  SParsedDataColInfo* pBound = &pDataBlock->boundColumnInfo;

  if (pBound->numOfBound > 0) {
    CHECK_CODE(buildBoundFields(pBound, pSchema, fieldNum, fields));
    return TSDB_CODE_SUCCESS;
  }

  // Nothing bound: report an empty field list.
  *fieldNum = 0;
  if (NULL != fields) {
    *fields = NULL;
  }
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
1617
// schemaless logic start
D
stmt  
dapan1121 已提交
1618

wmmhello's avatar
wmmhello 已提交
1619
typedef struct SmlExecTableHandle {
X
Xiaoyu Wang 已提交
1620 1621
  SParsedDataColInfo tags;          // each table
  SVCreateTbReq      createTblReq;  // each table
wmmhello's avatar
wmmhello 已提交
1622
} SmlExecTableHandle;
wmmhello's avatar
wmmhello 已提交
1623

wmmhello's avatar
wmmhello 已提交
1624
typedef struct SmlExecHandle {
1625 1626 1627
  SHashObj*          pBlockHash;
  SmlExecTableHandle tableExecHandle;
  SQuery*            pQuery;
wmmhello's avatar
wmmhello 已提交
1628
} SSmlExecHandle;
wmmhello's avatar
wmmhello 已提交
1629

wmmhello's avatar
wmmhello 已提交
1630 1631 1632 1633 1634 1635
/*
 * Release the per-table state (bound tag info and create-table request) of an SmlExecTableHandle
 * and reset it to all zeros, the same state as the freshly calloc'ed handle from smlInitHandle().
 *
 * This function runs before every table in smlBindData() and once more in smlDestroyHandle().
 * smlBindData() can return early (e.g. on a tag-binding error) before the create-table request is
 * rebuilt. Zeroing guarantees that such a repeated call never sees pointers that were already
 * released.
 */
static void smlDestroyTableHandle(void* pHandle) {
  SmlExecTableHandle* handle = (SmlExecTableHandle*)pHandle;
  destroyBoundColumnInfo(&handle->tags);
  destroyCreateSubTbReq(&handle->createTblReq);
  memset(handle, 0, sizeof(*handle));
}

X
Xiaoyu Wang 已提交
1636
/*
 * Bind a schemaless key/value list to the schema columns described by pColList.
 *
 * Each element of cols is an SSmlKv*. Its key is looked up in pSchema, first from the column after
 * the previous match and then from the start. For every key, in order of appearance, this:
 *  - records the matched schema index in pColList->boundColumns;
 *  - marks that column VAL_STAT_HAS;
 *  - adds its null-placeholder size to boundNullLen.
 * When the keys are not in schema order, colIdxInfo is also built so values can be reordered.
 *
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_SML_INVALID_DATA for an unknown or duplicated key;
 *         TSDB_CODE_TSC_OUT_OF_MEMORY when colIdxInfo cannot be allocated
 */
static int32_t smlBoundColumnData(SArray* cols, SParsedDataColInfo* pColList, SSchema* pSchema) {
  col_id_t nCols = pColList->numOfCols;

  pColList->numOfBound = 0;
  pColList->boundNullLen = 0;
  memset(pColList->boundColumns, 0, sizeof(col_id_t) * nCols);
  for (col_id_t i = 0; i < nCols; ++i) {
    pColList->cols[i].valStat = VAL_STAT_NONE;
  }

  bool     isOrdered = true;
  col_id_t lastColIdx = -1;  // last column found
  for (int i = 0; i < taosArrayGetSize(cols); ++i) {
    SSmlKv*  kv = taosArrayGetP(cols, i);
    SToken   sToken = {.n = kv->keyLen, .z = (char*)kv->key};
    col_id_t t = lastColIdx + 1;
    col_id_t index = findCol(&sToken, t, nCols, pSchema);
    if (index < 0 && t > 0) {
      index = findCol(&sToken, 0, t, pSchema);
      isOrdered = false;
    }
    if (index < 0) {
      return TSDB_CODE_SML_INVALID_DATA;
    }
    if (pColList->cols[index].valStat == VAL_STAT_HAS) {
      return TSDB_CODE_SML_INVALID_DATA;
    }
    lastColIdx = index;
    pColList->cols[index].valStat = VAL_STAT_HAS;
    pColList->boundColumns[pColList->numOfBound] = index;
    ++pColList->numOfBound;

    // Size the null placeholder by the type of the column that actually matched (index). The search
    // start t is a different column when keys are out of order. It equals nCols (one past the end
    // of pSchema) when the match wrapped around after the last column.
    switch (pSchema[index].type) {
      case TSDB_DATA_TYPE_BINARY:
        pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + CHAR_BYTES);
        break;
      case TSDB_DATA_TYPE_NCHAR:
        pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
        break;
      default:
        pColList->boundNullLen += TYPE_BYTES[pSchema[index].type];
        break;
    }
  }

  pColList->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED;

  if (!isOrdered) {
    // A data block fetched again for the same table keeps its previous binding. Release an index
    // map left over from an earlier call instead of leaking it.
    // NOTE(review): assumes colIdxInfo is NULL or owned by pColList (both sml callers start from calloc'ed memory).
    taosMemoryFree(pColList->colIdxInfo);
    pColList->colIdxInfo = taosMemoryCalloc(pColList->numOfBound, sizeof(SBoundIdxInfo));
    if (NULL == pColList->colIdxInfo) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    SBoundIdxInfo* pColIdx = pColList->colIdxInfo;
    for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
      pColIdx[i].schemaColIdx = pColList->boundColumns[i];
      pColIdx[i].boundIdx = i;
    }
    // Sort by schema position to assign finalIdx, then restore bound (input) order.
    qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), schemaIdxCompar);
    for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
      pColIdx[i].finalIdx = i;
    }
    qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), boundIdxCompar);
  }

  if (pColList->numOfCols > pColList->numOfBound) {
    memset(&pColList->boundColumns[pColList->numOfBound], 0,
           sizeof(col_id_t) * (pColList->numOfCols - pColList->numOfBound));
  }

  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719
/**
 * @brief Build the tag row of a child table from schemaless tag key/values (no JSON tag support).
 *
 * @param cols    SSmlKv* tag values, in the same order that was bound by smlBoundColumnData
 * @param tags    bound tag info produced by smlBoundColumnData for cols
 * @param pSchema tag schema of the super table
 * @param ppTag   [out] newly built tag row on success
 * @param msg     error message buffer passed to KvRowAppend
 * @return TSDB_CODE_SUCCESS, the error returned by KvRowAppend, or an out-of-memory code
 */
static int32_t smlBuildTagRow(SArray* cols, SParsedDataColInfo* tags, SSchema* pSchema, STag** ppTag, SMsgBuf* msg) {
  STagVal* pTagVals = taosMemoryCalloc(tags->numOfBound, sizeof(STagVal));
  if (!pTagVals) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SKvParam param = {.pTagVals = pTagVals, .nTag = 0, .pos = 0};
  for (int i = 0; i < tags->numOfBound; ++i) {
    SSchema* pTagSchema = &pSchema[tags->boundColumns[i]];
    param.schema = pTagSchema;
    SSmlKv* kv = taosArrayGetP(cols, i);

    // Variable-length values are passed through kv->value; for other types the value is held in
    // the kv itself, so its address is passed.
    int32_t code;
    if (IS_VAR_DATA_TYPE(kv->type)) {
      code = KvRowAppend(msg, kv->value, kv->length, &param);
    } else {
      code = KvRowAppend(msg, &(kv->value), kv->length, &param);
    }
    // Do not silently drop a tag value that failed to convert/append.
    if (code != TSDB_CODE_SUCCESS) {
      taosMemoryFree(pTagVals);
      return code;
    }
  }

  if (tTagNew(pTagVals, param.nTag, 1, false, ppTag) != 0) {
    taosMemoryFree(pTagVals);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  taosMemoryFree(pTagVals);
  return TSDB_CODE_SUCCESS;
}

1744 1745
/*
 * Bind one child table's schemaless data (tags and rows) into the handle's data blocks.
 *
 * Steps:
 *  1. Bind the tag key/values and build the create-table request.
 *  2. Fetch or create the data block for pTableMeta->uid and bind the column names in colsSchema.
 *  3. Append every row of cols to the block and refresh the SSubmitBlk header.
 *
 * @param handle     SSmlExecHandle created by smlInitHandle
 * @param tags       SSmlKv* tag values
 * @param colsSchema SSmlKv* list whose keys name the columns to bind
 * @param cols       rows; when format is true each row is an SArray of SSmlKv*, otherwise a hash
 *                   keyed by column name whose values are SSmlKv*
 * @param format     selects the row representation described above
 * @param pTableMeta meta of the child table
 * @param tableName  child table name used in the create-table request
 * @param msgBuf     buffer that receives an error message on failure
 * @param msgBufLen  size of msgBuf
 * @return TSDB_CODE_SUCCESS or an error code
 */
int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta,
                    char* tableName, char* msgBuf, int16_t msgBufLen) {
  SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};

  SSmlExecHandle* smlHandle = (SSmlExecHandle*)handle;
  smlDestroyTableHandle(&smlHandle->tableExecHandle);  // free for each table
  SSchema* pTagsSchema = getTableTagSchema(pTableMeta);
  setBoundColumnInfo(&smlHandle->tableExecHandle.tags, pTagsSchema, getNumOfTags(pTableMeta));
  int ret = smlBoundColumnData(tags, &smlHandle->tableExecHandle.tags, pTagsSchema);
  if (ret != TSDB_CODE_SUCCESS) {
    buildInvalidOperationMsg(&pBuf, "bound tags error");
    return ret;
  }
  STag* pTag = NULL;
  ret = smlBuildTagRow(tags, &smlHandle->tableExecHandle.tags, pTagsSchema, &pTag, &pBuf);
  if (ret != TSDB_CODE_SUCCESS) {
    return ret;
  }

  buildCreateTbReq(&smlHandle->tableExecHandle.createTblReq, tableName, pTag, pTableMeta->suid);

  STableDataBlocks* pDataBlock = NULL;
  ret = getDataBlockFromList(smlHandle->pBlockHash, &pTableMeta->uid, sizeof(pTableMeta->uid),
                             TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk), getTableInfo(pTableMeta).rowSize,
                             pTableMeta, &pDataBlock, NULL, &smlHandle->tableExecHandle.createTblReq);
  if (ret != TSDB_CODE_SUCCESS) {
    buildInvalidOperationMsg(&pBuf, "create data block error");
    return ret;
  }

  SSchema* pSchema = getTableColumnSchema(pTableMeta);

  ret = smlBoundColumnData(colsSchema, &pDataBlock->boundColumnInfo, pSchema);
  if (ret != TSDB_CODE_SUCCESS) {
    buildInvalidOperationMsg(&pBuf, "bound cols error");
    return ret;
  }
  int32_t             extendedRowSize = getExtendedRowSize(pDataBlock);
  SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
  SRowBuilder*        pBuilder = &pDataBlock->rowBuilder;
  SMemParam           param = {.rb = pBuilder};

  // Checked like in the stmt bind functions instead of being silently ignored.
  CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));

  int32_t rowNum = taosArrayGetSize(cols);
  if (rowNum <= 0) {
    return buildInvalidOperationMsg(&pBuf, "cols size <= 0");
  }
  ret = allocateMemForSize(pDataBlock, extendedRowSize * rowNum);
  if (ret != TSDB_CODE_SUCCESS) {
    buildInvalidOperationMsg(&pBuf, "allocate memory error");
    return ret;
  }
  for (int32_t r = 0; r < rowNum; ++r) {
    STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size);  // skip the SSubmitBlk header
    tdSRowResetBuf(pBuilder, row);
    void*  rowData = taosArrayGetP(cols, r);
    size_t rowDataSize = 0;
    if (format) {
      rowDataSize = taosArrayGetSize(rowData);
    }

    // 1. append the value of every bound column (NULL when the row has no value for it)
    for (int c = 0, j = 0; c < spd->numOfBound; ++c) {
      SSchema* pColSchema = &pSchema[spd->boundColumns[c]];

      param.schema = pColSchema;
      getSTSRowAppendInfo(pBuilder->rowType, spd, c, &param.toffset, &param.colIdx);

      SSmlKv* kv = NULL;
      if (format) {
        // A row with exactly numOfBound values is taken positionally. Otherwise the next value is
        // used only when its key names this column, and the column is treated as missing if not.
        if (j < rowDataSize) {
          kv = taosArrayGetP(rowData, j);
          if (rowDataSize != spd->numOfBound &&
              (kv->keyLen != strlen(pColSchema->name) || strncmp(kv->key, pColSchema->name, kv->keyLen) != 0)) {
            kv = NULL;
          } else {
            j++;
          }
        }
      } else {
        void** p = taosHashGet(rowData, pColSchema->name, strlen(pColSchema->name));
        if (p) kv = *p;
      }

      if (!kv || kv->length == 0) {
        CHECK_CODE(MemRowAppend(&pBuf, NULL, 0, &param));
      } else {
        int32_t colLen = kv->length;
        if (pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP) {
          // Incoming timestamps are converted from nanoseconds to the table precision, in place.
          kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision);
        }

        if (IS_VAR_DATA_TYPE(kv->type)) {
          CHECK_CODE(MemRowAppend(&pBuf, kv->value, colLen, &param));
        } else {
          CHECK_CODE(MemRowAppend(&pBuf, &(kv->value), colLen, &param));
        }
      }

      if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
        TSKEY tsKey = TD_ROW_KEY(row);
        checkTimestamp(pDataBlock, (const char*)&tsKey);
      }
    }

    // set the null value for the columns that do not assign values
    if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
      for (int32_t i = 0; i < spd->numOfCols; ++i) {
        if (spd->cols[i].valStat == VAL_STAT_NONE) {  // the primary TS key is not VAL_STAT_NONE
          tdAppendColValToTpRow(pBuilder, TD_VTYPE_NONE, getNullValue(pSchema[i].type), true, pSchema[i].type, i,
                                spd->cols[i].toffset);
        }
      }
    }

    pDataBlock->size += extendedRowSize;
  }

  SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
  if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, rowNum)) {
    return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767");
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1871 1872 1873
/*
 * Create the state of one schemaless write.
 *
 * The block hash is keyed by table uid (see getDataBlockFromList in smlBindData).
 * Returns NULL when either the handle or its block hash cannot be allocated, so callers never
 * receive a handle with a NULL pBlockHash.
 */
void* smlInitHandle(SQuery* pQuery) {
  SSmlExecHandle* handle = taosMemoryCalloc(1, sizeof(SSmlExecHandle));
  if (!handle) return NULL;

  handle->pBlockHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
  if (NULL == handle->pBlockHash) {
    taosMemoryFree(handle);
    return NULL;
  }
  handle->pQuery = pQuery;

  return handle;
}

X
Xiaoyu Wang 已提交
1880 1881 1882
void smlDestroyHandle(void* pHandle) {
  if (!pHandle) return;
  SSmlExecHandle* handle = (SSmlExecHandle*)pHandle;
wmmhello's avatar
wmmhello 已提交
1883
  destroyBlockHashmap(handle->pBlockHash);
wmmhello's avatar
wmmhello 已提交
1884
  smlDestroyTableHandle(&handle->tableExecHandle);
wmmhello's avatar
wmmhello 已提交
1885 1886 1887 1888
  taosMemoryFree(handle);
}

/* Turn the data blocks collected in the handle into the query output, grouped via pVgHash. */
int32_t smlBuildOutput(void* handle, SHashObj* pVgHash) {
  SSmlExecHandle* pExec = handle;
  SQuery*         pQuery = pExec->pQuery;
  return qBuildStmtOutput(pQuery, pVgHash, pExec->pBlockHash);
}
wmmhello's avatar
wmmhello 已提交
1892
// schemaless logic end