parInsert.c 64.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

wafwerar's avatar
wafwerar 已提交
16
#include "os.h"
X
Xiaoyu Wang 已提交
17 18 19
#include "parInsertData.h"
#include "parInt.h"
#include "parToken.h"
H
refact  
Hongze Cheng 已提交
20
#include "parUtil.h"
21 22 23 24
#include "tglobal.h"
#include "ttime.h"
#include "ttypes.h"

H
refact  
Hongze Cheng 已提交
25 26 27
/*
 * Fetch the next token starting at `pSql` into `sToken` and advance `pSql`
 * by the offset tStrGetToken() reports.
 *
 * `pSql` must be a modifiable char pointer lvalue; `sToken` must be an
 * assignable SToken lvalue. The scratch variable uses a macro-specific name
 * so that it cannot capture an identifier (e.g. a caller's `index`) that
 * appears inside either argument.
 */
#define NEXT_TOKEN(pSql, sToken)                            \
  do {                                                      \
    int32_t nextTokenIdx_ = 0;                              \
    (sToken) = tStrGetToken((pSql), &nextTokenIdx_, false); \
    (pSql) += nextTokenIdx_;                                \
  } while (0)

H
refact  
Hongze Cheng 已提交
32 33 34
/*
 * Same as NEXT_TOKEN, except that tStrGetToken() is called with its third
 * argument set to true (NEXT_TOKEN passes false).
 *
 * `pSql` must be a modifiable char pointer lvalue; `sToken` must be an
 * assignable SToken lvalue. The scratch variable uses a macro-specific name
 * so that it cannot capture an identifier used inside either argument.
 */
#define NEXT_TOKEN_WITH_PREV(pSql, sToken)                     \
  do {                                                         \
    int32_t nextTokenPrevIdx_ = 0;                             \
    (sToken) = tStrGetToken((pSql), &nextTokenPrevIdx_, true); \
    (pSql) += nextTokenPrevIdx_;                               \
  } while (0)

39
/*
 * Fetch the next token starting at `pSql` into `sToken` WITHOUT advancing
 * `pSql`. The offset reported by tStrGetToken() is written to the caller's
 * `index` lvalue instead, so the caller decides whether to consume the token.
 * Arguments are parenthesized so that compound expressions expand safely.
 */
#define NEXT_TOKEN_KEEP_SQL(pSql, sToken, index)      \
  do {                                                \
    (sToken) = tStrGetToken((pSql), &(index), false); \
  } while (0)

44 45 46 47 48 49 50
/*
 * Scan tokens from `pSql` with tGetToken() until one is found whose type is
 * not TK_NK_SPACE. On exit `sToken.z` points at the start of that token,
 * `sToken.n` holds its length, and `pSql` has been advanced past it.
 * `pSql` and `sToken` are evaluated several times, so they must be plain
 * lvalues without side effects.
 */
#define NEXT_VALID_TOKEN(pSql, sToken)        \
  do {                                        \
    sToken.n = tGetToken(pSql, &sToken.type); \
    sToken.z = pSql;                          \
    pSql += sToken.n;                         \
  } while (TK_NK_SPACE == sToken.type)

51
typedef struct SInsertParseContext {
X
Xiaoyu Wang 已提交
52 53 54 55 56 57 58 59 60 61 62
  SParseContext*     pComCxt;             // input
  char*              pSql;                // input
  SMsgBuf            msg;                 // input
  STableMeta*        pTableMeta;          // each table
  SParsedDataColInfo tags;                // each table
  SKVRowBuilder      tagsBuilder;         // each table
  SVCreateTbReq      createTblReq;        // each table
  SHashObj*          pVgroupsHashObj;     // global
  SHashObj*          pTableBlockHashObj;  // global
  SHashObj*          pSubTableHashObj;    // global
  SArray*            pVgDataBlocks;       // global
X
Xiaoyu Wang 已提交
63
  SHashObj*          pTableNameHashObj;   // global
X
Xiaoyu Wang 已提交
64
  int32_t            totalNum;
X
Xiaoyu Wang 已提交
65
  SVnodeModifOpStmt* pOutput;
X
Xiaoyu Wang 已提交
66
  SStmtCallback*     pStmtCb;
67 68
} SInsertParseContext;

H
refact  
Hongze Cheng 已提交
69
// Callback through which parseValueToken() hands one parsed column value to a
// row builder. `value` is NULL (with `len` 0) for a SQL NULL; otherwise it
// points at the converted value and `len` is its size in bytes. `param` is the
// implementation-specific state (see MemRowAppend and KvRowAppend).
typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param);
X
Xiaoyu Wang 已提交
70 71 72 73

// Addressable boolean constants: parseValueToken() passes a pointer to one of
// these to the row-append callback when it stores a BOOL column value.
static uint8_t TRUE_VALUE = (uint8_t)TSDB_TRUE;
static uint8_t FALSE_VALUE = (uint8_t)TSDB_FALSE;

D
stmt  
dapan1121 已提交
74
typedef struct SKvParam {
X
Xiaoyu Wang 已提交
75 76
  SKVRowBuilder* builder;
  SSchema*       schema;
D
stmt  
dapan1121 已提交
77 78 79 80 81 82 83 84 85 86
  char           buf[TSDB_MAX_TAGS_LEN];
} SKvParam;

// State passed to MemRowAppend() through the row-append callback's `param`.
typedef struct SMemParam {
  SRowBuilder* rb;       // row builder that receives the value
  SSchema*     schema;   // schema (colId / type / bytes) of the column being appended
  int32_t      toffset;  // forwarded unchanged to tdAppendColValToRow()
  col_id_t     colIdx;   // forwarded unchanged to tdAppendColValToRow()
} SMemParam;

X
Xiaoyu Wang 已提交
87 88 89
/*
 * Evaluate `expr` exactly once and return its value from the ENCLOSING
 * function if it is not TSDB_CODE_SUCCESS.
 *
 * The temporary has a macro-specific name: with a plain `code` local, an
 * invocation such as CHECK_CODE(f(code)) would initialize the temporary from
 * itself instead of from the caller's `code` variable.
 */
#define CHECK_CODE(expr)                     \
  do {                                       \
    int32_t checkCodeRc_ = (expr);           \
    if (TSDB_CODE_SUCCESS != checkCodeRc_) { \
      return checkCodeRc_;                   \
    }                                        \
  } while (0)

95 96 97 98 99 100 101 102 103 104 105 106 107
/*
 * Consume the leading "INSERT INTO" keywords from the SQL cursor.
 * Returns TSDB_CODE_SUCCESS when both keywords are present, otherwise the
 * syntax-error code produced by buildSyntaxErrMsg() for the missing keyword.
 */
static int32_t skipInsertInto(SInsertParseContext* pCxt) {
  SToken keyword;

  NEXT_TOKEN(pCxt->pSql, keyword);
  if (TK_INSERT == keyword.type) {
    NEXT_TOKEN(pCxt->pSql, keyword);
    if (TK_INTO == keyword.type) {
      return TSDB_CODE_SUCCESS;
    }
    return buildSyntaxErrMsg(&pCxt->msg, "keyword INTO is expected", keyword.z);
  }

  return buildSyntaxErrMsg(&pCxt->msg, "keyword INSERT is expected", keyword.z);
}

108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
/*
 * Validate an identifier token of the form `name` or `first.second` and
 * normalize it in place (the token text is lower-cased, and for two-part names
 * the underlying buffer may be rewritten).
 *
 * Returns TSDB_CODE_SUCCESS when the token is acceptable, otherwise
 * TSDB_CODE_TSC_INVALID_OPERATION.
 */
static int32_t parserValidateIdToken(SToken* pToken) {
  // Only non-NULL tokens of type TK_NK_ID are accepted at all.
  if (pToken == NULL || pToken->z == NULL || pToken->type != TK_NK_ID) {
    return TSDB_CODE_TSC_INVALID_OPERATION;
  }

  // it is a token quoted with escape char '`'
  // NOTE(review): for n == 0 the index n - 1 wraps; presumably the tokenizer never yields an empty ID - confirm.
  if (pToken->z[0] == TS_ESCAPE_CHAR && pToken->z[pToken->n - 1] == TS_ESCAPE_CHAR) {
    return TSDB_CODE_SUCCESS;
  }

  // Look for the path delimiter within the token's n characters.
  char* sep = strnchr(pToken->z, TS_PATH_DELIMITER[0], pToken->n, true);
  if (sep == NULL) {  // It is a single part token, not a complex type
    // A purely numeric token is not a valid identifier.
    if (isNumber(pToken)) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }

    // Lower-case the identifier in place.
    strntolower(pToken->z, pToken->z, pToken->n);
  } else {  // two part
    // Remember the full extent of the token; pToken is reused below to scan each part.
    int32_t oldLen = pToken->n;
    char*   pStr = pToken->z;

    // NOTE(review): the type was checked to be TK_NK_ID above, so this branch looks unreachable.
    if (pToken->type == TK_NK_SPACE) {
      pToken->n = (uint32_t)strtrim(pToken->z);
    }

    // Re-tokenize the first part: it must be an ID immediately followed by the delimiter.
    pToken->n = tGetToken(pToken->z, &pToken->type);
    if (pToken->z[pToken->n] != TS_PATH_DELIMITER[0]) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }

    if (pToken->type != TK_NK_ID) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }

    int32_t firstPartLen = pToken->n;

    // The second part is everything after the delimiter; it must be exactly one ID token.
    pToken->z = sep + 1;
    pToken->n = (uint32_t)(oldLen - (sep - pStr) - 1);
    int32_t len = tGetToken(pToken->z, &pToken->type);
    if (len != pToken->n || pToken->type != TK_NK_ID) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }

    // re-build the whole name string
    // NOTE(review): pStr[firstPartLen] was already verified to be the delimiter above
    // (pToken->z == pStr at that point), so the else branch appears to be unreachable.
    if (pStr[firstPartLen] == TS_PATH_DELIMITER[0]) {
      // first part do not have quote do nothing
    } else {
      // Close the gap between the two parts and blank out the vacated tail with spaces.
      pStr[firstPartLen] = TS_PATH_DELIMITER[0];
      memmove(&pStr[firstPartLen + 1], pToken->z, pToken->n);
      uint32_t offset = (uint32_t)(pToken->z - (pStr + firstPartLen + 1));
      memset(pToken->z + pToken->n - offset, ' ', offset);
    }

    // Make the token cover "first" + delimiter + "second" again, starting at the original position.
    pToken->n += (firstPartLen + sizeof(TS_PATH_DELIMITER[0]));
    pToken->z = pStr;

    // Lower-case the rebuilt name in place.
    strntolower(pToken->z, pToken->z, pToken->n);
  }

  return TSDB_CODE_SUCCESS;
}

170
/*
 * Split a validated `table` or `db.table` token into a full database name
 * ("<acctId>.<db>") and a table name.
 *
 * pCxt       - parse context; supplies the account id, the current database
 *              and the error message buffer.
 * pStname    - table name token; validated and normalized in place by
 *              parserValidateIdToken().
 * fullDbName - output buffer of at least TSDB_DB_FNAME_LEN bytes; always
 *              NUL-terminated on success.
 * tableName  - output buffer for the table part.
 *              NOTE(review): its capacity is not visible here, so the copy is
 *              left as a plain strncpy() that does not append a terminator;
 *              callers presumably pass a zero-initialized buffer - confirm.
 *
 * Returns TSDB_CODE_SUCCESS, or a syntax-error code when the name is invalid
 * or the database part does not fit into TSDB_DB_FNAME_LEN.
 */
static int32_t buildName(SInsertParseContext* pCxt, SToken* pStname, char* fullDbName, char* tableName) {
  if (parserValidateIdToken(pStname) != TSDB_CODE_SUCCESS) {
    return buildSyntaxErrMsg(&pCxt->msg, "invalid table name", pStname->z);
  }

  char* p = strnchr(pStname->z, TS_PATH_DELIMITER[0], pStname->n, false);
  if (NULL != p) {  // db.table
    int32_t dbLen = (int32_t)(p - pStname->z);
    int32_t n = snprintf(fullDbName, TSDB_DB_FNAME_LEN, "%d.", pCxt->pComCxt->acctId);

    // Reject names that would overflow the destination instead of writing past its end.
    if (n < 0 || n >= TSDB_DB_FNAME_LEN || dbLen >= TSDB_DB_FNAME_LEN - n) {
      return buildSyntaxErrMsg(&pCxt->msg, "database name too long", pStname->z);
    }

    strncpy(fullDbName + n, pStname->z, dbLen);
    fullDbName[n + dbLen] = '\0';  // strncpy() does not terminate when the source is longer than dbLen

    strncpy(tableName, p + 1, pStname->n - (p - pStname->z) - 1);
  } else {
    snprintf(fullDbName, TSDB_DB_FNAME_LEN, "%d.%s", pCxt->pComCxt->acctId, pCxt->pComCxt->db);
    strncpy(tableName, pStname->z, pStname->n);
  }

  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
188
/*
 * Fill `pName` from a table name token that is either `table` or `db.table`.
 *
 * pName      - output name object.
 * pTableName - token holding the (possibly quoted) name.
 * acctId     - account id stored together with the database name.
 * dbName     - current database; used only when the token has no db part and
 *              may be NULL in that case (reported as an error).
 * pMsgBuf    - receives the error message on failure.
 *
 * Returns TSDB_CODE_SUCCESS or the code produced by buildInvalidOperationMsg().
 */
static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, const char* dbName, SMsgBuf* pMsgBuf) {
  const char* msg1 = "name too long";
  const char* msg2 = "invalid database name";
  const char* msg3 = "db is not specified";
  const char* msg4 = "table name is empty";

  int32_t code = TSDB_CODE_SUCCESS;
  char*   p = strnchr(pTableName->z, TS_PATH_DELIMITER[0], pTableName->n, true);

  if (p != NULL) {  // db has been specified in sql string so we ignore current db path
    assert(*p == TS_PATH_DELIMITER[0]);

    // Raw (still quoted) length of the db part inside the token.
    int32_t dbLen = (int32_t)(p - pTableName->z);
    if (dbLen <= 0) {
      return buildInvalidOperationMsg(pMsgBuf, msg2);
    }
    if (dbLen >= TSDB_DB_FNAME_LEN) {  // would overflow name[] below
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }

    char name[TSDB_DB_FNAME_LEN] = {0};
    strncpy(name, pTableName->z, dbLen);
    // Keep the dequoted length separate: the raw length is still needed to locate the table part.
    int32_t actualDbLen = strdequote(name);

    code = tNameSetDbName(pName, acctId, name, actualDbLen);
    if (code != TSDB_CODE_SUCCESS) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }

    // Remaining characters after "<db>." - computed from the raw db length so that
    // quotes stripped from the db part are not counted as table characters.
    int32_t tbLen = (int32_t)pTableName->n - dbLen - 1;
    if (tbLen <= 0) {
      return buildInvalidOperationMsg(pMsgBuf, msg4);
    }
    if (tbLen >= TSDB_TABLE_FNAME_LEN) {  // would overflow tbname[] below
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }

    char tbname[TSDB_TABLE_FNAME_LEN] = {0};
    strncpy(tbname, p + 1, tbLen);
    /*tbLen = */ strdequote(tbname);

    code = tNameFromString(pName, tbname, T_NAME_TABLE);
    if (code != 0) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }
  } else {  // get current DB name first, and then set it into path
    if (pTableName->n >= TSDB_TABLE_NAME_LEN) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }

    assert(pTableName->n < TSDB_TABLE_FNAME_LEN);

    char name[TSDB_TABLE_FNAME_LEN] = {0};
    strncpy(name, pTableName->z, pTableName->n);
    strdequote(name);

    if (dbName == NULL) {
      return buildInvalidOperationMsg(pMsgBuf, msg3);
    }

    code = tNameSetDbName(pName, acctId, dbName, strlen(dbName));
    if (code != TSDB_CODE_SUCCESS) {
      code = buildInvalidOperationMsg(pMsgBuf, msg2);
      return code;
    }

    code = tNameFromString(pName, name, T_NAME_TABLE);
    if (code != 0) {
      code = buildInvalidOperationMsg(pMsgBuf, msg1);
    }
  }

  return code;
}

X
Xiaoyu Wang 已提交
248
/*
 * Check write permission on `dbFname` and load the meta of `name` into
 * pCxt->pTableMeta.
 *
 * isStb == true  : fetch the super table meta only.
 * isStb == false : fetch the table meta, then look up the vgroup the table
 *                  hashes to and record it in pCxt->pVgroupsHashObj keyed by vgId.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_PAR_PERMISSION_DENIED when the user may
 * not write, or the first error code returned by a catalog/hash call.
 */
static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SName* name, char* dbFname, bool isStb) {
  SParseContext* pParseCxt = pCxt->pComCxt;
  bool           authorized = false;

  int32_t rc = catalogChkAuth(pParseCxt->pCatalog, pParseCxt->pTransporter, &pParseCxt->mgmtEpSet, pParseCxt->pUser,
                              dbFname, AUTH_TYPE_WRITE, &authorized);
  if (TSDB_CODE_SUCCESS != rc) {
    return rc;
  }
  if (!authorized) {
    return TSDB_CODE_PAR_PERMISSION_DENIED;
  }

  if (isStb) {
    rc = catalogGetSTableMeta(pParseCxt->pCatalog, pParseCxt->pTransporter, &pParseCxt->mgmtEpSet, name,
                              &pCxt->pTableMeta);
    if (TSDB_CODE_SUCCESS != rc) {
      return rc;
    }
    return TSDB_CODE_SUCCESS;
  }

  rc = catalogGetTableMeta(pParseCxt->pCatalog, pParseCxt->pTransporter, &pParseCxt->mgmtEpSet, name,
                           &pCxt->pTableMeta);
  if (TSDB_CODE_SUCCESS != rc) {
    return rc;
  }
  ASSERT(pCxt->pTableMeta->tableInfo.rowSize > 0);

  SVgroupInfo vg;
  rc = catalogGetTableHashVgroup(pParseCxt->pCatalog, pParseCxt->pTransporter, &pParseCxt->mgmtEpSet, name, &vg);
  if (TSDB_CODE_SUCCESS != rc) {
    return rc;
  }

  rc = taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg));
  if (TSDB_CODE_SUCCESS != rc) {
    return rc;
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
272 273 274
// Load the meta of table `name` (isStb == false path of getTableMetaImpl, which
// also records the table's vgroup). Returns the code from getTableMetaImpl().
static int32_t getTableMeta(SInsertParseContext* pCxt, SName* name, char* dbFname) {
  return getTableMetaImpl(pCxt, name, dbFname, false);
}
D
stmt  
dapan1121 已提交
275

X
Xiaoyu Wang 已提交
276 277 278
// Load the meta of super table `name` (isStb == true path of getTableMetaImpl;
// no vgroup lookup is performed). Returns the code from getTableMetaImpl().
static int32_t getSTableMeta(SInsertParseContext* pCxt, SName* name, char* dbFname) {
  return getTableMetaImpl(pCxt, name, dbFname, true);
}
D
stmt  
dapan1121 已提交
279

280 281 282 283 284 285 286 287 288 289
/*
 * Search pSchema[start .. end) for a column whose name equals the token text
 * (exact length, case-sensitive comparison).
 * Returns the index of the first match, or -1 when no column matches.
 */
static int32_t findCol(SToken* pColname, int32_t start, int32_t end, SSchema* pSchema) {
  for (int32_t idx = start; idx < end; ++idx) {
    const char* colName = pSchema[idx].name;
    if (strlen(colName) != pColname->n) {
      continue;
    }
    if (0 == strncmp(pColname->z, colName, pColname->n)) {
      return idx;
    }
  }
  return -1;
}

D
dapan1121 已提交
290
/*
 * Convert the submit request stored in blocks->pData to network byte order,
 * in place: first the request header, then every SSubmitBlk header that
 * follows it. `src` is not used by this function.
 */
static void buildMsgHeader(STableDataBlocks* src, SVgDataBlocks* blocks) {
  SSubmitReq* pReq = (SSubmitReq*)blocks->pData;

  pReq->header.vgId = htonl(blocks->vg.vgId);
  pReq->header.contLen = htonl(blocks->size);
  pReq->length = pReq->header.contLen;
  pReq->numOfBlocks = htonl(blocks->numOfTables);

  // The first block header sits directly behind the request header.
  SSubmitBlk* pBlk = (SSubmitBlk*)(pReq + 1);
  for (int32_t remaining = blocks->numOfTables; remaining != 0; --remaining) {
    // Capture the host-order lengths before they are converted: they locate the next block.
    const int32_t hostDataLen = pBlk->dataLen;
    const int32_t hostSchemaLen = pBlk->schemaLen;

    pBlk->uid = htobe64(pBlk->uid);
    pBlk->suid = htobe64(pBlk->suid);
    pBlk->padding = htonl(pBlk->padding);
    pBlk->sversion = htonl(pBlk->sversion);
    pBlk->dataLen = htonl(pBlk->dataLen);
    pBlk->schemaLen = htonl(pBlk->schemaLen);
    pBlk->numOfRows = htons(pBlk->numOfRows);

    pBlk = (SSubmitBlk*)(pBlk->data + hostSchemaLen + hostDataLen);
  }
}

/*
 * Turn every entry of pCxt->pVgDataBlocks into an SVgDataBlocks object and
 * collect the results in pCxt->pOutput->pDataBlocks.
 *
 * For each source entry the vgroup info is copied out of pVgroupsHashObj, the
 * payload buffer is moved (swapped) from the source into the new object, and
 * the payload headers are converted by buildMsgHeader().
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when an
 * allocation fails.
 */
static int32_t buildOutput(SInsertParseContext* pCxt) {
  const size_t vgCount = taosArrayGetSize(pCxt->pVgDataBlocks);

  pCxt->pOutput->pDataBlocks = taosArrayInit(vgCount, POINTER_BYTES);
  if (NULL == pCxt->pOutput->pDataBlocks) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  size_t idx = 0;
  while (idx < vgCount) {
    STableDataBlocks* pTableBlocks = taosArrayGetP(pCxt->pVgDataBlocks, idx);
    SVgDataBlocks*    pVgBlocks = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
    if (NULL == pVgBlocks) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    taosHashGetDup(pCxt->pVgroupsHashObj, (const char*)&pTableBlocks->vgId, sizeof(pTableBlocks->vgId),
                   &pVgBlocks->vg);
    pVgBlocks->numOfTables = pTableBlocks->numOfTables;
    pVgBlocks->size = pTableBlocks->size;
    // Move the payload: after the swap the source holds the (zeroed) pointer of the new object.
    TSWAP(pVgBlocks->pData, pTableBlocks->pData);
    buildMsgHeader(pTableBlocks, pVgBlocks);
    taosArrayPush(pCxt->pOutput->pDataBlocks, &pVgBlocks);

    ++idx;
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
334
/*
 * Track whether the rows appended to `pDataBlocks` arrive in strictly
 * increasing timestamp order.
 *
 * pDataBlocks - block whose `ordered` / `prevTS` state is updated.
 * start       - points at the bytes of the row's TSKEY timestamp.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t checkTimestamp(STableDataBlocks* pDataBlocks, const char* start) {
  // once the data block is disordered, we do NOT keep previous timestamp any more
  if (!pDataBlocks->ordered) {
    return TSDB_CODE_SUCCESS;
  }

  // Copy the key out instead of dereferencing a cast pointer: `start` is a byte
  // pointer with no alignment guarantee for TSKEY.
  TSKEY k;
  memcpy(&k, start, sizeof(k));

  // A key that does not advance past the previous one makes the block disordered.
  if (k <= pDataBlocks->prevTS) {
    pDataBlocks->ordered = false;
  }

  pDataBlocks->prevTS = k;
  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
349 350 351 352 353 354
/*
 * Parse a timestamp value.
 *
 * Accepted forms: NOW / TODAY (optionally written with "()"), an integer
 * literal, or a timestamp string handled by taosParseTime(). NOW, TODAY and
 * integers may be followed by "+<duration>" or "-<duration>" (e.g. now+12a).
 *
 * end      - in: position right after `pToken` in the SQL text;
 *            out: advanced past "()" and/or a consumed duration expression.
 * pToken   - the value token.
 * timePrec - timestamp precision of the target database.
 * time     - receives the resulting timestamp.
 * pMsgBuf  - receives the error message on failure.
 *
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
static int parseTime(char** end, SToken* pToken, int16_t timePrec, int64_t* time, SMsgBuf* pMsgBuf) {
  int32_t index = 0;
  SToken  sToken;
  int64_t interval = 0;
  int64_t ts = 0;
  char*   pTokenEnd = *end;

  if (pToken->type == TK_NOW) {
    ts = taosGetTimestamp(timePrec);
  } else if (pToken->type == TK_TODAY) {
    ts = taosGetTimestampToday(timePrec);
  } else if (pToken->type == TK_NK_INTEGER) {
    // A literal that does not convert (e.g. out of range) must be rejected,
    // not silently stored as whatever value `ts` happens to hold.
    if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &ts)) {
      return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
    }
  } else {  // parse the RFC-3339/ISO-8601 timestamp format string
    if (taosParseTime(pToken->z, time, pToken->n, timePrec, tsDaylight) != TSDB_CODE_SUCCESS) {
      return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
    }

    return TSDB_CODE_SUCCESS;
  }

  // Look at what follows the token: skip blanks, swallow "()" after NOW/TODAY,
  // and finish immediately when the value is terminated by a comma.
  for (int k = pToken->n; pToken->z[k] != '\0'; k++) {
    if (pToken->z[k] == ' ' || pToken->z[k] == '\t') continue;
    if (pToken->z[k] == '(' && pToken->z[k + 1] == ')') {  // for insert NOW()/TODAY()
      *end = pTokenEnd = &pToken->z[k + 2];
      k++;
      continue;
    }
    if (pToken->z[k] == ',') {
      *end = pTokenEnd;
      *time = ts;
      return TSDB_CODE_SUCCESS;
    }

    break;
  }

  /*
   * time expression:
   * e.g., now+12a, now-5h
   */
  SToken valueToken;
  index = 0;
  sToken = tStrGetToken(pTokenEnd, &index, false);
  pTokenEnd += index;

  if (sToken.type == TK_NK_MINUS || sToken.type == TK_NK_PLUS) {
    index = 0;
    valueToken = tStrGetToken(pTokenEnd, &index, false);
    pTokenEnd += index;

    // A duration needs at least one digit plus a unit character.
    if (valueToken.n < 2) {
      return buildSyntaxErrMsg(pMsgBuf, "value expected in timestamp", sToken.z);
    }

    char unit = 0;
    if (parseAbsoluteDuration(valueToken.z, valueToken.n, &interval, &unit, timePrec) != TSDB_CODE_SUCCESS) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }

    if (sToken.type == TK_NK_PLUS) {
      ts += interval;
    } else {
      ts = ts - interval;
    }

    // Only a consumed expression moves the caller's cursor.
    *end = pTokenEnd;
  }

  *time = ts;
  return TSDB_CODE_SUCCESS;
}
421

X
Xiaoyu Wang 已提交
422
/*
 * Verify that `pToken` can be a column value and strip the quotes of string
 * tokens.
 *
 * A string token is unquoted via trimString() into `tmpTokenBuf`, and the
 * token is redirected to that buffer. `type` is not used by this function.
 *
 * Returns TSDB_CODE_SUCCESS, or a syntax-error code for an unacceptable token
 * or a string of TSDB_MAX_BYTES_PER_ROW bytes or more.
 */
static FORCE_INLINE int32_t checkAndTrimValue(SToken* pToken, uint32_t type, char* tmpTokenBuf, SMsgBuf* pMsgBuf) {
  const bool isValueToken = (pToken->type == TK_NOW || pToken->type == TK_TODAY || pToken->type == TK_NK_INTEGER ||
                             pToken->type == TK_NK_STRING || pToken->type == TK_NK_FLOAT ||
                             pToken->type == TK_NK_BOOL || pToken->type == TK_NULL || pToken->type == TK_NK_HEX ||
                             pToken->type == TK_NK_OCT || pToken->type == TK_NK_BIN);

  if (!isValueToken || 0 == pToken->n || TK_NK_RP == pToken->type) {
    return buildSyntaxErrMsg(pMsgBuf, "invalid data or symbol", pToken->z);
  }

  // Everything except string literals is used as-is.
  if (TK_NK_STRING != pToken->type) {
    return TSDB_CODE_SUCCESS;
  }

  if (pToken->n >= TSDB_MAX_BYTES_PER_ROW) {
    return buildSyntaxErrMsg(pMsgBuf, "too long string", pToken->z);
  }

  // Drop the surrounding quotation marks; the token now refers to the scratch buffer.
  int32_t trimmedLen = trimString(pToken->z, pToken->n, tmpTokenBuf, TSDB_MAX_BYTES_PER_ROW);
  pToken->z = tmpTokenBuf;
  pToken->n = trimmedLen;

  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
445
/*
 * Tell whether a value token denotes SQL NULL: either the NULL keyword, or a
 * string literal whose whole text equals TSDB_DATA_NULL_STR_L, ignoring case.
 *
 * The length must match exactly. Comparing only the first pToken->n
 * characters would also accept every non-empty prefix of the null string
 * (e.g. 'n' or 'nu') and silently turn such strings into NULL.
 */
static bool isNullStr(SToken* pToken) {
  if (pToken->type == TK_NULL) {
    return true;
  }
  if (pToken->type != TK_NK_STRING) {
    return false;
  }
  return (strlen(TSDB_DATA_NULL_STR_L) == pToken->n) &&
         (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0);
}

H
refact  
Hongze Cheng 已提交
450
/*
 * Convert the token text to a double with taosStr2Double().
 *
 * errno is cleared before the conversion so that callers can inspect it
 * afterwards. `*endPtr` receives the position where the conversion stopped.
 *
 * Returns the token's own type when the conversion consumed exactly
 * pToken->n characters, otherwise TK_NK_ILLEGAL.
 */
static FORCE_INLINE int32_t toDouble(SToken* pToken, double* value, char** endPtr) {
  errno = 0;
  *value = taosStr2Double(pToken->z, endPtr);

  // The whole token must have been consumed; anything else is not a valid number.
  const bool consumedAll = ((*endPtr - pToken->z) == pToken->n);
  return consumedAll ? pToken->type : TK_NK_ILLEGAL;
}

H
refact  
Hongze Cheng 已提交
462 463
/*
 * Convert one value token to the binary representation required by the column
 * described by `pSchema`, and pass it to the row-append callback `func`.
 *
 * end         - cursor right after the token; may be advanced by parseTime()
 *               for timestamp expressions.
 * pToken      - the value token (string tokens are unquoted into tmpTokenBuf).
 * pSchema     - schema of the target column.
 * timePrec    - timestamp precision, used for TIMESTAMP columns.
 * tmpTokenBuf - scratch buffer for unquoted strings.
 * func/param  - row-append callback and its state; called with value == NULL
 *               and len == 0 for SQL NULL.
 * pMsgBuf     - receives the error message on failure.
 *
 * Returns the callback's result on success, a syntax-error code for invalid or
 * out-of-range data, or TSDB_CODE_FAILED for a column type not handled here.
 */
static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t timePrec, char* tmpTokenBuf,
                               _row_append_fn_t func, void* param, SMsgBuf* pMsgBuf) {
  int64_t  iv;
  uint64_t uv;
  char*    endptr = NULL;

  int32_t code = checkAndTrimValue(pToken, pSchema->type, tmpTokenBuf, pMsgBuf);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  if (isNullStr(pToken)) {
    if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
      return buildSyntaxErrMsg(pMsgBuf, "primary timestamp should not be null", pToken->z);
    }

    return func(pMsgBuf, NULL, 0, param);
  }

  switch (pSchema->type) {
    case TSDB_DATA_TYPE_BOOL: {
      if ((pToken->type == TK_NK_BOOL || pToken->type == TK_NK_STRING) && (pToken->n != 0)) {
        // Require the full literal: comparing only pToken->n characters would
        // also accept prefixes such as "t", "tr" or "f".
        if (pToken->n == sizeof("true") - 1 && strncmp(pToken->z, "true", pToken->n) == 0) {
          return func(pMsgBuf, &TRUE_VALUE, pSchema->bytes, param);
        } else if (pToken->n == sizeof("false") - 1 && strncmp(pToken->z, "false", pToken->n) == 0) {
          return func(pMsgBuf, &FALSE_VALUE, pSchema->bytes, param);
        } else {
          return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
        }
      } else if (pToken->type == TK_NK_INTEGER) {
        // Numeric input: zero is false, everything else is true.
        return func(pMsgBuf, ((taosStr2Int64(pToken->z, NULL, 10) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes,
                    param);
      } else if (pToken->type == TK_NK_FLOAT) {
        return func(pMsgBuf, ((taosStr2Double(pToken->z, NULL) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes,
                    param);
      } else {
        return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
      }
    }

    case TSDB_DATA_TYPE_TINYINT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid tinyint data", pToken->z);
      } else if (!IS_VALID_TINYINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "tinyint data overflow", pToken->z);
      }

      uint8_t tmpVal = (uint8_t)iv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_UTINYINT: {
      if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned tinyint data", pToken->z);
      } else if (!IS_VALID_UTINYINT(uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "unsigned tinyint data overflow", pToken->z);
      }
      uint8_t tmpVal = (uint8_t)uv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_SMALLINT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid smallint data", pToken->z);
      } else if (!IS_VALID_SMALLINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "smallint data overflow", pToken->z);
      }
      int16_t tmpVal = (int16_t)iv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_USMALLINT: {
      if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned smallint data", pToken->z);
      } else if (!IS_VALID_USMALLINT(uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "unsigned smallint data overflow", pToken->z);
      }
      uint16_t tmpVal = (uint16_t)uv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_INT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid int data", pToken->z);
      } else if (!IS_VALID_INT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "int data overflow", pToken->z);
      }
      int32_t tmpVal = (int32_t)iv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_UINT: {
      if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned int data", pToken->z);
      } else if (!IS_VALID_UINT(uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "unsigned int data overflow", pToken->z);
      }
      uint32_t tmpVal = (uint32_t)uv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_BIGINT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid bigint data", pToken->z);
      } else if (!IS_VALID_BIGINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "bigint data overflow", pToken->z);
      }
      return func(pMsgBuf, &iv, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_UBIGINT: {
      if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned bigint data", pToken->z);
      } else if (!IS_VALID_UBIGINT(uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "unsigned bigint data overflow", pToken->z);
      }
      return func(pMsgBuf, &uv, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_FLOAT: {
      double dv;
      if (TK_NK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
        return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
      }
      // Reject conversion overflow, values outside the float range, and inf/nan.
      if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) ||
          isnan(dv)) {
        return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
      }
      float tmpVal = (float)dv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_DOUBLE: {
      double dv;
      if (TK_NK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
        return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
      }
      if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) {
        return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
      }
      return func(pMsgBuf, &dv, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_BINARY: {
      // Too long values will raise the invalid sql error message
      if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) {
        return buildSyntaxErrMsg(pMsgBuf, "string data overflow", pToken->z);
      }

      return func(pMsgBuf, pToken->z, pToken->n, param);
    }

    case TSDB_DATA_TYPE_NCHAR: {
      // No length check here: the text is handed to the callback unchanged.
      return func(pMsgBuf, pToken->z, pToken->n, param);
    }

    case TSDB_DATA_TYPE_JSON: {
      if (pToken->n > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
        return buildSyntaxErrMsg(pMsgBuf, "json string too long than 4095", pToken->z);
      }
      return func(pMsgBuf, pToken->z, pToken->n, param);
    }

    case TSDB_DATA_TYPE_TIMESTAMP: {
      int64_t tmpVal;
      if (parseTime(end, pToken, timePrec, &tmpVal, pMsgBuf) != TSDB_CODE_SUCCESS) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp", pToken->z);
      }

      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }
  }

  return TSDB_CODE_FAILED;
}

X
Xiaoyu Wang 已提交
638
/*
 * Row-append callback that stores one column value into the row under
 * construction in SMemParam::rb.
 *
 * value == NULL appends a NULL cell. BINARY and NCHAR values are first encoded
 * as var-strings at the position returned by tdRowEnd(); all other types are
 * appended directly from `value`.
 *
 * Returns TSDB_CODE_SUCCESS, or a syntax-error code when the multi-byte to
 * UCS-4 conversion of an NCHAR value fails.
 */
static FORCE_INLINE int32_t MemRowAppend(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param) {
  SMemParam*   pMem = (SMemParam*)param;
  SRowBuilder* pBuilder = pMem->rb;
  SSchema*     pCol = pMem->schema;

  if (NULL == value) {  // SQL NULL
    tdAppendColValToRow(pBuilder, pCol->colId, pCol->type, TD_VTYPE_NULL, value, false, pMem->toffset,
                        pMem->colIdx);
    return TSDB_CODE_SUCCESS;
  }

  switch (pCol->type) {
    case TSDB_DATA_TYPE_BINARY: {
      const char* rowEnd = tdRowEnd(pBuilder->pBuf);
      STR_WITH_SIZE_TO_VARSTR(rowEnd, value, len);
      tdAppendColValToRow(pBuilder, pCol->colId, pCol->type, TD_VTYPE_NORM, rowEnd, true, pMem->toffset,
                          pMem->colIdx);
      break;
    }

    case TSDB_DATA_TYPE_NCHAR: {
      // The conversion fails when the converted text does not fit into the column's declared size.
      int32_t     ucs4Len = 0;
      const char* rowEnd = tdRowEnd(pBuilder->pBuf);
      if (!taosMbsToUcs4(value, len, (TdUcs4*)varDataVal(rowEnd), pCol->bytes - VARSTR_HEADER_SIZE, &ucs4Len)) {
        char errText[512] = {0};
        snprintf(errText, tListLen(errText), "%s", strerror(errno));
        return buildSyntaxErrMsg(pMsgBuf, errText, value);
      }
      varDataSetLen(rowEnd, ucs4Len);
      tdAppendColValToRow(pBuilder, pCol->colId, pCol->type, TD_VTYPE_NORM, rowEnd, false, pMem->toffset,
                          pMem->colIdx);
      break;
    }

    default:
      tdAppendColValToRow(pBuilder, pCol->colId, pCol->type, TD_VTYPE_NORM, value, false, pMem->toffset,
                          pMem->colIdx);
      break;
  }

  return TSDB_CODE_SUCCESS;
}
668 669 670

// pSql -> tag1_name, ...)
/*
 * Parse the list of bound column names up to the closing ')' and record the
 * binding in `pColList`.
 *
 * pCxt     - parse context; pCxt->pSql is advanced past the closing ')'.
 * pColList - column info to fill: per-column valStat, boundColumns[],
 *            numOfBound, boundNullLen, orderStatus and, when the columns are
 *            not listed in schema order, the colIdxInfo mapping.
 * pSchema  - schema array with pColList->numOfCols entries.
 *
 * Returns TSDB_CODE_SUCCESS, a syntax-error code for an unknown or duplicated
 * column, or TSDB_CODE_TSC_OUT_OF_MEMORY.
 */
static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo* pColList, SSchema* pSchema) {
  col_id_t nCols = pColList->numOfCols;

  // Reset any previous binding state.
  pColList->numOfBound = 0;
  pColList->boundNullLen = 0;
  memset(pColList->boundColumns, 0, sizeof(col_id_t) * nCols);
  for (col_id_t i = 0; i < nCols; ++i) {
    pColList->cols[i].valStat = VAL_STAT_NONE;
  }

  SToken   sToken;
  bool     isOrdered = true;
  col_id_t lastColIdx = -1;  // last column found
  while (1) {
    NEXT_TOKEN(pCxt->pSql, sToken);

    if (TK_NK_RP == sToken.type) {
      break;
    }

    // Search forward from the column after the previous match first; fall back
    // to the columns before it, which means the list is not in schema order.
    col_id_t t = lastColIdx + 1;
    col_id_t index = findCol(&sToken, t, nCols, pSchema);
    if (index < 0 && t > 0) {
      index = findCol(&sToken, 0, t, pSchema);
      isOrdered = false;
    }
    if (index < 0) {
      return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMN, sToken.z);
    }
    if (pColList->cols[index].valStat == VAL_STAT_HAS) {
      return buildSyntaxErrMsg(&pCxt->msg, "duplicated column name", sToken.z);
    }
    lastColIdx = index;
    pColList->cols[index].valStat = VAL_STAT_HAS;
    pColList->boundColumns[pColList->numOfBound] = index + PRIMARYKEY_TIMESTAMP_COL_ID;
    ++pColList->numOfBound;

    // Account for the column that was actually matched. `t` is only the search
    // start: it can name a different column and can even equal nCols, which
    // would read past the end of pSchema.
    switch (pSchema[index].type) {
      case TSDB_DATA_TYPE_BINARY:
        pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + CHAR_BYTES);
        break;
      case TSDB_DATA_TYPE_NCHAR:
        pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
        break;
      default:
        pColList->boundNullLen += TYPE_BYTES[pSchema[index].type];
        break;
    }
  }

  pColList->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED;

  if (!isOrdered) {
    pColList->colIdxInfo = taosMemoryCalloc(pColList->numOfBound, sizeof(SBoundIdxInfo));
    if (NULL == pColList->colIdxInfo) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    SBoundIdxInfo* pColIdx = pColList->colIdxInfo;
    for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
      pColIdx[i].schemaColIdx = pColList->boundColumns[i];
      pColIdx[i].boundIdx = i;
    }
    // Sort with schemaIdxCompar to assign each bound column its final position,
    // then restore bound order with boundIdxCompar.
    qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), schemaIdxCompar);
    for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
      pColIdx[i].finalIdx = i;
    }
    qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), boundIdxCompar);
  }

  // Clear the unused tail of boundColumns[].
  if (pColList->numOfCols > pColList->numOfBound) {
    memset(&pColList->boundColumns[pColList->numOfBound], 0,
           sizeof(col_id_t) * (pColList->numOfCols - pColList->numOfBound));
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
746 747
/*
 * Value-append callback used while parsing tag values: stores one parsed value
 * into the KV row builder carried by `param` (an SKvParam).
 *
 *  - JSON tags are delegated to parseJsontoTagData(), including NULL values.
 *  - A NULL value of any other type is skipped (nothing is added to the row).
 *  - BINARY values are wrapped as a var-string in pKv->buf before being added.
 *  - NCHAR values are converted to UCS-4 into pKv->buf; a conversion failure is
 *    reported as a syntax error that carries strerror(errno).
 *  - Every other type is added as TYPE_BYTES[type] raw bytes.
 *
 * Returns TSDB_CODE_SUCCESS, or the error produced by the JSON parser / the
 * syntax-error builder.
 */
static int32_t KvRowAppend(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param) {
  SKvParam* pKv = (SKvParam*)param;
  int8_t    type = pKv->schema->type;
  int16_t   colId = pKv->schema->colId;

  // JSON is handled before the NULL check: the JSON parser receives NULL values too.
  if (TSDB_DATA_TYPE_JSON == type) {
    return parseJsontoTagData(value, pKv->builder, pMsgBuf, colId);
  }

  // NULL data: nothing is written into the KV row.
  if (NULL == value) {
    return TSDB_CODE_SUCCESS;
  }

  switch (type) {
    case TSDB_DATA_TYPE_BINARY: {
      STR_WITH_SIZE_TO_VARSTR(pKv->buf, value, len);
      tdAddColToKVRow(pKv->builder, colId, pKv->buf, varDataTLen(pKv->buf));
      break;
    }
    case TSDB_DATA_TYPE_NCHAR: {
      // the converted output must fit into the schema bytes minus the var-string header
      int32_t converted = 0;
      if (!taosMbsToUcs4(value, len, (TdUcs4*)varDataVal(pKv->buf), pKv->schema->bytes - VARSTR_HEADER_SIZE,
                         &converted)) {
        char errText[512] = {0};
        snprintf(errText, tListLen(errText), " taosMbsToUcs4 error:%s", strerror(errno));
        return buildSyntaxErrMsg(pMsgBuf, errText, value);
      }

      varDataSetLen(pKv->buf, converted);
      tdAddColToKVRow(pKv->builder, colId, pKv->buf, varDataTLen(pKv->buf));
      break;
    }
    default: {
      tdAddColToKVRow(pKv->builder, colId, value, TYPE_BYTES[type]);
      break;
    }
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
783
/*
 * Fills a child-table creation request.
 *
 * pTbReq : request to populate; receives a heap copy of `tname` and takes
 *          over the tag row `row` (both are released by destroyCreateSubTbReq()).
 * tname  : name of the child table, copied with strdup().
 * row    : tag KV row; stored as-is in pTbReq->ctb.pTag.
 * suid   : uid of the super table.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the name
 * cannot be duplicated. The tag row is attached before the name is copied so
 * that the request still owns it on the failure path.
 */
static int32_t buildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, SKVRow row, int64_t suid) {
  pTbReq->type = TD_CHILD_TABLE;
  pTbReq->ctb.suid = suid;
  pTbReq->ctb.pTag = row;

  pTbReq->name = strdup(tname);
  if (NULL == pTbReq->name) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

792
// pSql -> tag1_value, ...)
wmmhello's avatar
wmmhello 已提交
793
static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint8_t precision, const char* tName) {
794
  if (tdInitKVRowBuilder(&pCxt->tagsBuilder) < 0) {
795 796 797
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

798
  SKvParam param = {.builder = &pCxt->tagsBuilder};
X
Xiaoyu Wang 已提交
799 800 801
  SToken   sToken;
  bool     isParseBindParam = false;
  char     tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0};  // used for deleting Escape character: \\, \', \"
802
  for (int i = 0; i < pCxt->tags.numOfBound; ++i) {
X
Xiaoyu Wang 已提交
803
    NEXT_TOKEN_WITH_PREV(pCxt->pSql, sToken);
D
stmt  
dapan1121 已提交
804 805 806 807 808 809 810 811 812 813 814 815 816

    if (sToken.type == TK_NK_QUESTION) {
      isParseBindParam = true;
      if (NULL == pCxt->pStmtCb) {
        return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", sToken.z);
      }

      continue;
    }

    if (isParseBindParam) {
      return buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and tag values");
    }
X
Xiaoyu Wang 已提交
817 818

    SSchema* pTagSchema = &pSchema[pCxt->tags.boundColumns[i] - 1];  // colId starts with 1
X
Xiaoyu Wang 已提交
819
    param.schema = pTagSchema;
H
refact  
Hongze Cheng 已提交
820 821
    CHECK_CODE(
        parseValueToken(&pCxt->pSql, &sToken, pTagSchema, precision, tmpTokenBuf, KvRowAppend, &param, &pCxt->msg));
822 823
  }

D
stmt  
dapan1121 已提交
824 825 826 827
  if (isParseBindParam) {
    return TSDB_CODE_SUCCESS;
  }

828
  SKVRow row = tdGetKVRowFromBuilder(&pCxt->tagsBuilder);
829 830 831 832 833
  if (NULL == row) {
    return buildInvalidOperationMsg(&pCxt->msg, "tag value expected");
  }
  tdSortKVRowByColIdx(row);

wmmhello's avatar
wmmhello 已提交
834
  return buildCreateTbReq(&pCxt->createTblReq, tName, row, pCxt->pTableMeta->suid);
X
Xiaoyu Wang 已提交
835
}
836

X
Xiaoyu Wang 已提交
837 838 839 840 841 842 843 844
/*
 * Duplicates a table meta into a freshly allocated buffer of
 * TABLE_META_SIZE(pSrc) bytes.
 *
 * On success *pDst points to the copy (owned by the caller) and
 * TSDB_CODE_SUCCESS is returned. On allocation failure *pDst is NULL and
 * TSDB_CODE_TSC_OUT_OF_MEMORY is returned.
 */
static int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
  STableMeta* pCopy = taosMemoryMalloc(TABLE_META_SIZE(pSrc));
  *pDst = pCopy;
  if (NULL != pCopy) {
    memcpy(pCopy, pSrc, TABLE_META_SIZE(pSrc));
    return TSDB_CODE_SUCCESS;
  }
  return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
845

X
Xiaoyu Wang 已提交
846 847 848 849 850 851 852 853
/*
 * Registers the vgroup of `pTableName` and caches a child-table flavoured copy
 * of `pMeta` in `pHash` under the key (pName, len).
 *
 * The vgroup is looked up through the catalog and stored in
 * pCxt->pVgroupsHashObj keyed by vgId. `pMeta` itself is modified in place:
 * uid is reset to 0, vgId is set to the resolved vgroup and tableType becomes
 * TSDB_CHILD_TABLE. The hash stores a pointer to a clone of that meta.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_OUT_OF_MEMORY when the clone cannot
 * be allocated, or the error of the catalog / hash operations. The clone is
 * released again when it could not be inserted into the hash.
 */
static int32_t storeTableMeta(SInsertParseContext* pCxt, SHashObj* pHash, SName* pTableName, const char* pName,
                              int32_t len, STableMeta* pMeta) {
  SVgroupInfo    vg;
  SParseContext* pBasicCtx = pCxt->pComCxt;
  CHECK_CODE(
      catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pTableName, &vg));
  CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));

  pMeta->uid = 0;
  pMeta->vgId = vg.vgId;
  pMeta->tableType = TSDB_CHILD_TABLE;

  STableMeta* pBackup = NULL;
  if (TSDB_CODE_SUCCESS != cloneTableMeta(pMeta, &pBackup)) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // the hash only stores the pointer; on failure nobody else references the clone
  int32_t code = taosHashPut(pHash, pName, len, &pBackup, POINTER_BYTES);
  if (TSDB_CODE_SUCCESS != code) {
    taosMemoryFree(pBackup);
  }
  return code;
}

// pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
D
dapan 已提交
866
static int32_t parseUsingClause(SInsertParseContext* pCxt, SName* name, char* tbFName) {
H
refact  
Hongze Cheng 已提交
867
  int32_t      len = strlen(tbFName);
X
Xiaoyu Wang 已提交
868 869 870 871
  STableMeta** pMeta = taosHashGet(pCxt->pSubTableHashObj, tbFName, len);
  if (NULL != pMeta) {
    return cloneTableMeta(*pMeta, &pCxt->pTableMeta);
  }
872

X
Xiaoyu Wang 已提交
873
  SToken sToken;
874 875
  // pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
  NEXT_TOKEN(pCxt->pSql, sToken);
D
dapan 已提交
876 877 878 879 880

  SName sname;
  createSName(&sname, &sToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg);
  char stbFName[TSDB_TABLE_FNAME_LEN];
  tNameExtractFullName(&sname, stbFName);
X
Xiaoyu Wang 已提交
881

D
dapan 已提交
882
  CHECK_CODE(getSTableMeta(pCxt, &sname, stbFName));
883 884 885
  if (TSDB_SUPER_TABLE != pCxt->pTableMeta->tableType) {
    return buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed");
  }
D
dapan 已提交
886
  CHECK_CODE(storeTableMeta(pCxt, pCxt->pSubTableHashObj, name, tbFName, len, pCxt->pTableMeta));
887 888

  SSchema* pTagsSchema = getTableTagSchema(pCxt->pTableMeta);
889
  setBoundColumnInfo(&pCxt->tags, pTagsSchema, getNumOfTags(pCxt->pTableMeta));
890 891 892

  // pSql -> [(tag1_name, ...)] TAGS (tag1_value, ...)
  NEXT_TOKEN(pCxt->pSql, sToken);
893
  if (TK_NK_LP == sToken.type) {
894
    CHECK_CODE(parseBoundColumns(pCxt, &pCxt->tags, pTagsSchema));
895 896 897 898 899 900 901 902
    NEXT_TOKEN(pCxt->pSql, sToken);
  }

  if (TK_TAGS != sToken.type) {
    return buildSyntaxErrMsg(&pCxt->msg, "TAGS is expected", sToken.z);
  }
  // pSql -> (tag1_value, ...)
  NEXT_TOKEN(pCxt->pSql, sToken);
903
  if (TK_NK_LP != sToken.type) {
904 905
    return buildSyntaxErrMsg(&pCxt->msg, "( is expected", sToken.z);
  }
D
dapan 已提交
906
  CHECK_CODE(parseTagsClause(pCxt, pCxt->pTableMeta->schema, getTableInfo(pCxt->pTableMeta).precision, name->tname));
907 908 909 910
  NEXT_VALID_TOKEN(pCxt->pSql, sToken);
  if (TK_NK_COMMA == sToken.type) {
    return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_TAGS_NOT_MATCHED);
  } else if (TK_NK_RP != sToken.type) {
X
Xiaoyu Wang 已提交
911 912
    return buildSyntaxErrMsg(&pCxt->msg, ") is expected", sToken.z);
  }
913 914 915 916

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
917 918
/*
 * Parses the values of one row "(v1, v2, ...)" (the opening parenthesis has
 * already been consumed) into the data block at offset pDataBlocks->size.
 *
 * timePrec    : timestamp precision passed on to parseValueToken().
 * gotRow      : set to true only when a complete literal row was built; it is
 *               left untouched when '?' placeholders were used.
 * tmpTokenBuf : scratch buffer handed to parseValueToken().
 *
 * Returns TSDB_CODE_SUCCESS or the first parse error.
 */
static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, int16_t timePrec, bool* gotRow,
                       char* tmpTokenBuf) {
  SParsedDataColInfo* spd = &pDataBlocks->boundColumnInfo;
  SRowBuilder*        pBuilder = &pDataBlocks->rowBuilder;
  STSRow*             row = (STSRow*)(pDataBlocks->pData + pDataBlocks->size);  // skip the SSubmitBlk header

  tdSRowResetBuf(pBuilder, row);

  SSchema*  schema = getTableColumnSchema(pDataBlocks->pTableMeta);
  SMemParam param = {.rb = pBuilder};
  SToken    sToken = {0};
  bool      sawPlaceholder = false;

  // 1. set the parsed value from sql string
  for (int boundIdx = 0; boundIdx < spd->numOfBound; ++boundIdx) {
    NEXT_TOKEN_WITH_PREV(pCxt->pSql, sToken);

    if (TK_NK_QUESTION == sToken.type) {
      if (NULL == pCxt->pStmtCb) {
        return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", sToken.z);
      }
      sawPlaceholder = true;
      continue;
    }

    // a literal value after a placeholder is not allowed
    if (sawPlaceholder) {
      return buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and values");
    }

    SSchema* pSchema = &schema[spd->boundColumns[boundIdx] - 1];
    param.schema = pSchema;
    getSTSRowAppendInfo(pBuilder->rowType, spd, boundIdx, &param.toffset, &param.colIdx);
    CHECK_CODE(parseValueToken(&pCxt->pSql, &sToken, pSchema, timePrec, tmpTokenBuf, MemRowAppend, &param, &pCxt->msg));

    if (PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
      TSKEY tsKey = TD_ROW_KEY(row);
      checkTimestamp(pDataBlocks, (const char*)&tsKey);
    }
  }

  if (sawPlaceholder) {
    // the row is filled later through the bind interface
    return TSDB_CODE_SUCCESS;
  }

  // 2. set the null value for the columns that do not assign values
  if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
    for (int32_t colIdx = 0; colIdx < spd->numOfCols; ++colIdx) {
      if (VAL_STAT_NONE != spd->cols[colIdx].valStat) {  // the primary TS key is not VAL_STAT_NONE
        continue;
      }
      tdAppendColValToTpRow(pBuilder, TD_VTYPE_NONE, getNullValue(schema[colIdx].type), true, schema[colIdx].type,
                            colIdx, spd->cols[colIdx].toffset);
    }
  }

  *gotRow = true;
#ifdef TD_DEBUG_PRINT_ROW
  STSchema* pSTSchema = tdGetSTSChemaFromSSChema(&schema, spd->numOfCols);
  tdSRowPrint(row, pSTSchema, __func__);
  taosMemoryFree(pSTSchema);
#endif

  return TSDB_CODE_SUCCESS;
}

// pSql -> (field1_value, ...) [(field1_value2, ...) ...]
981
static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlock, int maxRows, int32_t* numOfRows) {
982
  STableComInfo tinfo = getTableInfo(pDataBlock->pTableMeta);
H
refact  
Hongze Cheng 已提交
983
  int32_t       extendedRowSize = getExtendedRowSize(pDataBlock);
C
Cary Xu 已提交
984
  CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));
985 986

  (*numOfRows) = 0;
H
refact  
Hongze Cheng 已提交
987
  char   tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0};  // used for deleting Escape character: \\, \', \"
988 989
  SToken sToken;
  while (1) {
990 991
    int32_t index = 0;
    NEXT_TOKEN_KEEP_SQL(pCxt->pSql, sToken, index);
992
    if (TK_NK_LP != sToken.type) {
993 994
      break;
    }
995
    pCxt->pSql += index;
996 997 998 999 1000 1001 1002 1003

    if ((*numOfRows) >= maxRows || pDataBlock->size + extendedRowSize >= pDataBlock->nAllocSize) {
      int32_t tSize;
      CHECK_CODE(allocateMemIfNeed(pDataBlock, extendedRowSize, &tSize));
      ASSERT(tSize >= maxRows);
      maxRows = tSize;
    }

D
stmt  
dapan1121 已提交
1004 1005 1006
    bool gotRow = false;
    CHECK_CODE(parseOneRow(pCxt, pDataBlock, tinfo.precision, &gotRow, tmpTokenBuf));
    if (gotRow) {
X
Xiaoyu Wang 已提交
1007
      pDataBlock->size += extendedRowSize;  // len;
D
stmt  
dapan1121 已提交
1008
    }
1009

1010 1011 1012 1013
    NEXT_VALID_TOKEN(pCxt->pSql, sToken);
    if (TK_NK_COMMA == sToken.type) {
      return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM);
    } else if (TK_NK_RP != sToken.type) {
1014 1015 1016
      return buildSyntaxErrMsg(&pCxt->msg, ") expected", sToken.z);
    }

D
stmt  
dapan1121 已提交
1017 1018 1019
    if (gotRow) {
      (*numOfRows)++;
    }
1020 1021
  }

D
stmt  
dapan1121 已提交
1022
  if (0 == (*numOfRows) && (!TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT))) {
X
Xiaoyu Wang 已提交
1023
    return buildSyntaxErrMsg(&pCxt->msg, "no any data points", NULL);
1024 1025 1026 1027
  }
  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
1028
/*
 * Parses a complete VALUES clause into `dataBuf`: makes sure the block has room
 * for at least one row, parses all rows, writes the submit block header and
 * adds the parsed row count to pCxt->totalNum.
 */
static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* dataBuf) {
  int32_t rowCapacity;
  CHECK_CODE(allocateMemIfNeed(dataBuf, getExtendedRowSize(dataBuf), &rowCapacity));

  int32_t parsedRows = 0;
  CHECK_CODE(parseValues(pCxt, dataBuf, rowCapacity, &parsedRows));

  // the submit block header lives at the start of the data buffer
  SSubmitBlk* pSubmitBlk = (SSubmitBlk*)(dataBuf->pData);
  int32_t     blkCode = setBlockInfo(pSubmitBlk, dataBuf, parsedRows);
  if (TSDB_CODE_SUCCESS != blkCode) {
    return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than 32767");
  }

  dataBuf->numOfTables = 1;
  pCxt->totalNum += parsedRows;
  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
1045
/*
 * Releases the heap members of a child-table creation request (the tag row and
 * the table name) and resets both pointers. The request struct itself is not
 * freed.
 */
void destroyCreateSubTbReq(SVCreateTbReq* pReq) {
  taosMemoryFreeClear(pReq->ctb.pTag);
  taosMemoryFreeClear(pReq->name);
}

X
Xiaoyu Wang 已提交
1050
/*
 * Drops the per-table state of the parse context so that the next table of the
 * same INSERT statement starts clean: the table meta, the bound tag column
 * info, the tag KV row builder and the pending create-table request.
 */
static void destroyInsertParseContextForTable(SInsertParseContext* pCxt) {
  // table meta of the table that was just processed
  taosMemoryFreeClear(pCxt->pTableMeta);

  // tag related state built by the USING clause
  destroyBoundColumnInfo(&pCxt->tags);
  tdDestroyKVRowBuilder(&pCxt->tagsBuilder);
  destroyCreateSubTbReq(&pCxt->createTblReq);
}

/*
 * Releases everything held by the insert parse context: the per-table state,
 * the vgroup / sub-table / table-name hashes, the per-table data block hash and
 * the merged per-vgroup data block list.
 */
static void destroyInsertParseContext(SInsertParseContext* pCxt) {
  destroyInsertParseContextForTable(pCxt);

  // plain hash tables
  taosHashCleanup(pCxt->pVgroupsHashObj);
  taosHashCleanup(pCxt->pSubTableHashObj);
  taosHashCleanup(pCxt->pTableNameHashObj);

  // containers of data blocks
  destroyBlockHashmap(pCxt->pTableBlockHashObj);
  destroyBlockArrayList(pCxt->pVgDataBlocks);
}

1067 1068 1069 1070 1071 1072
//   tb_name
//       [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
//       [(field1_name, ...)]
//       VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
//   [...];
/*
 * Parses everything that follows "INSERT INTO": one or more table sections,
 * each with an optional USING clause, an optional bound column list and a
 * VALUES or FILE clause.
 *
 * For a stmt insert only a single table is accepted; after parsing, the table
 * meta, a heap copy of the bound tag info, the full table name and the
 * vgroup / block hashes are passed to the stmt callback setInfoFn and detached
 * from the context. For a regular insert the per-table blocks are merged per
 * vgroup and the output is built.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t parseInsertBody(SInsertParseContext* pCxt) {
  int32_t     tbNum = 0;
  char        tbFName[TSDB_TABLE_FNAME_LEN] = {0};  // zeroed: it is handed to setInfoFn even if no table was parsed
  bool        autoCreateTbl = false;
  STableMeta* pMeta = NULL;

  // for each table
  while (1) {
    SToken sToken;
    char*  tbName = NULL;

    // pSql -> tb_name ...
    NEXT_TOKEN(pCxt->pSql, sToken);

    // no data in the sql string anymore.
    if (sToken.n == 0) {
      if (0 == pCxt->totalNum && (!TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT))) {
        return buildInvalidOperationMsg(&pCxt->msg, "no data in sql");
      }
      break;
    }

    if (TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT) && tbNum > 0) {
      return buildInvalidOperationMsg(&pCxt->msg, "single table allowed in one stmt");
    }

    // drop the state left over from the previous table
    destroyInsertParseContextForTable(pCxt);

    // a '?' table name is resolved through the stmt callback
    if (TK_NK_QUESTION == sToken.type) {
      if (pCxt->pStmtCb) {
        CHECK_CODE((*pCxt->pStmtCb->getTbNameFn)(pCxt->pStmtCb->pStmt, &tbName));

        sToken.z = tbName;
        sToken.n = strlen(tbName);
      } else {
        return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", sToken.z);
      }
    }

    SToken tbnameToken = sToken;
    NEXT_TOKEN(pCxt->pSql, sToken);

    // an invalid table name must stop the parse instead of continuing with an unset SName
    SName name;
    CHECK_CODE(createSName(&name, &tbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
    tNameExtractFullName(&name, tbFName);

    CHECK_CODE(taosHashPut(pCxt->pTableNameHashObj, tbFName, strlen(tbFName), &name, sizeof(SName)));

    // USING clause
    if (TK_USING == sToken.type) {
      CHECK_CODE(parseUsingClause(pCxt, &name, tbFName));
      NEXT_TOKEN(pCxt->pSql, sToken);
      autoCreateTbl = true;
    } else {
      char dbFName[TSDB_DB_FNAME_LEN];
      tNameGetFullDbName(&name, dbFName);
      CHECK_CODE(getTableMeta(pCxt, &name, dbFName));
    }

    STableDataBlocks* dataBuf = NULL;
    CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, tbFName, strlen(tbFName), TSDB_DEFAULT_PAYLOAD_SIZE,
                                    sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta,
                                    &dataBuf, NULL, &pCxt->createTblReq));
    // the context no longer references the meta from here on; it is only reachable through pMeta
    pMeta = pCxt->pTableMeta;
    pCxt->pTableMeta = NULL;

    if (TK_NK_LP == sToken.type) {
      // pSql -> field1_name, ...)
      CHECK_CODE(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(pMeta)));
      NEXT_TOKEN(pCxt->pSql, sToken);
    }

    if (TK_VALUES == sToken.type) {
      // pSql -> (field1_value, ...) [(field1_value2, ...) ...]
      CHECK_CODE(parseValuesClause(pCxt, dataBuf));
      TSDB_QUERY_SET_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_INSERT);

      tbNum++;
      continue;
    }

    // FILE csv_file_path
    if (TK_FILE == sToken.type) {
      // pSql -> csv_file_path
      NEXT_TOKEN(pCxt->pSql, sToken);
      if (0 == sToken.n || (TK_NK_STRING != sToken.type && TK_NK_ID != sToken.type)) {
        return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", sToken.z);
      }
      // todo
      pCxt->pOutput->insertType = TSDB_QUERY_TYPE_FILE_INSERT;

      tbNum++;
      continue;
    }

    return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", sToken.z);
  }

  if (TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT)) {
    // the stmt callback receives a heap copy of the bound tag info
    SParsedDataColInfo* tags = taosMemoryMalloc(sizeof(pCxt->tags));
    if (NULL == tags) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    memcpy(tags, &pCxt->tags, sizeof(pCxt->tags));
    (*pCxt->pStmtCb->setInfoFn)(pCxt->pStmtCb->pStmt, pMeta, tags, tbFName, autoCreateTbl, pCxt->pVgroupsHashObj,
                                pCxt->pTableBlockHashObj);

    // detach what was handed to the callback so the context cleanup does not release it
    memset(&pCxt->tags, 0, sizeof(pCxt->tags));
    pCxt->pVgroupsHashObj = NULL;
    pCxt->pTableBlockHashObj = NULL;
    pCxt->pTableMeta = NULL;

    return TSDB_CODE_SUCCESS;
  }

  // merge according to vgId
  if (taosHashGetSize(pCxt->pTableBlockHashObj) > 0) {
    CHECK_CODE(mergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->payloadType, &pCxt->pVgDataBlocks));
  }
  return buildOutput(pCxt);
}

// INSERT INTO
//   tb_name
//       [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
//       [(field1_name, ...)]
//       VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
//   [...];
X
Xiaoyu Wang 已提交
1200
int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
1201
  SInsertParseContext context = {
X
Xiaoyu Wang 已提交
1202 1203 1204 1205
      .pComCxt = pContext,
      .pSql = (char*)pContext->pSql,
      .msg = {.buf = pContext->pMsg, .len = pContext->msgLen},
      .pTableMeta = NULL,
X
Xiaoyu Wang 已提交
1206 1207
      .pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK),
      .pTableNameHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK),
X
Xiaoyu Wang 已提交
1208 1209 1210
      .totalNum = 0,
      .pOutput = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT),
      .pStmtCb = pContext->pStmtCb};
1211

D
stmt  
dapan1121 已提交
1212
  if (pContext->pStmtCb && *pQuery) {
X
Xiaoyu Wang 已提交
1213 1214
    (*pContext->pStmtCb->getExecInfoFn)(pContext->pStmtCb->pStmt, &context.pVgroupsHashObj,
                                        &context.pTableBlockHashObj);
D
stmt  
dapan1121 已提交
1215
  } else {
X
Xiaoyu Wang 已提交
1216 1217 1218
    context.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
    context.pTableBlockHashObj =
        taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
D
stmt  
dapan1121 已提交
1219
  }
X
Xiaoyu Wang 已提交
1220 1221

  if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pSubTableHashObj ||
X
Xiaoyu Wang 已提交
1222
      NULL == context.pTableNameHashObj || NULL == context.pOutput) {
X
Xiaoyu Wang 已提交
1223
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
1224 1225
  }

D
stmt  
dapan1121 已提交
1226 1227 1228 1229
  if (pContext->pStmtCb) {
    TSDB_QUERY_SET_TYPE(context.pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT);
  }

1230
  if (NULL == *pQuery) {
D
stmt  
dapan1121 已提交
1231 1232 1233 1234
    *pQuery = taosMemoryCalloc(1, sizeof(SQuery));
    if (NULL == *pQuery) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
D
dapan1121 已提交
1235

D
stmt  
dapan1121 已提交
1236 1237 1238 1239
    (*pQuery)->execMode = QUERY_EXEC_MODE_SCHEDULE;
    (*pQuery)->haveResultSet = false;
    (*pQuery)->msgType = TDMT_VND_SUBMIT;
    (*pQuery)->pRoot = (SNode*)context.pOutput;
1240
  }
X
Xiaoyu Wang 已提交
1241

D
dapan1121 已提交
1242 1243 1244 1245 1246 1247
  if (NULL == (*pQuery)->pTableList) {
    (*pQuery)->pTableList = taosArrayInit(taosHashGetSize(context.pTableNameHashObj), sizeof(SName));
    if (NULL == (*pQuery)->pTableList) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }
1248

1249
  context.pOutput->payloadType = PAYLOAD_TYPE_KV;
1250

1251 1252 1253 1254
  int32_t code = skipInsertInto(&context);
  if (TSDB_CODE_SUCCESS == code) {
    code = parseInsertBody(&context);
  }
1255
  if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) {
X
Xiaoyu Wang 已提交
1256 1257 1258 1259 1260 1261
    SName* pTable = taosHashIterate(context.pTableNameHashObj, NULL);
    while (NULL != pTable) {
      taosArrayPush((*pQuery)->pTableList, pTable);
      pTable = taosHashIterate(context.pTableNameHashObj, pTable);
    }
  }
1262
  destroyInsertParseContext(&context);
X
Xiaoyu Wang 已提交
1263
  return code;
1264
}
D
stmt  
dapan1121 已提交
1265

X
Xiaoyu Wang 已提交
1266 1267 1268 1269
/*
 * Builds an SName from a textual table name.
 *
 * pTableName must contain exactly one name token: an empty input yields
 * "empty table name", and any token following the name yields
 * "table name format is wrong". Errors are reported through msgBuf.
 *
 * Returns TSDB_CODE_SUCCESS, the error from createSName(), or the error built
 * for the two cases above.
 */
int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf,
                     int32_t msgBufLen) {
  SMsgBuf msg = {.buf = msgBuf, .len = msgBufLen};
  SToken  sToken;

  // first token: the table name itself
  NEXT_TOKEN(pTableName, sToken);
  if (0 == sToken.n) {
    return buildInvalidOperationMsg(&msg, "empty table name");
  }

  int32_t code = createSName(pName, &sToken, acctId, dbName, &msg);
  if (code) {
    return code;
  }

  // nothing may follow the name
  NEXT_TOKEN(pTableName, sToken);
  return (sToken.n > 0) ? buildInvalidOperationMsg(&msg, "table name format is wrong") : TSDB_CODE_SUCCESS;
}

/*
 * Builds the submit output of a stmt insert from the per-table data blocks.
 *
 * pQuery     : query whose root is the SVnodeModifOpStmt receiving the output.
 * pVgHash    : vgroup hash used by buildOutput().
 * pBlockHash : per-table data blocks; merged per vgroup when not empty.
 *
 * Returns TSDB_CODE_SUCCESS or the error of the merge / output step. The merged
 * per-vgroup block list is released on every path, including failures.
 */
int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) {
  SVnodeModifOpStmt*  modifyNode = (SVnodeModifOpStmt*)pQuery->pRoot;
  int32_t             code = TSDB_CODE_SUCCESS;
  SInsertParseContext insertCtx = {
      .pVgroupsHashObj = pVgHash,
      .pTableBlockHashObj = pBlockHash,
      .pOutput = (SVnodeModifOpStmt*)pQuery->pRoot,
  };

  // merge according to vgId
  if (taosHashGetSize(insertCtx.pTableBlockHashObj) > 0) {
    code = mergeTableDataBlocks(insertCtx.pTableBlockHashObj, modifyNode->payloadType, &insertCtx.pVgDataBlocks);
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = buildOutput(&insertCtx);
  }

  // always release the merged list, not only on success
  destroyBlockArrayList(insertCtx.pVgDataBlocks);
  return code;
}

X
Xiaoyu Wang 已提交
1313 1314 1315 1316
/*
 * Binds the tag values of a stmt insert and attaches the resulting
 * create-table message to the data block.
 *
 * pBlock    : STableDataBlocks of the target table.
 * boundTags : SParsedDataColInfo describing the bound tags; NULL is rejected
 *             with TSDB_CODE_QRY_APP_ERROR.
 * suid      : uid of the super table.
 * tName     : name of the child table to create.
 * bind      : one TAOS_MULTI_BIND per bound tag; only element 0 of each bind
 *             (is_null[0], length[0], buffer) is read.
 * msgBuf / msgBufLen : buffer for error messages.
 *
 * The schema of the current tag is resolved before the NULL check, so that
 * KvRowAppend() never sees an unset or stale schema for NULL tags. The local
 * tag builder and the create request are released on every return path.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, char* tName, TAOS_MULTI_BIND* bind,
                           char* msgBuf, int32_t msgBufLen) {
  STableDataBlocks*   pDataBlock = (STableDataBlocks*)pBlock;
  SMsgBuf             pBuf = {.buf = msgBuf, .len = msgBufLen};
  SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
  if (NULL == tags) {
    return TSDB_CODE_QRY_APP_ERROR;
  }

  SKVRowBuilder tagBuilder;
  if (tdInitKVRowBuilder(&tagBuilder) < 0) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SSchema* pSchema = pDataBlock->pTableMeta->schema;
  SKvParam param = {.builder = &tagBuilder};
  int32_t  code = TSDB_CODE_SUCCESS;

  for (int c = 0; c < tags->numOfBound; ++c) {
    SSchema* pTagSchema = &pSchema[tags->boundColumns[c] - 1];  // colId starts with 1
    param.schema = pTagSchema;

    if (bind[c].is_null && bind[c].is_null[0]) {
      code = KvRowAppend(&pBuf, NULL, 0, &param);
    } else {
      int32_t colLen = pTagSchema->bytes;
      if (IS_VAR_DATA_TYPE(pTagSchema->type)) {
        colLen = bind[c].length[0];
      }
      code = KvRowAppend(&pBuf, (char*)bind[c].buffer, colLen, &param);
    }

    if (TSDB_CODE_SUCCESS != code) {
      tdDestroyKVRowBuilder(&tagBuilder);
      return code;
    }
  }

  SKVRow row = tdGetKVRowFromBuilder(&tagBuilder);
  if (NULL == row) {
    tdDestroyKVRowBuilder(&tagBuilder);
    return buildInvalidOperationMsg(&pBuf, "tag value expected");
  }
  tdSortKVRowByColIdx(row);

  // tbReq takes over `row`; destroyCreateSubTbReq() releases it together with the name
  SVCreateTbReq tbReq = {0};
  code = buildCreateTbReq(&tbReq, tName, row, suid);
  if (TSDB_CODE_SUCCESS == code) {
    code = buildCreateTbMsg(pDataBlock, &tbReq);
  }

  destroyCreateSubTbReq(&tbReq);
  tdDestroyKVRowBuilder(&tagBuilder);

  return code;
}

X
Xiaoyu Wang 已提交
1364 1365 1366 1367
/*
 * Binds column values of a stmt insert, row by row, into the data block.
 *
 * pBlock : STableDataBlocks of the target table.
 * bind   : one TAOS_MULTI_BIND per bound column; bind[0].num gives the number
 *          of rows and every other bind must carry the same count.
 * msgBuf / msgBufLen : buffer for error messages.
 *
 * For every row each bound column is appended through MemRowAppend(); NULL is
 * rejected for the primary timestamp, and the bind buffer type must match the
 * column type. Columns without a bound value are filled with the NONE value
 * for TP rows. Finally the submit block header is updated.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) {
  STableDataBlocks*   pDataBlock = (STableDataBlocks*)pBlock;
  SSchema*            pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
  int32_t             extendedRowSize = getExtendedRowSize(pDataBlock);
  SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
  SRowBuilder*        pBuilder = &pDataBlock->rowBuilder;
  SMemParam           param = {.rb = pBuilder};
  SMsgBuf             pBuf = {.buf = msgBuf, .len = msgBufLen};
  int32_t             rowNum = bind->num;

  CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));

  CHECK_CODE(allocateMemForSize(pDataBlock, extendedRowSize * bind->num));

  for (int32_t r = 0; r < bind->num; ++r) {
    STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size);  // skip the SSubmitBlk header
    tdSRowResetBuf(pBuilder, row);

    for (int c = 0; c < spd->numOfBound; ++c) {
      TAOS_MULTI_BIND* pBind = &bind[c];
      SSchema*         pColSchema = &pSchema[spd->boundColumns[c] - 1];

      if (pBind->num != rowNum) {
        return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
      }

      param.schema = pColSchema;
      getSTSRowAppendInfo(pBuilder->rowType, spd, c, &param.toffset, &param.colIdx);

      // NULL cells are appended as (NULL, 0); everything else points into the bind buffer
      const char* cellData = NULL;
      int32_t     cellLen = 0;
      bool        cellIsNull = (pBind->is_null && pBind->is_null[r]);
      if (cellIsNull) {
        if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
          return buildInvalidOperationMsg(&pBuf, "primary timestamp should not be NULL");
        }
      } else {
        if (pBind->buffer_type != pColSchema->type) {
          return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
        }

        cellLen = IS_VAR_DATA_TYPE(pColSchema->type) ? pBind->length[r] : pColSchema->bytes;
        cellData = (char*)pBind->buffer + pBind->buffer_length * r;
      }
      CHECK_CODE(MemRowAppend(&pBuf, cellData, cellLen, &param));

      if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
        TSKEY tsKey = TD_ROW_KEY(row);
        checkTimestamp(pDataBlock, (const char*)&tsKey);
      }
    }

    // set the null value for the columns that do not assign values
    if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
      for (int32_t i = 0; i < spd->numOfCols; ++i) {
        if (VAL_STAT_NONE != spd->cols[i].valStat) {  // the primary TS key is not VAL_STAT_NONE
          continue;
        }
        tdAppendColValToTpRow(pBuilder, TD_VTYPE_NONE, getNullValue(pSchema[i].type), true, pSchema[i].type, i,
                              spd->cols[i].toffset);
      }
    }
#ifdef TD_DEBUG_PRINT_ROW
    STSchema* pSTSchema = tdGetSTSChemaFromSSChema(&pSchema, spd->numOfCols);
    tdSRowPrint(row, pSTSchema, __func__);
    taosMemoryFree(pSTSchema);
#endif
    pDataBlock->size += extendedRowSize;
  }

  // the submit block header lives at the start of the data buffer
  SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
  if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, bind->num)) {
    return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767");
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1441 1442 1443 1444 1445
/**
 * Bind the values of ONE bound column, for a whole batch of rows, into the rows
 * that are being assembled at the tail of a table data block.
 *
 * The function is meant to be called once per bound column, with colIdx going
 * from 0 to numOfBound - 1:
 *   - colIdx == 0 ("row start") initialises the row builder and reserves
 *     bind->num rows of extendedRowSize bytes each;
 *   - colIdx == numOfBound - 1 ("row end") fills the unbound columns of tuple
 *     rows, advances pDataBlock->size past the new rows and updates the
 *     SSubmitBlk header through setBlockInfo().
 *
 * @param pBlock     STableDataBlocks* that receives the rows.
 * @param bind       values of the column; bind->num is the number of rows.
 * @param msgBuf     buffer that receives an error message on failure.
 * @param msgBufLen  capacity of msgBuf.
 * @param colIdx     index of the column inside the bound-column list.
 * @param rowNum     number of rows every column of this batch must provide.
 * @return TSDB_CODE_SUCCESS, or the error code produced by the failing step.
 */
int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx,
                                int32_t rowNum) {
  STableDataBlocks*   pDataBlock = (STableDataBlocks*)pBlock;
  SSchema*            pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
  int32_t             extendedRowSize = getExtendedRowSize(pDataBlock);
  SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
  SRowBuilder*        pBuilder = &pDataBlock->rowBuilder;
  SMemParam           param = {.rb = pBuilder};
  SMsgBuf             pBuf = {.buf = msgBuf, .len = msgBufLen};
  bool                rowStart = (0 == colIdx);
  bool                rowEnd = ((colIdx + 1) == spd->numOfBound);

  // The row count is the same for every row of the batch, so reject a mismatch once,
  // before the row builder is reset or any memory is reserved. The "> 0" guard keeps
  // the behaviour of an empty batch unchanged (it never reached this check before).
  if (bind->num > 0 && bind->num != rowNum) {
    return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
  }

  if (rowStart) {
    CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));
    CHECK_CODE(allocateMemForSize(pDataBlock, extendedRowSize * bind->num));
  }

  for (int32_t r = 0; r < bind->num; ++r) {
    // pDataBlock->size is only advanced on the last column, so row r of this batch
    // always lives at the same offset for every column call
    STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size + extendedRowSize * r);  // skip the SSubmitBlk header
    if (rowStart) {
      tdSRowResetBuf(pBuilder, row);
    } else {
      tdSRowGetBuf(pBuilder, row);
    }

    SSchema* pColSchema = &pSchema[spd->boundColumns[colIdx] - 1];  // boundColumns holds 1-based column ids

    param.schema = pColSchema;
    getSTSRowAppendInfo(pBuilder->rowType, spd, colIdx, &param.toffset, &param.colIdx);

    if (bind->is_null && bind->is_null[r]) {
      if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
        return buildInvalidOperationMsg(&pBuf, "primary timestamp should not be NULL");
      }

      CHECK_CODE(MemRowAppend(&pBuf, NULL, 0, &param));
    } else {
      if (bind->buffer_type != pColSchema->type) {
        return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
      }

      // fixed-size types use the schema width, variable-size types the per-row length
      int32_t colLen = pColSchema->bytes;
      if (IS_VAR_DATA_TYPE(pColSchema->type)) {
        colLen = bind->length[r];
      }

      CHECK_CODE(MemRowAppend(&pBuf, (char*)bind->buffer + bind->buffer_length * r, colLen, &param));
    }

    if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
      TSKEY tsKey = TD_ROW_KEY(row);
      checkTimestamp(pDataBlock, (const char*)&tsKey);
    }

    // set the null value for the columns that do not assign values
    if (rowEnd && (spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
      for (int32_t i = 0; i < spd->numOfCols; ++i) {
        if (spd->cols[i].valStat == VAL_STAT_NONE) {  // the primary TS key is not VAL_STAT_NONE
          tdAppendColValToTpRow(pBuilder, TD_VTYPE_NONE, getNullValue(pSchema[i].type), true, pSchema[i].type, i,
                                spd->cols[i].toffset);
        }
      }
    }

#ifdef TD_DEBUG_PRINT_ROW
    if (rowEnd) {
      STSchema* pSTSchema = tdGetSTSChemaFromSSChema(&pSchema, spd->numOfCols);
      tdSRowPrint(row, pSTSchema, __func__);
      taosMemoryFree(pSTSchema);
    }
#endif
  }

  if (rowEnd) {
    // all columns of the batch are in place: publish the rows
    pDataBlock->size += extendedRowSize * bind->num;

    SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
    if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, bind->num)) {
      return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767");
    }
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1530
/**
 * Report the bound columns described by boundInfo as an array of TAOS_FIELD.
 *
 * @param boundInfo  bound-column description; boundColumns[] holds 1-based ids into pSchema.
 * @param pSchema    schema array the bound ids refer to.
 * @param fieldNum   out: number of bound columns (always written on success).
 * @param fields     optional out: newly allocated array of numOfBound entries
 *                   (allocated with taosMemoryCalloc); pass NULL to only query the count.
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the array cannot be allocated.
 */
int32_t buildBoundFields(SParsedDataColInfo* boundInfo, SSchema* pSchema, int32_t* fieldNum, TAOS_FIELD** fields) {
  if (NULL != fields) {
    TAOS_FIELD* pOut = taosMemoryCalloc(boundInfo->numOfBound, sizeof(TAOS_FIELD));
    *fields = pOut;
    if (NULL == pOut) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    for (int32_t idx = 0; idx < boundInfo->numOfBound; ++idx) {
      SSchema*    pSrc = &pSchema[boundInfo->boundColumns[idx] - 1];
      TAOS_FIELD* pDst = &pOut[idx];

      strcpy(pDst->name, pSrc->name);
      pDst->type = pSrc->type;
      pDst->bytes = pSrc->bytes;
    }
  }

  *fieldNum = boundInfo->numOfBound;

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1550 1551
/**
 * Describe the bound tags of a statement as an array of TAOS_FIELD.
 *
 * @param pBlock     STableDataBlocks* whose table meta provides the tag schema.
 * @param boundTags  SParsedDataColInfo* describing the bound tags; must not be NULL.
 * @param fieldNum   out: number of bound tags.
 * @param fields     optional out: array allocated by buildBoundFields(), or NULL when
 *                   no tag is bound; may be NULL to only query the count.
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_APP_ERROR when boundTags is NULL, or the
 *         error reported by buildBoundFields().
 */
int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD** fields) {
  STableDataBlocks*   pDataBlock = (STableDataBlocks*)pBlock;
  SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
  if (NULL == tags) {
    return TSDB_CODE_QRY_APP_ERROR;
  }

  SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta);
  if (tags->numOfBound <= 0) {
    *fieldNum = 0;
    // `fields` is optional, exactly as in qBuildStmtColFields() and buildBoundFields()
    if (fields) {
      *fields = NULL;
    }

    return TSDB_CODE_SUCCESS;
  }

  CHECK_CODE(buildBoundFields(tags, pSchema, fieldNum, fields));

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1570 1571 1572
/**
 * Describe the bound columns of a statement as an array of TAOS_FIELD.
 *
 * @param pBlock    STableDataBlocks* holding the bound-column info and the table meta.
 * @param fieldNum  out: number of bound columns.
 * @param fields    optional out: array allocated by buildBoundFields(), or NULL when
 *                  no column is bound; may be NULL to only query the count.
 * @return TSDB_CODE_SUCCESS, or the error reported by buildBoundFields().
 */
int32_t qBuildStmtColFields(void* pBlock, int32_t* fieldNum, TAOS_FIELD** fields) {
  STableDataBlocks*   pDataBlock = (STableDataBlocks*)pBlock;
  SSchema*            pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
  SParsedDataColInfo* pBound = &pDataBlock->boundColumnInfo;

  if (pBound->numOfBound > 0) {
    CHECK_CODE(buildBoundFields(pBound, pSchema, fieldNum, fields));
    return TSDB_CODE_SUCCESS;
  }

  // nothing is bound: report an empty field list
  *fieldNum = 0;
  if (NULL != fields) {
    *fields = NULL;
  }

  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
1587
// schemaless logic start
D
stmt  
dapan1121 已提交
1588

wmmhello's avatar
wmmhello 已提交
1589
typedef struct SmlExecTableHandle {
X
Xiaoyu Wang 已提交
1590 1591 1592
  SParsedDataColInfo tags;          // each table
  SKVRowBuilder      tagsBuilder;   // each table
  SVCreateTbReq      createTblReq;  // each table
wmmhello's avatar
wmmhello 已提交
1593
} SmlExecTableHandle;
wmmhello's avatar
wmmhello 已提交
1594

wmmhello's avatar
wmmhello 已提交
1595
typedef struct SmlExecHandle {
1596 1597 1598
  SHashObj*          pBlockHash;
  SmlExecTableHandle tableExecHandle;
  SQuery*            pQuery;
wmmhello's avatar
wmmhello 已提交
1599
} SSmlExecHandle;
wmmhello's avatar
wmmhello 已提交
1600

wmmhello's avatar
wmmhello 已提交
1601 1602 1603 1604 1605 1606 1607
/* Release the tag builder, the bound-tag info and the create-table request of a SmlExecTableHandle. */
static void smlDestroyTableHandle(void* pHandle) {
  SmlExecTableHandle* pTable = pHandle;

  tdDestroyKVRowBuilder(&pTable->tagsBuilder);
  destroyBoundColumnInfo(&pTable->tags);
  destroyCreateSubTbReq(&pTable->createTblReq);
}

X
Xiaoyu Wang 已提交
1608
/**
 * Rebuild the bound-column description in pColList from a list of schemaless keys.
 *
 * Every entry of cols (an SSmlKv*) is looked up by key in pSchema. The matching
 * column is marked VAL_STAT_HAS, its 1-based id is appended to boundColumns[] and the
 * size of its NULL representation is added to boundNullLen. When the keys do not
 * appear in schema order, colIdxInfo is (re)built so that each bound position can be
 * mapped to its position in schema order.
 *
 * @param cols      array of SSmlKv*; only key/keyLen are read here.
 * @param pColList  in/out: numOfCols, boundColumns and cols must already be set up.
 * @param pSchema   schema array with pColList->numOfCols entries.
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_SML_INVALID_DATA for an unknown or duplicated
 *         key; TSDB_CODE_TSC_OUT_OF_MEMORY when colIdxInfo cannot be allocated.
 */
static int32_t smlBoundColumnData(SArray* cols, SParsedDataColInfo* pColList, SSchema* pSchema) {
  col_id_t nCols = pColList->numOfCols;

  // start from a clean "nothing bound" state
  pColList->numOfBound = 0;
  pColList->boundNullLen = 0;
  memset(pColList->boundColumns, 0, sizeof(col_id_t) * nCols);
  for (col_id_t i = 0; i < nCols; ++i) {
    pColList->cols[i].valStat = VAL_STAT_NONE;
  }

  bool     isOrdered = true;
  col_id_t lastColIdx = -1;  // last column found
  for (int i = 0; i < taosArrayGetSize(cols); ++i) {
    SSmlKv*  kv = taosArrayGetP(cols, i);
    SToken   sToken = {.n = kv->keyLen, .z = (char*)kv->key};
    // search after the previous match first; fall back to the columns before it,
    // which means the keys are not in schema order
    col_id_t t = lastColIdx + 1;
    col_id_t index = findCol(&sToken, t, nCols, pSchema);
    if (index < 0 && t > 0) {
      index = findCol(&sToken, 0, t, pSchema);
      isOrdered = false;
    }
    if (index < 0) {
      return TSDB_CODE_SML_INVALID_DATA;
    }
    if (pColList->cols[index].valStat == VAL_STAT_HAS) {
      return TSDB_CODE_SML_INVALID_DATA;  // the same column is bound twice
    }
    lastColIdx = index;
    pColList->cols[index].valStat = VAL_STAT_HAS;
    pColList->boundColumns[pColList->numOfBound] = index + PRIMARYKEY_TIMESTAMP_COL_ID;
    ++pColList->numOfBound;
    // Use the MATCHED column (index), not the search start (t): t names a different
    // column whenever the key is out of order or skips columns, and equals nCols
    // (one past the schema) when the previous match was the last column.
    switch (pSchema[index].type) {
      case TSDB_DATA_TYPE_BINARY:
        pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + CHAR_BYTES);
        break;
      case TSDB_DATA_TYPE_NCHAR:
        pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
        break;
      default:
        pColList->boundNullLen += TYPE_BYTES[pSchema[index].type];
        break;
    }
  }

  pColList->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED;

  if (!isOrdered) {
    pColList->colIdxInfo = taosMemoryCalloc(pColList->numOfBound, sizeof(SBoundIdxInfo));
    if (NULL == pColList->colIdxInfo) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    SBoundIdxInfo* pColIdx = pColList->colIdxInfo;
    for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
      pColIdx[i].schemaColIdx = pColList->boundColumns[i];
      pColIdx[i].boundIdx = i;
    }
    // sort by schema position to learn each column's final position ...
    qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), schemaIdxCompar);
    for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
      pColIdx[i].finalIdx = i;
    }
    // ... then restore bound order so the array can be indexed by bound position
    qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), boundIdxCompar);
  }

  if (pColList->numOfCols > pColList->numOfBound) {
    memset(&pColList->boundColumns[pColList->numOfBound], 0,
           sizeof(col_id_t) * (pColList->numOfCols - pColList->numOfBound));
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1679 1680
/**
 * Build the tag row of a table from schemaless tag key/value pairs.
 *
 * @param cols         array of SSmlKv*, in the same order as tags->boundColumns.
 * @param tagsBuilder  KV row builder; (re)initialised here.
 * @param tags         bound tag description; boundColumns[] holds 1-based ids into pSchema.
 * @param pSchema      tag schema array.
 * @param row          out: the tag row taken from the builder, sorted by tdSortKVRowByColIdx().
 * @param msg          message buffer handed to KvRowAppend().
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_TSC_OUT_OF_MEMORY when the builder cannot be
 *         initialised; the error reported by KvRowAppend() for a tag that cannot be
 *         appended; TSDB_CODE_SML_INVALID_DATA when no row can be produced.
 */
static int32_t smlBuildTagRow(SArray* cols, SKVRowBuilder* tagsBuilder, SParsedDataColInfo* tags, SSchema* pSchema,
                              SKVRow* row, SMsgBuf* msg) {
  if (tdInitKVRowBuilder(tagsBuilder) < 0) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SKvParam param = {.builder = tagsBuilder};
  for (int i = 0; i < tags->numOfBound; ++i) {
    SSchema* pTagSchema = &pSchema[tags->boundColumns[i] - 1];  // colId starts with 1
    param.schema = pTagSchema;
    SSmlKv* kv = taosArrayGetP(cols, i);

    // variable-length values are referenced through kv->value, fixed-size values
    // are stored inside the kv itself, hence the address-of
    int32_t code = TSDB_CODE_SUCCESS;
    if (IS_VAR_DATA_TYPE(kv->type)) {
      code = KvRowAppend(msg, kv->value, kv->length, &param);
    } else {
      code = KvRowAppend(msg, &(kv->value), kv->length, &param);
    }
    // do not hand back a row that silently misses a tag
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  *row = tdGetKVRowFromBuilder(tagsBuilder);
  if (*row == NULL) {
    return TSDB_CODE_SML_INVALID_DATA;
  }
  tdSortKVRowByColIdx(*row);
  return TSDB_CODE_SUCCESS;
}

1705 1706
/**
 * Bind the tags and all rows of ONE table of a schemaless insert.
 *
 * Steps: release the state of the previously bound table, bind and build the tag
 * row, prepare the create-sub-table request, fetch the table's data block from the
 * handle's block hash (keyed by table uid), bind the data columns, then append one
 * row per entry of cols and update the SSubmitBlk header.
 *
 * @param handle      SSmlExecHandle* created by smlInitHandle().
 * @param tags        array of SSmlKv* holding the tag keys/values.
 * @param colsSchema  array of SSmlKv* naming the data columns to bind.
 * @param cols        one entry per row: an SArray of SSmlKv* when format is true,
 *                    otherwise a hash table mapping column name to SSmlKv*.
 * @param format      selects the row representation described above.
 * @param pTableMeta  meta of the super table (schema, uid, suid, precision).
 * @param tableName   name passed to buildCreateTbReq().
 * @param msgBuf      buffer that receives an error message on failure.
 * @param msgBufLen   capacity of msgBuf.
 * @return TSDB_CODE_SUCCESS, or the error code of the failing step.
 */
int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta,
                    char* tableName, char* msgBuf, int16_t msgBufLen) {
  SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};

  SSmlExecHandle* smlHandle = (SSmlExecHandle*)handle;
  smlDestroyTableHandle(&smlHandle->tableExecHandle);  // free for each table
  SSchema* pTagsSchema = getTableTagSchema(pTableMeta);
  setBoundColumnInfo(&smlHandle->tableExecHandle.tags, pTagsSchema, getNumOfTags(pTableMeta));
  int ret = smlBoundColumnData(tags, &smlHandle->tableExecHandle.tags, pTagsSchema);
  if (ret != TSDB_CODE_SUCCESS) {
    buildInvalidOperationMsg(&pBuf, "bound tags error");
    return ret;
  }
  SKVRow row = NULL;
  ret = smlBuildTagRow(tags, &smlHandle->tableExecHandle.tagsBuilder, &smlHandle->tableExecHandle.tags, pTagsSchema,
                       &row, &pBuf);
  if (ret != TSDB_CODE_SUCCESS) {
    return ret;
  }

  buildCreateTbReq(&smlHandle->tableExecHandle.createTblReq, tableName, row, pTableMeta->suid);

  STableDataBlocks* pDataBlock = NULL;
  ret = getDataBlockFromList(smlHandle->pBlockHash, &pTableMeta->uid, sizeof(pTableMeta->uid),
                             TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk), getTableInfo(pTableMeta).rowSize,
                             pTableMeta, &pDataBlock, NULL, &smlHandle->tableExecHandle.createTblReq);
  if (ret != TSDB_CODE_SUCCESS) {
    buildInvalidOperationMsg(&pBuf, "create data block error");
    return ret;
  }

  SSchema* pSchema = getTableColumnSchema(pTableMeta);

  ret = smlBoundColumnData(colsSchema, &pDataBlock->boundColumnInfo, pSchema);
  if (ret != TSDB_CODE_SUCCESS) {
    buildInvalidOperationMsg(&pBuf, "bound cols error");
    return ret;
  }
  int32_t             extendedRowSize = getExtendedRowSize(pDataBlock);
  SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
  SRowBuilder*        pBuilder = &pDataBlock->rowBuilder;
  SMemParam           param = {.rb = pBuilder};

  // same handling as the stmt bind path: a builder that failed to initialise must not be used
  ret = initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo);
  if (ret != TSDB_CODE_SUCCESS) {
    return ret;
  }

  int32_t rowNum = taosArrayGetSize(cols);
  if (rowNum <= 0) {
    return buildInvalidOperationMsg(&pBuf, "cols size <= 0");
  }
  ret = allocateMemForSize(pDataBlock, extendedRowSize * rowNum);
  if (ret != TSDB_CODE_SUCCESS) {
    buildInvalidOperationMsg(&pBuf, "allocate memory error");
    return ret;
  }
  for (int32_t r = 0; r < rowNum; ++r) {
    STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size);  // skip the SSubmitBlk header
    tdSRowResetBuf(pBuilder, row);
    void*  rowData = taosArrayGetP(cols, r);
    size_t rowDataSize = 0;
    if (format) {
      rowDataSize = taosArrayGetSize(rowData);
    }

    // 1. set the value of every bound column from the schemaless row data
    for (int c = 0, j = 0; c < spd->numOfBound; ++c) {
      SSchema* pColSchema = &pSchema[spd->boundColumns[c] - 1];

      param.schema = pColSchema;
      getSTSRowAppendInfo(pBuilder->rowType, spd, c, &param.toffset, &param.colIdx);

      SSmlKv* kv = NULL;
      if (format) {
        // array form: j walks the row's values in bound order; when the row does not
        // provide every bound column, a value is consumed only if its key matches
        if (j < rowDataSize) {
          kv = taosArrayGetP(rowData, j);
          if (rowDataSize != spd->numOfBound &&
              (kv->keyLen != strlen(pColSchema->name) || strncmp(kv->key, pColSchema->name, kv->keyLen) != 0)) {
            kv = NULL;
          } else {
            j++;
          }
        }
      } else {
        // hash form: look the value up by column name
        void** p = taosHashGet(rowData, pColSchema->name, strlen(pColSchema->name));
        if (p) kv = *p;
      }

      if (!kv || kv->length == 0) {
        // no value for this column in this row
        ret = MemRowAppend(&pBuf, NULL, 0, &param);
      } else {
        int32_t colLen = kv->length;
        if (pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP) {
          kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision);
        }

        if (IS_VAR_DATA_TYPE(kv->type)) {
          ret = MemRowAppend(&pBuf, kv->value, colLen, &param);
        } else {
          ret = MemRowAppend(&pBuf, &(kv->value), colLen, &param);
        }
      }
      // a failed append leaves the column unwritten: stop instead of submitting the row
      if (ret != TSDB_CODE_SUCCESS) {
        return ret;
      }

      if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
        TSKEY tsKey = TD_ROW_KEY(row);
        checkTimestamp(pDataBlock, (const char*)&tsKey);
      }
    }

    // set the null value for the columns that do not assign values
    if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
      for (int32_t i = 0; i < spd->numOfCols; ++i) {
        if (spd->cols[i].valStat == VAL_STAT_NONE) {  // the primary TS key is not VAL_STAT_NONE
          tdAppendColValToTpRow(pBuilder, TD_VTYPE_NONE, getNullValue(pSchema[i].type), true, pSchema[i].type, i,
                                spd->cols[i].toffset);
        }
      }
    }

    pDataBlock->size += extendedRowSize;
  }

  SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
  if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, rowNum)) {
    return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767");
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1833 1834 1835
/**
 * Create the handle of a schemaless insert.
 *
 * @param pQuery  query stored in the handle and later passed to qBuildStmtOutput().
 * @return a handle to release with smlDestroyHandle(), or NULL when the handle or
 *         its block hash cannot be allocated.
 */
void* smlInitHandle(SQuery* pQuery) {
  SSmlExecHandle* handle = taosMemoryCalloc(1, sizeof(SSmlExecHandle));
  if (!handle) return NULL;
  handle->pBlockHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
  if (NULL == handle->pBlockHash) {
    // never hand out a handle whose block hash is missing
    taosMemoryFree(handle);
    return NULL;
  }
  handle->pQuery = pQuery;

  return handle;
}

X
Xiaoyu Wang 已提交
1842 1843 1844
void smlDestroyHandle(void* pHandle) {
  if (!pHandle) return;
  SSmlExecHandle* handle = (SSmlExecHandle*)pHandle;
wmmhello's avatar
wmmhello 已提交
1845
  destroyBlockHashmap(handle->pBlockHash);
wmmhello's avatar
wmmhello 已提交
1846
  smlDestroyTableHandle(&handle->tableExecHandle);
wmmhello's avatar
wmmhello 已提交
1847 1848 1849 1850
  taosMemoryFree(handle);
}

/* Build the statement output for everything bound so far by forwarding the handle's query and block hash to qBuildStmtOutput(). */
int32_t smlBuildOutput(void* handle, SHashObj* pVgHash) {
  SSmlExecHandle* pSml = handle;
  int32_t         code = qBuildStmtOutput(pSml->pQuery, pVgHash, pSml->pBlockHash);
  return code;
}
wmmhello's avatar
wmmhello 已提交
1854
// schemaless logic end