parInsert.c 62.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

wafwerar's avatar
wafwerar 已提交
16
#include "os.h"
X
Xiaoyu Wang 已提交
17 18 19
#include "parInsertData.h"
#include "parInt.h"
#include "parToken.h"
H
refact  
Hongze Cheng 已提交
20
#include "parUtil.h"
21 22 23 24
#include "tglobal.h"
#include "ttime.h"
#include "ttypes.h"

H
refact  
Hongze Cheng 已提交
25 26 27
/*
 * Read the next token starting at the SQL cursor `pSql` into `sToken`
 * (tStrGetToken is called with its third argument set to false) and advance
 * the cursor by the offset tStrGetToken reports.
 *
 * `pSql` must be a modifiable char* lvalue; `sToken` must be an assignable
 * SToken lvalue. Arguments are parenthesized and the offset local carries a
 * macro-private name, so an argument expression that mentions `index` is not
 * captured by the macro's own variable.
 */
#define NEXT_TOKEN(pSql, sToken)                            \
  do {                                                      \
    int32_t nextTokenIdx_ = 0;                              \
    (sToken) = tStrGetToken((pSql), &nextTokenIdx_, false); \
    (pSql) += nextTokenIdx_;                                \
  } while (0)

H
refact  
Hongze Cheng 已提交
32 33 34
/*
 * Same as NEXT_TOKEN, except that tStrGetToken is called with its third
 * argument set to true. The cursor `pSql` is advanced by the offset
 * tStrGetToken reports.
 *
 * `pSql` must be a modifiable char* lvalue; `sToken` must be an assignable
 * SToken lvalue. Arguments are parenthesized and the offset local carries a
 * macro-private name to avoid capturing identifiers used in the arguments.
 */
#define NEXT_TOKEN_WITH_PREV(pSql, sToken)                 \
  do {                                                     \
    int32_t nextTokenIdx_ = 0;                             \
    (sToken) = tStrGetToken((pSql), &nextTokenIdx_, true); \
    (pSql) += nextTokenIdx_;                               \
  } while (0)

39
/*
 * Read the next token from `pSql` into `sToken` WITHOUT moving `pSql`.
 * The offset is kept in the caller-supplied int32_t lvalue `index`, which is
 * passed by address to tStrGetToken. Arguments are parenthesized so that
 * compound expressions expand safely.
 */
#define NEXT_TOKEN_KEEP_SQL(pSql, sToken, index)      \
  do {                                                \
    (sToken) = tStrGetToken((pSql), &(index), false); \
  } while (0)

typedef struct SInsertParseContext {
X
Xiaoyu Wang 已提交
45 46 47 48 49 50 51 52 53 54 55 56
  SParseContext*     pComCxt;             // input
  char*              pSql;                // input
  SMsgBuf            msg;                 // input
  STableMeta*        pTableMeta;          // each table
  SParsedDataColInfo tags;                // each table
  SKVRowBuilder      tagsBuilder;         // each table
  SVCreateTbReq      createTblReq;        // each table
  SHashObj*          pVgroupsHashObj;     // global
  SHashObj*          pTableBlockHashObj;  // global
  SHashObj*          pSubTableHashObj;    // global
  SArray*            pVgDataBlocks;       // global
  int32_t            totalNum;
X
Xiaoyu Wang 已提交
57
  SVnodeModifOpStmt* pOutput;
X
Xiaoyu Wang 已提交
58
  SStmtCallback*     pStmtCb;
59 60
} SInsertParseContext;

H
refact  
Hongze Cheng 已提交
61
// Callback through which parseValueToken() hands one parsed value to a row builder.
// `value` is NULL with `len` 0 for a SQL NULL; otherwise `value` points at the parsed
// data and `len` is the schema byte width or, for string-like values, the token length.
// `param` is the opaque builder state (e.g. SMemParam / SKvParam). Returns a TSDB code.
typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param);
X
Xiaoyu Wang 已提交
62 63 64 65

// Addressable one-byte bool constants; parseValueToken() passes their addresses
// to the row-append callback when a BOOL column value is parsed.
static uint8_t TRUE_VALUE = (uint8_t)TSDB_TRUE;
static uint8_t FALSE_VALUE = (uint8_t)TSDB_FALSE;

D
stmt  
dapan1121 已提交
66
typedef struct SKvParam {
X
Xiaoyu Wang 已提交
67 68
  SKVRowBuilder* builder;
  SSchema*       schema;
D
stmt  
dapan1121 已提交
69 70 71 72 73 74 75 76 77 78
  char           buf[TSDB_MAX_TAGS_LEN];
} SKvParam;

// State passed as `param` to MemRowAppend(); every field is forwarded to
// tdAppendColValToRow() for the column currently being written.
typedef struct SMemParam {
  SRowBuilder* rb;       // row builder; its pBuf is also used to locate the row end for var-length data
  SSchema*     schema;   // schema (colId, type, bytes) of the column being appended
  int32_t      toffset;  // forwarded unchanged to tdAppendColValToRow()
  col_id_t     colIdx;   // forwarded unchanged to tdAppendColValToRow()
} SMemParam;

X
Xiaoyu Wang 已提交
79 80 81
/*
 * Evaluate `expr` exactly once and return its value from the ENCLOSING
 * function when it is not TSDB_CODE_SUCCESS. Only usable inside functions
 * that return an int32_t-compatible status code.
 *
 * The temporary uses a macro-private name: with a local called `code`, an
 * invocation such as CHECK_CODE(f(code)) would read the macro's own
 * uninitialized variable instead of the caller's.
 */
#define CHECK_CODE(expr)                     \
  do {                                       \
    int32_t checkedCode_ = (expr);           \
    if (TSDB_CODE_SUCCESS != checkedCode_) { \
      return checkedCode_;                   \
    }                                        \
  } while (0)

87 88 89 90 91 92 93 94 95 96 97 98 99
/*
 * Consume the leading "INSERT INTO" keywords from the SQL cursor.
 *
 * @param pCxt parse context; pCxt->pSql is advanced past each consumed token
 * @return TSDB_CODE_SUCCESS when both keywords are present in order, otherwise
 *         the syntax-error code produced by buildSyntaxErrMsg()
 */
static int32_t skipInsertInto(SInsertParseContext* pCxt) {
  SToken tok;

  // first keyword: INSERT
  NEXT_TOKEN(pCxt->pSql, tok);
  if (tok.type != TK_INSERT) {
    return buildSyntaxErrMsg(&pCxt->msg, "keyword INSERT is expected", tok.z);
  }

  // second keyword: INTO
  NEXT_TOKEN(pCxt->pSql, tok);
  if (tok.type == TK_INTO) {
    return TSDB_CODE_SUCCESS;
  }

  return buildSyntaxErrMsg(&pCxt->msg, "keyword INTO is expected", tok.z);
}

100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
/*
 * Validate an identifier token that names a table, either as a single part
 * ("tb") or as two parts joined by TS_PATH_DELIMITER ("db.tb"), and normalize
 * it IN PLACE: the token text is lower-cased and, for a two-part name, the
 * parts are re-joined so that pToken->z / pToken->n cover the whole name.
 * A token wrapped in TS_ESCAPE_CHAR is accepted untouched.
 *
 * NOTE: the function writes into the buffer pToken->z points to.
 *
 * @param pToken token to validate; must be of type TK_NK_ID and non-empty
 * @return TSDB_CODE_SUCCESS or TSDB_CODE_TSC_INVALID_OPERATION
 */
static int32_t parserValidateIdToken(SToken* pToken) {
  // An empty token has no last character; rejecting it here keeps the
  // z[n - 1] access below inside the token.
  if (pToken == NULL || pToken->z == NULL || pToken->n == 0 || pToken->type != TK_NK_ID) {
    return TSDB_CODE_TSC_INVALID_OPERATION;
  }

  // it is a token quoted with escape char '`'
  if (pToken->z[0] == TS_ESCAPE_CHAR && pToken->z[pToken->n - 1] == TS_ESCAPE_CHAR) {
    return TSDB_CODE_SUCCESS;
  }

  char* sep = strnchr(pToken->z, TS_PATH_DELIMITER[0], pToken->n, true);
  if (sep == NULL) {  // It is a single part token, not a complex type
    if (isNumber(pToken)) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }

    strntolower(pToken->z, pToken->z, pToken->n);
  } else {  // two part
    int32_t oldLen = pToken->n;
    char*   pStr = pToken->z;

    // (The former TK_NK_SPACE trimming step was unreachable: the type is
    // guaranteed to be TK_NK_ID by the guard at the top.)

    // first part: re-tokenize from the start; it must be an id that ends
    // exactly at the delimiter
    pToken->n = tGetToken(pToken->z, &pToken->type);
    if (pToken->z[pToken->n] != TS_PATH_DELIMITER[0]) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }

    if (pToken->type != TK_NK_ID) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }

    int32_t firstPartLen = pToken->n;

    // second part: everything after the separator must be exactly one id token
    pToken->z = sep + 1;
    pToken->n = (uint32_t)(oldLen - (sep - pStr) - 1);
    int32_t len = tGetToken(pToken->z, &pToken->type);
    if (len != pToken->n || pToken->type != TK_NK_ID) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }

    // re-build the whole name string
    if (pStr[firstPartLen] == TS_PATH_DELIMITER[0]) {
      // first part do not have quote do nothing
    } else {
      // close the gap between the two parts and blank out the vacated tail
      pStr[firstPartLen] = TS_PATH_DELIMITER[0];
      memmove(&pStr[firstPartLen + 1], pToken->z, pToken->n);
      uint32_t offset = (uint32_t)(pToken->z - (pStr + firstPartLen + 1));
      memset(pToken->z + pToken->n - offset, ' ', offset);
    }

    pToken->n += (firstPartLen + sizeof(TS_PATH_DELIMITER[0]));
    pToken->z = pStr;

    strntolower(pToken->z, pToken->z, pToken->n);
  }

  return TSDB_CODE_SUCCESS;
}

162
/*
 * Split a (possibly "db.table") name token into a full database name of the
 * form "<acctId>.<db>" and a bare table name.
 *
 * Both outputs are always NUL-terminated on success. The previous
 * sprintf/strncpy version wrote no terminator and did no length checking.
 *
 * @param pCxt       parse context (provides acctId, the current db and the message buffer)
 * @param pStname    name token; validated and normalized in place by parserValidateIdToken()
 * @param fullDbName output buffer; written with a TSDB_DB_FNAME_LEN limit (the limit the
 *                   original code already used in its snprintf call)
 * @param tableName  output buffer; NOTE(review): assumed to hold at least
 *                   TSDB_TABLE_NAME_LEN bytes -- confirm against callers
 * @return TSDB_CODE_SUCCESS, or the code produced by buildSyntaxErrMsg()
 */
static int32_t buildName(SInsertParseContext* pCxt, SToken* pStname, char* fullDbName, char* tableName) {
  if (parserValidateIdToken(pStname) != TSDB_CODE_SUCCESS) {
    return buildSyntaxErrMsg(&pCxt->msg, "invalid table name", pStname->z);
  }

  const char* pTb = pStname->z;
  size_t      tbLen = pStname->n;

  char* p = strnchr(pStname->z, TS_PATH_DELIMITER[0], pStname->n, false);
  if (NULL != p) {  // db.table
    size_t  dbLen = (size_t)(p - pStname->z);
    int32_t n = snprintf(fullDbName, TSDB_DB_FNAME_LEN, "%d.", pCxt->pComCxt->acctId);
    if (n < 0 || (size_t)n + dbLen >= (size_t)TSDB_DB_FNAME_LEN) {
      return buildSyntaxErrMsg(&pCxt->msg, "database name too long", pStname->z);
    }
    memcpy(fullDbName + n, pStname->z, dbLen);
    fullDbName[(size_t)n + dbLen] = '\0';

    pTb = p + 1;
    tbLen = pStname->n - dbLen - 1;
  } else {
    // no db in the token: fall back to the connection's current db
    if (NULL == pCxt->pComCxt->db) {
      return buildSyntaxErrMsg(&pCxt->msg, "db is not specified", pStname->z);
    }
    int32_t n = snprintf(fullDbName, TSDB_DB_FNAME_LEN, "%d.%s", pCxt->pComCxt->acctId, pCxt->pComCxt->db);
    if (n < 0 || n >= TSDB_DB_FNAME_LEN) {
      return buildSyntaxErrMsg(&pCxt->msg, "database name too long", pStname->z);
    }
  }

  if (tbLen >= (size_t)TSDB_TABLE_NAME_LEN) {
    return buildSyntaxErrMsg(&pCxt->msg, "table name too long", pStname->z);
  }
  memcpy(tableName, pTb, tbLen);
  tableName[tbLen] = '\0';

  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
180
/*
 * Fill `pName` from a table-name token.
 *
 * If the token contains TS_PATH_DELIMITER ("db.table") the db part of the
 * token is used; otherwise `dbName` (the current database) is used.
 * Both parts are passed through strdequote() before being stored.
 *
 * Fixes relative to the previous version:
 *  - the table part length is derived from the position of the delimiter in
 *    the token, not from the db length AFTER dequoting (which is shorter for
 *    a quoted db name and made the copy run past the end of the token);
 *  - both parts are bound-checked before being copied into the stack buffers.
 *
 * @param pName      output name
 * @param pTableName token holding "table" or "db.table"
 * @param acctId     account id forwarded to tNameSetDbName()
 * @param dbName     current database, may be NULL (error if the token has no db part)
 * @param pMsgBuf    receives the error text
 * @return TSDB_CODE_SUCCESS or the code produced by buildInvalidOperationMsg()
 */
static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, const char* dbName, SMsgBuf* pMsgBuf) {
  const char* msg1 = "name too long";
  const char* msg2 = "invalid database name";
  const char* msg3 = "db is not specified";

  int32_t code = TSDB_CODE_SUCCESS;
  char*   p = strnchr(pTableName->z, TS_PATH_DELIMITER[0], pTableName->n, true);

  if (p != NULL) {  // db has been specified in sql string so we ignore current db path
    assert(*p == TS_PATH_DELIMITER[0]);

    // length of the db part as written in the token (quotes included)
    int32_t rawDbLen = (int32_t)(p - pTableName->z);
    if (rawDbLen >= TSDB_DB_FNAME_LEN) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }

    // destination is zero-filled and rawDbLen < sizeof(name): always terminated
    char name[TSDB_DB_FNAME_LEN] = {0};
    strncpy(name, pTableName->z, rawDbLen);
    int32_t dbLen = strdequote(name);

    code = tNameSetDbName(pName, acctId, name, dbLen);
    if (code != TSDB_CODE_SUCCESS) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }

    // everything after the delimiter, measured on the ORIGINAL token text
    int32_t tbLen = (int32_t)pTableName->n - rawDbLen - 1;
    if (tbLen < 0 || tbLen >= TSDB_TABLE_FNAME_LEN) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }

    char tbname[TSDB_TABLE_FNAME_LEN] = {0};
    strncpy(tbname, p + 1, tbLen);
    strdequote(tbname);

    code = tNameFromString(pName, tbname, T_NAME_TABLE);
    if (code != 0) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }
  } else {  // get current DB name first, and then set it into path
    if (pTableName->n >= TSDB_TABLE_NAME_LEN) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }

    assert(pTableName->n < TSDB_TABLE_FNAME_LEN);

    char name[TSDB_TABLE_FNAME_LEN] = {0};
    strncpy(name, pTableName->z, pTableName->n);
    strdequote(name);

    if (dbName == NULL) {
      return buildInvalidOperationMsg(pMsgBuf, msg3);
    }

    code = tNameSetDbName(pName, acctId, dbName, strlen(dbName));
    if (code != TSDB_CODE_SUCCESS) {
      code = buildInvalidOperationMsg(pMsgBuf, msg2);
      return code;
    }

    code = tNameFromString(pName, name, T_NAME_TABLE);
    if (code != 0) {
      code = buildInvalidOperationMsg(pMsgBuf, msg1);
    }
  }

  return code;
}

D
stmt  
dapan1121 已提交
240
/*
 * Resolve the table named by `pTname` through the catalog and store its meta
 * in pCxt->pTableMeta.
 *
 * For a regular lookup (isStb == false) the vgroup the table hashes to is
 * also fetched and recorded in pCxt->pVgroupsHashObj, keyed by vgId.
 *
 * The result of createSName() is now checked: previously a malformed or
 * over-long name was ignored and the catalog was queried with a
 * partially-filled SName.
 *
 * @param pCxt   parse context
 * @param pTname table-name token
 * @param isStb  true to call catalogGetSTableMeta(), false for catalogGetTableMeta()
 * @return TSDB_CODE_SUCCESS or the first failing callee's code
 */
static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SToken* pTname, bool isStb) {
  SParseContext* pBasicCtx = pCxt->pComCxt;
  SName          name = {0};
  CHECK_CODE(createSName(&name, pTname, pBasicCtx->acctId, pBasicCtx->db, &pCxt->msg));
  if (isStb) {
    CHECK_CODE(catalogGetSTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name,
                                    &pCxt->pTableMeta));
  } else {
    CHECK_CODE(catalogGetTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name,
                                   &pCxt->pTableMeta));
    SVgroupInfo vg;
    CHECK_CODE(
        catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &vg));
    CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
  }
  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
258
/* Look up the meta of the table named by `pTname` via getTableMetaImpl() with isStb = false. */
static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) {
  const bool isStb = false;
  return getTableMetaImpl(pCxt, pTname, isStb);
}
D
stmt  
dapan1121 已提交
259

H
refact  
Hongze Cheng 已提交
260
/* Look up the meta of the table named by `pTname` via getTableMetaImpl() with isStb = true. */
static int32_t getSTableMeta(SInsertParseContext* pCxt, SToken* pTname) {
  const bool isStb = true;
  return getTableMetaImpl(pCxt, pTname, isStb);
}
D
stmt  
dapan1121 已提交
261

262 263 264 265 266 267 268 269 270 271
/*
 * Search pSchema[start .. end) for a column whose name equals the token text
 * (same length and same bytes; the comparison is case-sensitive).
 *
 * @return index of the first match, or -1 when none is found
 */
static int32_t findCol(SToken* pColname, int32_t start, int32_t end, SSchema* pSchema) {
  for (int32_t i = start; i < end; ++i) {
    const char* colName = pSchema[i].name;
    if (strlen(colName) != pColname->n) {
      continue;
    }
    if (0 == strncmp(pColname->z, colName, pColname->n)) {
      return i;
    }
  }
  return -1;
}

D
dapan1121 已提交
272
/*
 * Convert the submit request stored in blocks->pData to network byte order:
 * first the request header, then the header of every SSubmitBlk that follows.
 *
 * @param src    not used by this function
 * @param blocks vgroup data whose pData begins with an SSubmitReq
 */
static void buildMsgHeader(STableDataBlocks* src, SVgDataBlocks* blocks) {
  SSubmitReq* pReq = (SSubmitReq*)blocks->pData;

  pReq->header.vgId = htonl(blocks->vg.vgId);
  pReq->header.contLen = htonl(blocks->size);
  pReq->length = pReq->header.contLen;
  pReq->numOfBlocks = htonl(blocks->numOfTables);

  SSubmitBlk* pBlk = (SSubmitBlk*)(pReq + 1);
  for (int32_t remaining = blocks->numOfTables; remaining != 0; --remaining) {
    // remember the host-order lengths: they are needed to find the next block
    // after the fields themselves have been byte-swapped
    const int32_t hostDataLen = pBlk->dataLen;
    const int32_t hostSchemaLen = pBlk->schemaLen;

    pBlk->uid = htobe64(pBlk->uid);
    pBlk->suid = htobe64(pBlk->suid);
    pBlk->padding = htonl(pBlk->padding);
    pBlk->sversion = htonl(pBlk->sversion);
    pBlk->dataLen = htonl(pBlk->dataLen);
    pBlk->schemaLen = htonl(pBlk->schemaLen);
    pBlk->numOfRows = htons(pBlk->numOfRows);

    pBlk = (SSubmitBlk*)(pBlk->data + hostSchemaLen + hostDataLen);
  }
}

/*
 * Build pCxt->pOutput->pDataBlocks: one heap-allocated SVgDataBlocks per
 * entry of pCxt->pVgDataBlocks. Each entry takes over the source block's
 * pData buffer (via TSWAP) and has its headers converted by buildMsgHeader().
 *
 * The new element is pushed into the output array BEFORE it takes ownership
 * of the payload, and the push result is checked. If the push fails the
 * still-empty element is freed and the source block is left untouched; the
 * previous version ignored the push result and leaked the element.
 * NOTE(review): relies on taosArrayPush() returning NULL on failure -- confirm
 * against tarray.h.
 *
 * @return TSDB_CODE_SUCCESS or TSDB_CODE_TSC_OUT_OF_MEMORY
 */
static int32_t buildOutput(SInsertParseContext* pCxt) {
  size_t numOfVg = taosArrayGetSize(pCxt->pVgDataBlocks);
  pCxt->pOutput->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
  if (NULL == pCxt->pOutput->pDataBlocks) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  for (size_t i = 0; i < numOfVg; ++i) {
    STableDataBlocks* src = taosArrayGetP(pCxt->pVgDataBlocks, i);
    SVgDataBlocks*    dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
    if (NULL == dst) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    // the array stores the pointer value, so dst can be filled in afterwards
    if (NULL == taosArrayPush(pCxt->pOutput->pDataBlocks, &dst)) {
      taosMemoryFree(dst);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    taosHashGetDup(pCxt->pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
    dst->numOfTables = src->numOfTables;
    dst->size = src->size;
    TSWAP(dst->pData, src->pData);  // hand the payload over to dst
    buildMsgHeader(src, dst);
  }
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
316
/*
 * Track whether the rows appended to `pDataBlocks` arrive in strictly
 * increasing timestamp order.
 *
 * The TSKEY stored at `start` is compared with the previous one; if it is not
 * greater, the block is flagged as unordered. Once a block is unordered the
 * previous timestamp is no longer maintained.
 *
 * The key is read with memcpy() rather than through a cast pointer, so the
 * read is well defined even if `start` is not suitably aligned for TSKEY.
 *
 * @param pDataBlocks block whose `ordered` / `prevTS` fields are updated
 * @param start       points at the bytes of a TSKEY
 * @return always TSDB_CODE_SUCCESS
 */
int32_t checkTimestamp(STableDataBlocks* pDataBlocks, const char* start) {
  // once the data block is disordered, we do NOT keep previous timestamp any more
  if (!pDataBlocks->ordered) {
    return TSDB_CODE_SUCCESS;
  }

  TSKEY k;
  memcpy(&k, start, sizeof(k));
  if (k <= pDataBlocks->prevTS) {
    pDataBlocks->ordered = false;
  }

  pDataBlocks->prevTS = k;
  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
331 332 333 334 335 336
/*
 * Parse a timestamp value.
 *
 * Accepted forms:
 *   - NOW / TODAY, optionally followed by "()" ;
 *   - an integer literal;
 *   - any other token is handed to taosParseTime() as a date-time string;
 *   - for the first two forms, an optional "+<duration>" / "-<duration>"
 *     suffix, e.g. now+12a, now-5h.
 *
 * An integer literal that toInteger() rejects is now reported as a syntax
 * error; previously the return code was ignored and the value left in `ts`
 * was used as the timestamp.
 *
 * @param end      in: position right after `pToken` in the SQL text;
 *                 out: moved past a consumed "()" and/or duration expression
 * @param pToken   the value token
 * @param timePrec time precision forwarded to the time helpers
 * @param time     out: the parsed timestamp
 * @param pMsgBuf  receives the error text
 * @return TSDB_CODE_SUCCESS, or an error code
 */
static int parseTime(char** end, SToken* pToken, int16_t timePrec, int64_t* time, SMsgBuf* pMsgBuf) {
  int32_t index = 0;
  SToken  sToken;
  int64_t interval = 0;
  int64_t ts = 0;
  char*   pTokenEnd = *end;

  if (pToken->type == TK_NOW) {
    ts = taosGetTimestamp(timePrec);
  } else if (pToken->type == TK_TODAY) {
    ts = taosGetTimestampToday(timePrec);
  } else if (pToken->type == TK_NK_INTEGER) {
    bool isSigned = false;
    if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &ts, &isSigned)) {
      return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
    }
  } else {  // parse the RFC-3339/ISO-8601 timestamp format string
    if (taosParseTime(pToken->z, time, pToken->n, timePrec, tsDaylight) != TSDB_CODE_SUCCESS) {
      return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
    }

    return TSDB_CODE_SUCCESS;
  }

  // Look at what follows the token: skip blanks, swallow "()", and finish
  // right away if the value is terminated by a comma.
  for (int k = pToken->n; pToken->z[k] != '\0'; k++) {
    if (pToken->z[k] == ' ' || pToken->z[k] == '\t') continue;
    if (pToken->z[k] == '(' && pToken->z[k + 1] == ')') {  // for insert NOW()/TODAY()
      *end = pTokenEnd = &pToken->z[k + 2];
      k++;
      continue;
    }
    if (pToken->z[k] == ',') {
      *end = pTokenEnd;
      *time = ts;
      return TSDB_CODE_SUCCESS;
    }

    break;
  }

  /*
   * time expression:
   * e.g., now+12a, now-5h
   */
  SToken valueToken;
  index = 0;
  sToken = tStrGetToken(pTokenEnd, &index, false);
  pTokenEnd += index;

  if (sToken.type == TK_NK_MINUS || sToken.type == TK_NK_PLUS) {
    index = 0;
    valueToken = tStrGetToken(pTokenEnd, &index, false);
    pTokenEnd += index;

    // a duration is at least two characters long
    if (valueToken.n < 2) {
      return buildSyntaxErrMsg(pMsgBuf, "value expected in timestamp", sToken.z);
    }

    char unit = 0;
    if (parseAbsoluteDuration(valueToken.z, valueToken.n, &interval, &unit, timePrec) != TSDB_CODE_SUCCESS) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }

    if (sToken.type == TK_NK_PLUS) {
      ts += interval;
    } else {
      ts = ts - interval;
    }

    *end = pTokenEnd;
  }

  *time = ts;
  return TSDB_CODE_SUCCESS;
}
404

X
Xiaoyu Wang 已提交
405
/*
 * Check that `pToken` is a non-empty token of a kind that can appear as a
 * value, and for string literals replace the token text by its trimmed copy.
 *
 * @param pToken      value token; for TK_NK_STRING its z/n are redirected to tmpTokenBuf
 * @param type        not used by this function
 * @param tmpTokenBuf scratch buffer that receives the output of trimString()
 * @param pMsgBuf     receives the error text
 * @return TSDB_CODE_SUCCESS, or the code produced by buildSyntaxErrMsg()
 */
static FORCE_INLINE int32_t checkAndTrimValue(SToken* pToken, uint32_t type, char* tmpTokenBuf, SMsgBuf* pMsgBuf) {
  bool acceptable = false;

  switch (pToken->type) {
    case TK_NOW:
    case TK_TODAY:
    case TK_NK_INTEGER:
    case TK_NK_STRING:
    case TK_NK_FLOAT:
    case TK_NK_BOOL:
    case TK_NULL:
    case TK_NK_HEX:
    case TK_NK_OCT:
    case TK_NK_BIN:
      acceptable = (0 != pToken->n);
      break;
    default:
      // every other token kind (including a right parenthesis) is rejected
      break;
  }

  if (!acceptable) {
    return buildSyntaxErrMsg(pMsgBuf, "invalid data or symbol", pToken->z);
  }

  if (TK_NK_STRING != pToken->type) {
    return TSDB_CODE_SUCCESS;
  }

  // string literal: strip the quotation marks into the scratch buffer
  if (pToken->n >= TSDB_MAX_BYTES_PER_ROW) {
    return buildSyntaxErrMsg(pMsgBuf, "too long string", pToken->z);
  }

  int32_t trimmedLen = trimString(pToken->z, pToken->n, tmpTokenBuf, TSDB_MAX_BYTES_PER_ROW);
  pToken->z = tmpTokenBuf;
  pToken->n = trimmedLen;

  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
428
/*
 * Tell whether a value token denotes SQL NULL: either the NULL keyword, or a
 * string literal whose text is exactly TSDB_DATA_NULL_STR_L, compared
 * case-insensitively.
 *
 * The string form must match in full. The previous version compared only the
 * first pToken->n characters, so any prefix of the null string (a
 * one-character string equal to its first letter, for instance) was
 * classified as NULL.
 */
static bool isNullStr(SToken* pToken) {
  if (pToken->type == TK_NULL) {
    return true;
  }
  if (pToken->type != TK_NK_STRING) {
    return false;
  }

  const size_t nullLen = strlen(TSDB_DATA_NULL_STR_L);
  return (pToken->n == nullLen) && (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, nullLen) == 0);
}

H
refact  
Hongze Cheng 已提交
433
/*
 * Convert the token text to a double.
 *
 * errno is cleared before the conversion so that callers can test for ERANGE
 * afterwards. strtod() is used (instead of strtold() followed by a narrowing
 * assignment) so that the result is rounded once, directly to double, and an
 * out-of-range input is reported through HUGE_VAL/ERANGE as callers expect.
 *
 * @param pToken token to convert
 * @param value  out: converted value
 * @param endPtr out: first character not consumed by the conversion
 * @return pToken->type when the conversion consumed exactly pToken->n
 *         characters, TK_NK_ILLEGAL otherwise
 */
static FORCE_INLINE int32_t toDouble(SToken* pToken, double* value, char** endPtr) {
  errno = 0;
  *value = strtod(pToken->z, endPtr);

  // the number must span the whole token, otherwise it is not a valid floating-point literal
  if ((*endPtr - pToken->z) != pToken->n) {
    return TK_NK_ILLEGAL;
  }

  return pToken->type;
}

H
refact  
Hongze Cheng 已提交
445 446
/*
 * Validate one value token against the column/tag schema, convert it to the
 * column's binary representation and pass it to `func`.
 *
 * NULL (see isNullStr()) is forwarded as func(pMsgBuf, NULL, 0, param), except
 * for the primary timestamp column, where it is rejected.
 *
 * Bool literals given as keyword or string must now be exactly "true" or
 * "false", compared case-insensitively. The previous strncmp() over
 * pToken->n characters accepted any prefix ("t", "fa", ...) and rejected
 * upper-case spellings.
 *
 * @param end         in/out SQL position, only touched for TIMESTAMP (see parseTime())
 * @param pToken      value token; a string token is replaced by its trimmed copy
 * @param pSchema     schema of the target column/tag
 * @param timePrec    time precision for TIMESTAMP values
 * @param tmpTokenBuf scratch buffer used to trim string literals
 * @param func        row-append callback receiving the converted value
 * @param param       opaque state forwarded to `func`
 * @param pMsgBuf     receives the error text
 * @return the callback's result, a syntax-error code, or TSDB_CODE_FAILED for
 *         a schema type this function does not handle
 */
static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t timePrec, char* tmpTokenBuf,
                               _row_append_fn_t func, void* param, SMsgBuf* pMsgBuf) {
  int64_t iv;
  char*   endptr = NULL;
  bool    isSigned = false;

  int32_t code = checkAndTrimValue(pToken, pSchema->type, tmpTokenBuf, pMsgBuf);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  if (isNullStr(pToken)) {
    if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
      return buildSyntaxErrMsg(pMsgBuf, "primary timestamp should not be null", pToken->z);
    }

    return func(pMsgBuf, NULL, 0, param);
  }

  switch (pSchema->type) {
    case TSDB_DATA_TYPE_BOOL: {
      if ((pToken->type == TK_NK_BOOL || pToken->type == TK_NK_STRING) && (pToken->n != 0)) {
        const size_t trueLen = sizeof("true") - 1;
        const size_t falseLen = sizeof("false") - 1;
        if (pToken->n == trueLen && strncasecmp(pToken->z, "true", trueLen) == 0) {
          return func(pMsgBuf, &TRUE_VALUE, pSchema->bytes, param);
        } else if (pToken->n == falseLen && strncasecmp(pToken->z, "false", falseLen) == 0) {
          return func(pMsgBuf, &FALSE_VALUE, pSchema->bytes, param);
        } else {
          return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
        }
      } else if (pToken->type == TK_NK_INTEGER) {
        // any non-zero number counts as true
        return func(pMsgBuf, ((strtoll(pToken->z, NULL, 10) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param);
      } else if (pToken->type == TK_NK_FLOAT) {
        return func(pMsgBuf, ((strtod(pToken->z, NULL) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param);
      } else {
        return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
      }
    }

    case TSDB_DATA_TYPE_TINYINT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid tinyint data", pToken->z);
      } else if (!IS_VALID_TINYINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "tinyint data overflow", pToken->z);
      }

      uint8_t tmpVal = (uint8_t)iv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_UTINYINT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned tinyint data", pToken->z);
      } else if (!IS_VALID_UTINYINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "unsigned tinyint data overflow", pToken->z);
      }
      uint8_t tmpVal = (uint8_t)iv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_SMALLINT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid smallint data", pToken->z);
      } else if (!IS_VALID_SMALLINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "smallint data overflow", pToken->z);
      }
      int16_t tmpVal = (int16_t)iv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_USMALLINT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned smallint data", pToken->z);
      } else if (!IS_VALID_USMALLINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "unsigned smallint data overflow", pToken->z);
      }
      uint16_t tmpVal = (uint16_t)iv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_INT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid int data", pToken->z);
      } else if (!IS_VALID_INT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "int data overflow", pToken->z);
      }
      int32_t tmpVal = (int32_t)iv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_UINT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned int data", pToken->z);
      } else if (!IS_VALID_UINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "unsigned int data overflow", pToken->z);
      }
      uint32_t tmpVal = (uint32_t)iv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_BIGINT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid bigint data", pToken->z);
      } else if (!IS_VALID_BIGINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "bigint data overflow", pToken->z);
      }
      return func(pMsgBuf, &iv, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_UBIGINT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned bigint data", pToken->z);
      } else if (!IS_VALID_UBIGINT((uint64_t)iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "unsigned bigint data overflow", pToken->z);
      }
      uint64_t tmpVal = (uint64_t)iv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_FLOAT: {
      double dv;
      if (TK_NK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
        return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
      }
      // reject values outside the float range as well as inf/nan
      if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) ||
          isnan(dv)) {
        return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
      }
      float tmpVal = (float)dv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_DOUBLE: {
      double dv;
      if (TK_NK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
        return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
      }
      if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) {
        return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
      }
      return func(pMsgBuf, &dv, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_BINARY: {
      // Too long values will raise the invalid sql error message
      if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) {
        return buildSyntaxErrMsg(pMsgBuf, "string data overflow", pToken->z);
      }

      return func(pMsgBuf, pToken->z, pToken->n, param);
    }

    case TSDB_DATA_TYPE_NCHAR: {
      // no length check here: the token is forwarded to the callback as is
      return func(pMsgBuf, pToken->z, pToken->n, param);
    }

    case TSDB_DATA_TYPE_JSON: {
      if (pToken->n > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
        return buildSyntaxErrMsg(pMsgBuf, "json string too long than 4095", pToken->z);
      }
      return func(pMsgBuf, pToken->z, pToken->n, param);
    }

    case TSDB_DATA_TYPE_TIMESTAMP: {
      int64_t tmpVal;
      if (parseTime(end, pToken, timePrec, &tmpVal, pMsgBuf) != TSDB_CODE_SUCCESS) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp", pToken->z);
      }

      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    default:
      // unsupported schema type: fall out to the generic failure below
      break;
  }

  return TSDB_CODE_FAILED;
}

X
Xiaoyu Wang 已提交
620
/*
 * Row-append callback (see _row_append_fn_t) that writes one column value
 * into the row under construction.
 *
 * @param pMsgBuf receives the error text when an NCHAR conversion fails
 * @param value   parsed value, or NULL for SQL NULL
 * @param len     byte length of `value` (used for BINARY / NCHAR input)
 * @param param   SMemParam* describing the target row and column
 * @return TSDB_CODE_SUCCESS, or the code produced by buildSyntaxErrMsg()
 */
static FORCE_INLINE int32_t MemRowAppend(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param) {
  SMemParam*   pMem = (SMemParam*)param;
  SRowBuilder* pBuilder = pMem->rb;
  SSchema*     pCol = pMem->schema;

  // SQL NULL
  if (NULL == value) {
    tdAppendColValToRow(pBuilder, pCol->colId, pCol->type, TD_VTYPE_NULL, value, false, pMem->toffset, pMem->colIdx);
    return TSDB_CODE_SUCCESS;
  }

  // BINARY: encode as a var-string at the row end, then append it
  if (TSDB_DATA_TYPE_BINARY == pCol->type) {
    const char* rowEnd = tdRowEnd(pBuilder->pBuf);
    STR_WITH_SIZE_TO_VARSTR(rowEnd, value, len);
    tdAppendColValToRow(pBuilder, pCol->colId, pCol->type, TD_VTYPE_NORM, rowEnd, true, pMem->toffset, pMem->colIdx);
    return TSDB_CODE_SUCCESS;
  }

  // NCHAR: convert the multi-byte input to UCS-4 at the row end; the
  // conversion is limited to the column's byte capacity
  if (TSDB_DATA_TYPE_NCHAR == pCol->type) {
    int32_t     output = 0;
    const char* rowEnd = tdRowEnd(pBuilder->pBuf);
    if (!taosMbsToUcs4(value, len, (TdUcs4*)varDataVal(rowEnd), pCol->bytes - VARSTR_HEADER_SIZE, &output)) {
      char buf[512] = {0};
      snprintf(buf, tListLen(buf), "%s", strerror(errno));
      return buildSyntaxErrMsg(pMsgBuf, buf, value);
    }
    varDataSetLen(rowEnd, output);
    tdAppendColValToRow(pBuilder, pCol->colId, pCol->type, TD_VTYPE_NORM, rowEnd, false, pMem->toffset, pMem->colIdx);
    return TSDB_CODE_SUCCESS;
  }

  // every other type: append the value bytes directly
  tdAppendColValToRow(pBuilder, pCol->colId, pCol->type, TD_VTYPE_NORM, value, false, pMem->toffset, pMem->colIdx);
  return TSDB_CODE_SUCCESS;
}
650 651 652

/*
 * Parse an explicit column/tag name list.
 * On entry: pSql -> tag1_name, ...)
 *
 * For every name the matching schema entry is located, marked as bound and
 * recorded in pColList->boundColumns; pColList->boundNullLen is increased by
 * a per-type amount. When the names are not listed in schema order,
 * pColList->colIdxInfo is built to map between bound order and schema order.
 *
 * Fix: the per-type length is now taken from the column that was actually
 * matched (pSchema[index]). The previous code used pSchema[t], where `t` is
 * only the position the search STARTED from; this used the wrong column's
 * type whenever a column was skipped and could read pSchema[nCols] after the
 * last column had been bound.
 *
 * @param pCxt     parse context; pSql is advanced past the closing ')'
 * @param pColList bound-column bookkeeping to (re)initialize and fill
 * @param pSchema  schema array with pColList->numOfCols entries
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_TSC_OUT_OF_MEMORY, or a syntax-error code
 */
static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo* pColList, SSchema* pSchema) {
  col_id_t nCols = pColList->numOfCols;

  pColList->numOfBound = 0;
  pColList->boundNullLen = 0;
  memset(pColList->boundColumns, 0, sizeof(col_id_t) * nCols);
  for (col_id_t i = 0; i < nCols; ++i) {
    pColList->cols[i].valStat = VAL_STAT_NONE;
  }

  SToken   sToken;
  bool     isOrdered = true;
  col_id_t lastColIdx = -1;  // last column found
  while (1) {
    NEXT_TOKEN(pCxt->pSql, sToken);

    if (TK_NK_RP == sToken.type) {
      break;
    }

    // search forward from the column after the previous match first; falling
    // back to the columns before it means the list is not in schema order
    col_id_t t = lastColIdx + 1;
    col_id_t index = findCol(&sToken, t, nCols, pSchema);
    if (index < 0 && t > 0) {
      index = findCol(&sToken, 0, t, pSchema);
      isOrdered = false;
    }
    if (index < 0) {
      return buildSyntaxErrMsg(&pCxt->msg, "invalid column/tag name", sToken.z);
    }
    if (pColList->cols[index].valStat == VAL_STAT_HAS) {
      return buildSyntaxErrMsg(&pCxt->msg, "duplicated column name", sToken.z);
    }
    lastColIdx = index;
    pColList->cols[index].valStat = VAL_STAT_HAS;
    pColList->boundColumns[pColList->numOfBound] = index + PRIMARYKEY_TIMESTAMP_COL_ID;
    ++pColList->numOfBound;
    switch (pSchema[index].type) {
      case TSDB_DATA_TYPE_BINARY:
        pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + CHAR_BYTES);
        break;
      case TSDB_DATA_TYPE_NCHAR:
        pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
        break;
      default:
        pColList->boundNullLen += TYPE_BYTES[pSchema[index].type];
        break;
    }
  }

  pColList->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED;

  if (!isOrdered) {
    pColList->colIdxInfo = taosMemoryCalloc(pColList->numOfBound, sizeof(SBoundIdxInfo));
    if (NULL == pColList->colIdxInfo) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    SBoundIdxInfo* pColIdx = pColList->colIdxInfo;
    for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
      pColIdx[i].schemaColIdx = pColList->boundColumns[i];
      pColIdx[i].boundIdx = i;
    }
    // sort with schemaIdxCompar to assign finalIdx, then sort again with boundIdxCompar
    qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), schemaIdxCompar);
    for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
      pColIdx[i].finalIdx = i;
    }
    qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), boundIdxCompar);
  }

  // clear the unused tail of boundColumns
  if (pColList->numOfCols > pColList->numOfBound) {
    memset(&pColList->boundColumns[pColList->numOfBound], 0,
           sizeof(col_id_t) * (pColList->numOfCols - pColList->numOfBound));
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
728 729
// Value callback used while parsing tag values: appends one tag value to the KV row builder
// carried in `param`.
//   pMsgBuf - receives the error text when a conversion fails
//   value   - raw value bytes, or NULL for a NULL tag
//   len     - length of `value` in bytes (used for BINARY and NCHAR input)
//   param   - SKvParam* holding the target builder, the tag schema and a scratch buffer
// Returns TSDB_CODE_SUCCESS, or the code produced by the JSON parser / the syntax-error builder.
static int32_t KvRowAppend(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param) {
  SKvParam* pKv = (SKvParam*)param;
  int8_t    tagType = pKv->schema->type;
  int16_t   tagColId = pKv->schema->colId;

  // JSON tags are dispatched before the NULL check, so the JSON parser also sees NULL values.
  if (TSDB_DATA_TYPE_JSON == tagType) {
    return parseJsontoTagData(value, pKv->builder, pMsgBuf, tagColId);
  }

  // A NULL tag is not added to the KV row at all.
  if (NULL == value) {
    return TSDB_CODE_SUCCESS;
  }

  switch (tagType) {
    case TSDB_DATA_TYPE_BINARY:
      // Wrap the raw bytes into a var-string inside the scratch buffer, then store it.
      STR_WITH_SIZE_TO_VARSTR(pKv->buf, value, len);
      tdAddColToKVRow(pKv->builder, tagColId, pKv->buf, varDataTLen(pKv->buf));
      break;

    case TSDB_DATA_TYPE_NCHAR: {
      // The conversion fails when the converted output does not fit the declared column width.
      int32_t ucs4Len = 0;
      if (!taosMbsToUcs4(value, len, (TdUcs4*)varDataVal(pKv->buf), pKv->schema->bytes - VARSTR_HEADER_SIZE,
                         &ucs4Len)) {
        char errText[512] = {0};
        snprintf(errText, tListLen(errText), "%s", strerror(errno));
        return buildSyntaxErrMsg(pMsgBuf, errText, value);
      }

      varDataSetLen(pKv->buf, ucs4Len);
      tdAddColToKVRow(pKv->builder, tagColId, pKv->buf, varDataTLen(pKv->buf));
      break;
    }

    default:
      // Fixed-width types are copied as-is using the width from the type table.
      tdAddColToKVRow(pKv->builder, tagColId, value, TYPE_BYTES[tagType]);
      break;
  }

  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
765
// Fills a child-table creation request.
//   pTbReq - request to populate
//   tname  - child table name; a private copy is stored in the request
//   row    - tag KV row; stored in the request, which is later released by destroyCreateSubTbReq()
//   suid   - uid of the owning super table
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the name cannot be duplicated.
// The tag row is attached before the name is copied, so that even on failure the caller can
// release everything through destroyCreateSubTbReq().
static int32_t buildCreateTbReq(SVCreateTbReq *pTbReq, const char* tname, SKVRow row, int64_t suid) {
  pTbReq->type = TD_CHILD_TABLE;
  pTbReq->ctb.suid = suid;
  pTbReq->ctb.pTag = row;

  pTbReq->name = strdup(tname);
  if (NULL == pTbReq->name) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

774
// pSql -> tag1_value, ...)
wmmhello's avatar
wmmhello 已提交
775
// Parses the tag value list of a USING ... TAGS clause and builds the create-table request.
//   pCxt      - insert parse context; pCxt->pSql points just past the opening '('
//   pSchema   - schema array indexed by (bound column id - 1)
//   precision - timestamp precision forwarded to the value parser
//   tName     - name of the child table to create
// When the list consists of '?' placeholders no tag row is built and success is returned;
// a literal value following a placeholder is rejected.
static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint8_t precision, const char* tName) {
  if (tdInitKVRowBuilder(&pCxt->tagsBuilder) < 0) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  char     unescapeBuf[TSDB_MAX_BYTES_PER_ROW] = {0};  // used for deleting Escape character: \\, \', \"
  SKvParam kvParam = {.builder = &pCxt->tagsBuilder};
  bool     sawPlaceholder = false;
  SToken   token;

  for (int tagIdx = 0; tagIdx < pCxt->tags.numOfBound; ++tagIdx) {
    NEXT_TOKEN_WITH_PREV(pCxt->pSql, token);

    if (TK_NK_QUESTION != token.type) {
      if (sawPlaceholder) {
        return buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and tag values");
      }

      SSchema* pTagSchema = &pSchema[pCxt->tags.boundColumns[tagIdx] - 1];  // colId starts with 1
      kvParam.schema = pTagSchema;
      CHECK_CODE(parseValueToken(&pCxt->pSql, &token, pTagSchema, precision, unescapeBuf, KvRowAppend, &kvParam,
                                 &pCxt->msg));
      continue;
    }

    // '?' placeholder: only legal when a statement callback is attached.
    sawPlaceholder = true;
    if (NULL == pCxt->pStmtCb) {
      return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", token.z);
    }
  }

  // With placeholders the tag values arrive later through the bind API.
  if (sawPlaceholder) {
    return TSDB_CODE_SUCCESS;
  }

  SKVRow tagRow = tdGetKVRowFromBuilder(&pCxt->tagsBuilder);
  if (NULL == tagRow) {
    return buildInvalidOperationMsg(&pCxt->msg, "tag value expected");
  }
  tdSortKVRowByColIdx(tagRow);

  return buildCreateTbReq(&pCxt->createTblReq, tName, tagRow, pCxt->pTableMeta->suid);
}
818

X
Xiaoyu Wang 已提交
819 820 821 822 823 824 825 826
// Duplicates a table meta into freshly allocated memory.
//   pSrc - meta to copy
//   pDst - receives the copy, or NULL when the allocation fails
// Returns TSDB_CODE_SUCCESS or TSDB_CODE_TSC_OUT_OF_MEMORY.
static int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
  STableMeta* pCopy = taosMemoryMalloc(TABLE_META_SIZE(pSrc));
  *pDst = pCopy;

  if (NULL != pCopy) {
    memcpy(pCopy, pSrc, TABLE_META_SIZE(pSrc));
    return TSDB_CODE_SUCCESS;
  }

  return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
827

X
Xiaoyu Wang 已提交
828 829 830 831 832 833 834 835 836 837 838
// Resolves the vgroup of a new child table, stamps the meta with a fresh uid and that vgroup id,
// and caches a private copy of the meta under the table's full name.
//   pCxt       - insert parse context (provides the catalog and the vgroup hash)
//   pHash      - cache that receives the pointer to the cloned meta
//   pTableName - structured name used for the vgroup lookup
//   pName/len  - hash key (full table name) and its length
//   pMeta      - meta to stamp and clone; it is modified in place
// Returns TSDB_CODE_SUCCESS or the first error encountered.
static int32_t storeTableMeta(SInsertParseContext* pCxt, SHashObj* pHash, SName* pTableName, const char* pName,
                              int32_t len, STableMeta* pMeta) {
  SVgroupInfo    vg;
  SParseContext* pBasicCtx = pCxt->pComCxt;
  CHECK_CODE(
      catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pTableName, &vg));
  CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));

  pMeta->uid = tGenIdPI64();
  pMeta->vgId = vg.vgId;

  STableMeta* pBackup = NULL;
  if (TSDB_CODE_SUCCESS != cloneTableMeta(pMeta, &pBackup)) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // The hash stores only the pointer value; if the insert fails nobody else references the
  // clone, so it has to be released here.
  int32_t code = taosHashPut(pHash, pName, len, &pBackup, POINTER_BYTES);
  if (TSDB_CODE_SUCCESS != code) {
    taosMemoryFree(pBackup);
  }
  return code;
}

// pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
// Parses "stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)" for an auto-create insert.
//   pCxt         - insert parse context; pCxt->pSql points just past the USING keyword
//   pTbnameToken - token holding the child table name
// On success pCxt->pTableMeta holds the meta to insert with and, unless the tag values are
// '?' placeholders, pCxt->createTblReq describes the child table to create.
static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken) {
  SName name;
  // A malformed table name must stop the parse; otherwise `name` would be used uninitialized.
  CHECK_CODE(createSName(&name, pTbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
  char tbFName[TSDB_TABLE_FNAME_LEN];
  tNameExtractFullName(&name, tbFName);
  int32_t      len = strlen(tbFName);
  STableMeta** pMeta = taosHashGet(pCxt->pSubTableHashObj, tbFName, len);
  if (NULL != pMeta) {
    // NOTE(review): on a cache hit the rest of the USING clause is not consumed here -
    // confirm that the caller copes with pSql still pointing at the super table name.
    return cloneTableMeta(*pMeta, &pCxt->pTableMeta);
  }

  SToken sToken;
  // pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
  NEXT_TOKEN(pCxt->pSql, sToken);
  CHECK_CODE(getSTableMeta(pCxt, &sToken));
  if (TSDB_SUPER_TABLE != pCxt->pTableMeta->tableType) {
    return buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed");
  }
  CHECK_CODE(storeTableMeta(pCxt, pCxt->pSubTableHashObj, &name, tbFName, len, pCxt->pTableMeta));

  SSchema* pTagsSchema = getTableTagSchema(pCxt->pTableMeta);
  setBoundColumnInfo(&pCxt->tags, pTagsSchema, getNumOfTags(pCxt->pTableMeta));

  // pSql -> [(tag1_name, ...)] TAGS (tag1_value, ...)
  NEXT_TOKEN(pCxt->pSql, sToken);
  if (TK_NK_LP == sToken.type) {
    CHECK_CODE(parseBoundColumns(pCxt, &pCxt->tags, pTagsSchema));
    NEXT_TOKEN(pCxt->pSql, sToken);
  }

  if (TK_TAGS != sToken.type) {
    return buildSyntaxErrMsg(&pCxt->msg, "TAGS is expected", sToken.z);
  }
  // pSql -> (tag1_value, ...)
  NEXT_TOKEN(pCxt->pSql, sToken);
  if (TK_NK_LP != sToken.type) {
    return buildSyntaxErrMsg(&pCxt->msg, "( is expected", sToken.z);
  }
  CHECK_CODE(parseTagsClause(pCxt, pCxt->pTableMeta->schema, getTableInfo(pCxt->pTableMeta).precision, name.tname));
  NEXT_TOKEN(pCxt->pSql, sToken);
  if (TK_NK_RP != sToken.type) {
    return buildSyntaxErrMsg(&pCxt->msg, ") is expected", sToken.z);
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
894 895
// Parses the values of one "(v1, v2, ...)" tuple into the next row slot of the data block.
//   pCxt        - insert parse context; pCxt->pSql points just past the opening '('
//   pDataBlocks - target block; the row is written at pData + size
//   timePrec    - timestamp precision forwarded to the value parser
//   gotRow      - set to true only when a row of literal values was produced
//   tmpTokenBuf - scratch buffer handed to the value parser
// A tuple made of '?' placeholders produces no row; a literal following a placeholder is rejected.
static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, int16_t timePrec, bool* gotRow,
                       char* tmpTokenBuf) {
  SParsedDataColInfo* pBound = &pDataBlocks->boundColumnInfo;
  SRowBuilder*        pRb = &pDataBlocks->rowBuilder;
  STSRow*             pRow = (STSRow*)(pDataBlocks->pData + pDataBlocks->size);  // skip the SSubmitBlk header

  tdSRowResetBuf(pRb, pRow);

  SSchema*  pColumns = getTableColumnSchema(pDataBlocks->pTableMeta);
  SMemParam memParam = {.rb = pRb};
  SToken    token = {0};
  bool      sawPlaceholder = false;

  // 1. set the parsed value from sql string
  for (int pos = 0; pos < pBound->numOfBound; ++pos) {
    NEXT_TOKEN_WITH_PREV(pCxt->pSql, token);

    if (TK_NK_QUESTION == token.type) {
      sawPlaceholder = true;
      if (NULL == pCxt->pStmtCb) {
        return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", token.z);
      }
      continue;
    }

    if (sawPlaceholder) {
      return buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and values");
    }

    SSchema* pCol = &pColumns[pBound->boundColumns[pos] - 1];
    memParam.schema = pCol;
    getSTSRowAppendInfo(pRb->rowType, pBound, pos, &memParam.toffset, &memParam.colIdx);
    CHECK_CODE(parseValueToken(&pCxt->pSql, &token, pCol, timePrec, tmpTokenBuf, MemRowAppend, &memParam, &pCxt->msg));

    if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) {
      TSKEY rowKey = TD_ROW_KEY(pRow);
      checkTimestamp(pDataBlocks, (const char*)&rowKey);
    }
  }

  // Placeholders only: the row is filled in later through the bind API.
  if (sawPlaceholder) {
    return TSDB_CODE_SUCCESS;
  }

  // 2. set the null value for the columns that do not assign values
  if (TD_IS_TP_ROW(pRow) && (pBound->numOfBound < pBound->numOfCols)) {
    for (int32_t colIdx = 0; colIdx < pBound->numOfCols; ++colIdx) {
      if (VAL_STAT_NONE != pBound->cols[colIdx].valStat) {  // the primary TS key is not VAL_STAT_NONE
        continue;
      }
      tdAppendColValToTpRow(pRb, TD_VTYPE_NONE, getNullValue(pColumns[colIdx].type), true, pColumns[colIdx].type,
                            colIdx, pBound->cols[colIdx].toffset);
    }
  }

  *gotRow = true;

#ifdef TD_DEBUG_PRINT_ROW
  STSchema* pSTSchema = tdGetSTSChemaFromSSChema(&pColumns, pBound->numOfCols);
  tdSRowPrint(pRow, pSTSchema, __func__);
  taosMemoryFree(pSTSchema);
#endif

  return TSDB_CODE_SUCCESS;
}

// pSql -> (field1_value, ...) [(field1_value2, ...) ...]
958
// Parses every "(...)" tuple that follows VALUES and appends the rows to the data block.
//   pCxt       - insert parse context; pCxt->pSql points at the first tuple
//   pDataBlock - block receiving the rows; grown on demand
//   maxRows    - number of rows the block can currently hold
//   numOfRows  - receives the number of rows actually produced
// Stops at the first token that is not '('. Zero rows is an error unless this is a stmt insert.
static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlock, int maxRows, int32_t* numOfRows) {
  STableComInfo tableInfo = getTableInfo(pDataBlock->pTableMeta);
  int32_t       rowBytes = getExtendedRowSize(pDataBlock);
  CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));

  (*numOfRows) = 0;
  char   unescapeBuf[TSDB_MAX_BYTES_PER_ROW] = {0};  // used for deleting Escape character: \\, \', \"
  SToken token;

  for (;;) {
    // Peek first: pSql is only advanced when the token really opens a tuple.
    int32_t consumed = 0;
    NEXT_TOKEN_KEEP_SQL(pCxt->pSql, token, consumed);
    if (TK_NK_LP != token.type) {
      break;
    }
    pCxt->pSql += consumed;

    // Grow the block when either the row budget or the byte budget is exhausted.
    if ((*numOfRows) >= maxRows || pDataBlock->size + rowBytes >= pDataBlock->nAllocSize) {
      int32_t newCapacity;
      CHECK_CODE(allocateMemIfNeed(pDataBlock, rowBytes, &newCapacity));
      ASSERT(newCapacity >= maxRows);
      maxRows = newCapacity;
    }

    bool rowProduced = false;
    CHECK_CODE(parseOneRow(pCxt, pDataBlock, tableInfo.precision, &rowProduced, unescapeBuf));
    if (rowProduced) {
      pDataBlock->size += rowBytes;
    }

    NEXT_TOKEN(pCxt->pSql, token);
    if (TK_NK_RP != token.type) {
      return buildSyntaxErrMsg(&pCxt->msg, ") expected", token.z);
    }

    if (rowProduced) {
      (*numOfRows)++;
    }
  }

  if (0 == (*numOfRows) && (!TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT))) {
    return buildSyntaxErrMsg(&pCxt->msg, "no any data points", NULL);
  }
  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
1003
// Handles a complete VALUES clause for one table: makes sure the block has room, parses all
// tuples, records the block header and adds the row count to the statement total.
//   pCxt    - insert parse context; pCxt->pSql points at the first tuple
//   dataBuf - data block of the table being inserted into
static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* dataBuf) {
  int32_t rowCapacity;
  CHECK_CODE(allocateMemIfNeed(dataBuf, getExtendedRowSize(dataBuf), &rowCapacity));

  int32_t parsedRows = 0;
  CHECK_CODE(parseValues(pCxt, dataBuf, rowCapacity, &parsedRows));

  // Read pData only after parsing, because the buffer may have been grown meanwhile.
  if (TSDB_CODE_SUCCESS != setBlockInfo((SSubmitBlk*)(dataBuf->pData), dataBuf, parsedRows)) {
    return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than 32767");
  }

  pCxt->totalNum += parsedRows;
  dataBuf->numOfTables = 1;
  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
1020
// Releases the heap members of a child-table creation request (table name and tag row) and
// resets them to NULL. The request structure itself is not freed. A NULL request is ignored.
void destroyCreateSubTbReq(SVCreateTbReq* pReq) {
  if (NULL == pReq) {
    return;
  }

  taosMemoryFreeClear(pReq->name);
  taosMemoryFreeClear(pReq->ctb.pTag);
}

X
Xiaoyu Wang 已提交
1025
// Drops the per-table state of the parse context so that the next table of the statement
// starts clean: the create-table request, the tag row builder, the bound tag columns and
// the table meta.
static void destroyInsertParseContextForTable(SInsertParseContext* pCxt) {
  // request built by the USING clause
  destroyCreateSubTbReq(&pCxt->createTblReq);

  // tag parsing state
  tdDestroyKVRowBuilder(&pCxt->tagsBuilder);
  destroyBoundColumnInfo(&pCxt->tags);

  // meta of the table handled last
  taosMemoryFreeClear(pCxt->pTableMeta);
}

1032 1033 1034 1035 1036
// Frees a table data block together with its payload. The bound column info is destroyed
// only when the block is not flagged as cloned. A NULL block is ignored.
static void destroyDataBlock(STableDataBlocks* pDataBlock) {
  if (NULL != pDataBlock) {
    taosMemoryFreeClear(pDataBlock->pData);

    if (false == pDataBlock->cloned) {
      destroyBoundColumnInfo(&pDataBlock->boundColumnInfo);
    }

    taosMemoryFreeClear(pDataBlock);
  }
}

X
Xiaoyu Wang 已提交
1044 1045 1046
// Releases everything owned by the insert parse context: the per-table state, the vgroup
// hash, the sub-table meta cache, the per-table block hash and the merged per-vgroup blocks.
static void destroyInsertParseContext(SInsertParseContext* pCxt) {
  destroyInsertParseContextForTable(pCxt);
  taosHashCleanup(pCxt->pVgroupsHashObj);

  // The sub-table cache stores pointers to heap-allocated STableMeta clones (see
  // storeTableMeta); cleaning up the hash alone would leak them, so free each clone first.
  if (NULL != pCxt->pSubTableHashObj) {
    void* pIter = taosHashIterate(pCxt->pSubTableHashObj, NULL);
    while (NULL != pIter) {
      taosMemoryFree(*(STableMeta**)pIter);
      pIter = taosHashIterate(pCxt->pSubTableHashObj, pIter);
    }
  }
  taosHashCleanup(pCxt->pSubTableHashObj);

  destroyBlockHashmap(pCxt->pTableBlockHashObj);
  destroyBlockArrayList(pCxt->pVgDataBlocks);
}

1053 1054 1055 1056 1057 1058
//   tb_name
//       [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
//       [(field1_name, ...)]
//       VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
//   [...];
// Parses everything after "INSERT INTO": one or more table sections of the form documented
// above, each followed by either a VALUES list or a FILE path.
// For a stmt insert the bound tag info and the vgroup/block hashes are handed to the
// statement callbacks; otherwise the table blocks are merged per vgroup and the output built.
static int32_t parseInsertBody(SInsertParseContext* pCxt) {
  int32_t tableCount = 0;

  // for each table
  for (;;) {
    SToken token;
    char*  stmtTbName = NULL;

    // pSql -> tb_name ...
    NEXT_TOKEN(pCxt->pSql, token);

    // no data in the sql string anymore.
    if (0 == token.n) {
      if (0 == pCxt->totalNum && (!TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT))) {
        return buildInvalidOperationMsg(&pCxt->msg, "no data in sql");
      }
      break;
    }

    if (TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT) && tableCount > 0) {
      return buildInvalidOperationMsg(&pCxt->msg, "single table allowed in one stmt");
    }

    // forget the state left over from the previous table
    destroyInsertParseContextForTable(pCxt);

    // '?' as table name: the real name is supplied by the statement callback.
    if (TK_NK_QUESTION == token.type) {
      if (NULL == pCxt->pStmtCb) {
        return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", token.z);
      }
      CHECK_CODE((*pCxt->pStmtCb->getTbNameFn)(pCxt->pStmtCb->pStmt, &stmtTbName));

      token.z = stmtTbName;
      token.n = strlen(stmtTbName);
    }

    SToken tbnameToken = token;
    NEXT_TOKEN(pCxt->pSql, token);

    // USING clause
    if (TK_USING != token.type) {
      CHECK_CODE(getTableMeta(pCxt, &tbnameToken));
    } else {
      CHECK_CODE(parseUsingClause(pCxt, &tbnameToken));
      NEXT_TOKEN(pCxt->pSql, token);
    }

    STableDataBlocks* pTableBlock = NULL;
    CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, pCxt->pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE,
                                    sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta,
                                    &pTableBlock, NULL, &pCxt->createTblReq));

    if (TK_NK_LP == token.type) {
      // pSql -> field1_name, ...)
      CHECK_CODE(parseBoundColumns(pCxt, &pTableBlock->boundColumnInfo, getTableColumnSchema(pCxt->pTableMeta)));
      NEXT_TOKEN(pCxt->pSql, token);
    }

    if (TK_VALUES == token.type) {
      // pSql -> (field1_value, ...) [(field1_value2, ...) ...]
      CHECK_CODE(parseValuesClause(pCxt, pTableBlock));
      TSDB_QUERY_SET_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_INSERT);
    } else if (TK_FILE == token.type) {
      // FILE csv_file_path
      // pSql -> csv_file_path
      NEXT_TOKEN(pCxt->pSql, token);
      if (0 == token.n || (TK_NK_STRING != token.type && TK_NK_ID != token.type)) {
        return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", token.z);
      }
      // todo
      pCxt->pOutput->insertType = TSDB_QUERY_TYPE_FILE_INSERT;
    } else {
      return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", token.z);
    }

    tableCount++;
  }

  if (TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT)) {
    // Pass a heap copy of the bound tag info to the statement, then zero the context's copy
    // so that the context cleanup does not release what the statement now holds.
    SParsedDataColInfo* pTagsCopy = taosMemoryMalloc(sizeof(pCxt->tags));
    if (NULL == pTagsCopy) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    memcpy(pTagsCopy, &pCxt->tags, sizeof(pCxt->tags));
    (*pCxt->pStmtCb->setBindInfoFn)(pCxt->pStmtCb->pStmt, pCxt->pTableMeta, pTagsCopy);
    memset(&pCxt->tags, 0, sizeof(pCxt->tags));

    // Same for the two hashes: after the hand-over the context must not destroy them.
    (*pCxt->pStmtCb->setExecInfoFn)(pCxt->pStmtCb->pStmt, pCxt->pVgroupsHashObj, pCxt->pTableBlockHashObj);
    pCxt->pVgroupsHashObj = NULL;
    pCxt->pTableBlockHashObj = NULL;

    return TSDB_CODE_SUCCESS;
  }

  // merge according to vgId
  if (taosHashGetSize(pCxt->pTableBlockHashObj) > 0) {
    CHECK_CODE(mergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->payloadType, &pCxt->pVgDataBlocks));
  }
  return buildOutput(pCxt);
}

// INSERT INTO
//   tb_name
//       [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
//       [(field1_name, ...)]
//       VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
//   [...];
X
Xiaoyu Wang 已提交
1171
// Entry point of the insert parser.
//   pContext - parse context carrying the SQL text, message buffer and optional stmt callbacks
//   pQuery   - in/out: when *pQuery is NULL a new SQuery is allocated and its root is set to
//              the generated modify node; an existing query is left as is
// Returns TSDB_CODE_SUCCESS or an error code; error text is written to pContext->pMsg.
int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
  SInsertParseContext context = {
      .pComCxt = pContext,
      .pSql = (char*)pContext->pSql,
      .msg = {.buf = pContext->pMsg, .len = pContext->msgLen},
      .pTableMeta = NULL,
      .pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, false),
      .totalNum = 0,
      .pOutput = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT),
      .pStmtCb = pContext->pStmtCb};

  // When re-parsing for an existing statement the two hashes come from the statement;
  // otherwise they are created here.
  const bool borrowedExecInfo = (NULL != pContext->pStmtCb) && (NULL != *pQuery);
  if (borrowedExecInfo) {
    (*pContext->pStmtCb->getExecInfoFn)(pContext->pStmtCb->pStmt, &context.pVgroupsHashObj,
                                        &context.pTableBlockHashObj);
  } else {
    context.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
    context.pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
  }

  if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pSubTableHashObj ||
      NULL == context.pOutput) {
    // Release whatever was created above instead of leaking it; hashes obtained from the
    // statement are detached first so that they are not destroyed here.
    // NOTE(review): context.pOutput is not released on this path - nothing visible in this
    // file frees a node created by nodesMakeNode(); confirm the proper destructor.
    if (borrowedExecInfo) {
      context.pVgroupsHashObj = NULL;
      context.pTableBlockHashObj = NULL;
    }
    destroyInsertParseContext(&context);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  if (pContext->pStmtCb) {
    TSDB_QUERY_SET_TYPE(context.pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT);
  }

  if (NULL == *pQuery) {
    *pQuery = taosMemoryCalloc(1, sizeof(SQuery));
    if (NULL == *pQuery) {
      // *pQuery was NULL, so every hash in the context was created by this call.
      destroyInsertParseContext(&context);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    (*pQuery)->execMode = QUERY_EXEC_MODE_SCHEDULE;
    (*pQuery)->haveResultSet = false;
    (*pQuery)->msgType = TDMT_VND_SUBMIT;
    (*pQuery)->pRoot = (SNode*)context.pOutput;
  }

  context.pOutput->payloadType = PAYLOAD_TYPE_KV;

  int32_t code = skipInsertInto(&context);
  if (TSDB_CODE_SUCCESS == code) {
    code = parseInsertBody(&context);
  }
  destroyInsertParseContext(&context);
  return code;
}
D
stmt  
dapan1121 已提交
1219

X
Xiaoyu Wang 已提交
1220 1221 1222 1223
// Builds an SName from a textual table name.
//   pName      - receives the structured name
//   pTableName - text to parse; must contain exactly one name token
//   acctId     - account id forwarded to createSName()
//   dbName     - database name forwarded to createSName()
//   msgBuf/msgBufLen - buffer that receives the error text
// Returns TSDB_CODE_SUCCESS, the error from createSName(), or an invalid-operation error when
// the text is empty or has trailing tokens after the name.
int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf,
                     int32_t msgBufLen) {
  SMsgBuf msg = {.buf = msgBuf, .len = msgBufLen};
  SToken  sToken;
  int32_t code = 0;

  NEXT_TOKEN(pTableName, sToken);

  if (sToken.n == 0) {
    return buildInvalidOperationMsg(&msg, "empty table name");
  }

  code = createSName(pName, &sToken, acctId, dbName, &msg);
  if (code) {
    return code;
  }

  // Anything left after the name means the input was not a plain table name.
  NEXT_TOKEN(pTableName, sToken);

  if (sToken.n > 0) {
    return buildInvalidOperationMsg(&msg, "table name format is wrong");
  }

  return TSDB_CODE_SUCCESS;
}

// Produces the final output of a stmt insert from the statement's accumulated hashes.
//   pQuery     - query whose root is the SVnodeModifOpStmt to fill
//   pVgHash    - vgroup hash collected while parsing
//   pBlockHash - per-table data block hash collected while binding
// Returns TSDB_CODE_SUCCESS or the first error from merging / building the output.
int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) {
  SVnodeModifOpStmt*  modifyNode = (SVnodeModifOpStmt*)pQuery->pRoot;
  int32_t             code = TSDB_CODE_SUCCESS;
  SInsertParseContext insertCtx = {
      .pVgroupsHashObj = pVgHash,
      .pTableBlockHashObj = pBlockHash,
      .pOutput = (SVnodeModifOpStmt*)pQuery->pRoot,
  };

  // merge according to vgId
  if (taosHashGetSize(insertCtx.pTableBlockHashObj) > 0) {
    code = mergeTableDataBlocks(insertCtx.pTableBlockHashObj, modifyNode->payloadType, &insertCtx.pVgDataBlocks);
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = buildOutput(&insertCtx);
  }

  // The merged per-vgroup block list is local to this call; release it on every path, the
  // same way parseInsertSql() does through destroyInsertParseContext() after buildOutput().
  destroyBlockArrayList(insertCtx.pVgDataBlocks);

  return code;
}

wmmhello's avatar
wmmhello 已提交
1266
// Binds tag values supplied through the stmt API and attaches the resulting create-table
// message to the data block.
//   pBlock    - STableDataBlocks* of the target table
//   boundTags - SParsedDataColInfo* describing the bound tag columns; must not be NULL
//   suid      - uid of the super table
//   tName     - child table name
//   bind      - one bind entry per bound tag; only element 0 of each entry is used
//   msgBuf/msgBufLen - buffer that receives the error text
// Returns TSDB_CODE_SUCCESS or an error code. The temporary tag builder and the create-table
// request are released on every path.
// NOTE(review): tags are looked up in getTableTagSchema() with index (boundColumns[c] - 1),
// while parseTagsClause() indexes the full table schema the same way - confirm both agree.
int32_t qBindStmtTagsValue(void *pBlock, void *boundTags, int64_t suid, char *tName, TAOS_MULTI_BIND *bind, char *msgBuf, int32_t msgBufLen){
  STableDataBlocks *pDataBlock = (STableDataBlocks *)pBlock;
  SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
  SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
  if (NULL == tags) {
    return TSDB_CODE_QRY_APP_ERROR;
  }

  SKVRowBuilder tagBuilder;
  if (tdInitKVRowBuilder(&tagBuilder) < 0) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta);
  SKvParam param = {.builder = &tagBuilder};
  int32_t  code = TSDB_CODE_SUCCESS;

  for (int c = 0; c < tags->numOfBound && TSDB_CODE_SUCCESS == code; ++c) {
    // The schema must be set before ANY call to KvRowAppend(): it dereferences param.schema
    // even for NULL values, so a NULL first tag used to crash on a NULL schema pointer.
    SSchema* pTagSchema = &pSchema[tags->boundColumns[c] - 1];  // colId starts with 1
    param.schema = pTagSchema;

    if (bind[c].is_null && bind[c].is_null[0]) {
      code = KvRowAppend(&pBuf, NULL, 0, &param);
      continue;
    }

    int32_t colLen = pTagSchema->bytes;
    if (IS_VAR_DATA_TYPE(pTagSchema->type)) {
      colLen = bind[c].length[0];
    }

    code = KvRowAppend(&pBuf, (char*)bind[c].buffer, colLen, &param);
  }

  if (TSDB_CODE_SUCCESS != code) {
    tdDestroyKVRowBuilder(&tagBuilder);
    return code;
  }

  SKVRow row = tdGetKVRowFromBuilder(&tagBuilder);
  if (NULL == row) {
    tdDestroyKVRowBuilder(&tagBuilder);
    return buildInvalidOperationMsg(&pBuf, "tag value expected");
  }
  tdSortKVRowByColIdx(row);

  // buildCreateTbReq() stores `row` in the request, so destroying the request below also
  // releases the row, whether or not the following steps succeed.
  SVCreateTbReq tbReq = {0};
  code = buildCreateTbReq(&tbReq, tName, row, suid);
  if (TSDB_CODE_SUCCESS == code) {
    code = buildCreateTbMsg(pDataBlock, &tbReq);
  }

  destroyCreateSubTbReq(&tbReq);
  tdDestroyKVRowBuilder(&tagBuilder);

  return code;
}

X
Xiaoyu Wang 已提交
1316 1317 1318 1319
// Binds a batch of rows supplied column-wise through the stmt API and appends them to the
// data block.
//   pBlock - STableDataBlocks* of the target table
//   bind   - one bind entry per bound column; every entry must carry bind->num rows
//   msgBuf/msgBufLen - buffer that receives the error text
// Returns TSDB_CODE_SUCCESS or an error code (row count mismatch, NULL primary timestamp,
// buffer type mismatch, allocation or append failure, too many rows).
int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) {
  STableDataBlocks*   pDataBlock = (STableDataBlocks*)pBlock;
  SSchema*            pColumns = getTableColumnSchema(pDataBlock->pTableMeta);
  int32_t             rowBytes = getExtendedRowSize(pDataBlock);
  SParsedDataColInfo* pBound = &pDataBlock->boundColumnInfo;
  SRowBuilder*        pRb = &pDataBlock->rowBuilder;
  SMemParam           memParam = {.rb = pRb};
  SMsgBuf             errBuf = {.buf = msgBuf, .len = msgBufLen};
  int32_t             expectedRows = bind->num;

  CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));

  CHECK_CODE(allocateMemForSize(pDataBlock, rowBytes * bind->num));

  for (int32_t rowIdx = 0; rowIdx < bind->num; ++rowIdx) {
    STSRow* pRow = (STSRow*)(pDataBlock->pData + pDataBlock->size);  // skip the SSubmitBlk header
    tdSRowResetBuf(pRb, pRow);

    for (int colPos = 0; colPos < pBound->numOfBound; ++colPos) {
      TAOS_MULTI_BIND* pColBind = &bind[colPos];
      SSchema*         pCol = &pColumns[pBound->boundColumns[colPos] - 1];

      if (pColBind->num != expectedRows) {
        return buildInvalidOperationMsg(&errBuf, "row number in each bind param should be the same");
      }

      memParam.schema = pCol;
      getSTSRowAppendInfo(pRb->rowType, pBound, colPos, &memParam.toffset, &memParam.colIdx);

      if (!(pColBind->is_null && pColBind->is_null[rowIdx])) {
        if (pColBind->buffer_type != pCol->type) {
          return buildInvalidOperationMsg(&errBuf, "column type mis-match with buffer type");
        }

        // Variable-length columns carry a per-row length; fixed ones use the schema width.
        int32_t valueLen = pCol->bytes;
        if (IS_VAR_DATA_TYPE(pCol->type)) {
          valueLen = pColBind->length[rowIdx];
        }

        CHECK_CODE(MemRowAppend(&errBuf, (char*)pColBind->buffer + pColBind->buffer_length * rowIdx, valueLen,
                                &memParam));
      } else {
        if (pCol->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
          return buildInvalidOperationMsg(&errBuf, "primary timestamp should not be NULL");
        }

        CHECK_CODE(MemRowAppend(&errBuf, NULL, 0, &memParam));
      }

      if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) {
        TSKEY rowKey = TD_ROW_KEY(pRow);
        checkTimestamp(pDataBlock, (const char*)&rowKey);
      }
    }

    // set the null value for the columns that do not assign values
    if (TD_IS_TP_ROW(pRow) && (pBound->numOfBound < pBound->numOfCols)) {
      for (int32_t colIdx = 0; colIdx < pBound->numOfCols; ++colIdx) {
        if (VAL_STAT_NONE != pBound->cols[colIdx].valStat) {  // the primary TS key is not VAL_STAT_NONE
          continue;
        }
        tdAppendColValToTpRow(pRb, TD_VTYPE_NONE, getNullValue(pColumns[colIdx].type), true, pColumns[colIdx].type,
                              colIdx, pBound->cols[colIdx].toffset);
      }
    }

#ifdef TD_DEBUG_PRINT_ROW
    STSchema* pSTSchema = tdGetSTSChemaFromSSChema(&pColumns, pBound->numOfCols);
    tdSRowPrint(pRow, pSTSchema, __func__);
    taosMemoryFree(pSTSchema);
#endif

    pDataBlock->size += rowBytes;
  }

  if (TSDB_CODE_SUCCESS != setBlockInfo((SSubmitBlk*)(pDataBlock->pData), pDataBlock, bind->num)) {
    return buildInvalidOperationMsg(&errBuf, "too many rows in sql, total number of rows should be less than 32767");
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1394 1395 1396 1397 1398
// Binds the values of ONE bound column for a batch of rows; called once per column.
//   pBlock - STableDataBlocks* of the target table
//   bind   - bind entry of this column, carrying bind->num rows
//   msgBuf/msgBufLen - buffer that receives the error text
//   colIdx - position of the column within the bound column list
//   rowNum - row count every column of the batch must have
// The call for the first bound column (colIdx == 0) prepares the row builder and the memory;
// the call for the last bound column fills unassigned columns, advances the block size and
// records the block header.
int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx,
                                int32_t rowNum) {
  STableDataBlocks*   pDataBlock = (STableDataBlocks*)pBlock;
  SSchema*            pColumns = getTableColumnSchema(pDataBlock->pTableMeta);
  int32_t             rowBytes = getExtendedRowSize(pDataBlock);
  SParsedDataColInfo* pBound = &pDataBlock->boundColumnInfo;
  SRowBuilder*        pRb = &pDataBlock->rowBuilder;
  SMemParam           memParam = {.rb = pRb};
  SMsgBuf             errBuf = {.buf = msgBuf, .len = msgBufLen};
  bool                isFirstCol = (0 == colIdx);
  bool                isLastCol = ((colIdx + 1) == pBound->numOfBound);

  if (isFirstCol) {
    CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));
    CHECK_CODE(allocateMemForSize(pDataBlock, rowBytes * bind->num));
  }

  for (int32_t rowIdx = 0; rowIdx < bind->num; ++rowIdx) {
    // The block size is advanced only by the last column, so row r lives at size + r * rowBytes.
    STSRow* pRow = (STSRow*)(pDataBlock->pData + pDataBlock->size + rowBytes * rowIdx);  // skip the SSubmitBlk header
    if (!isFirstCol) {
      tdSRowGetBuf(pRb, pRow);
    } else {
      tdSRowResetBuf(pRb, pRow);
    }

    SSchema* pCol = &pColumns[pBound->boundColumns[colIdx] - 1];

    if (bind->num != rowNum) {
      return buildInvalidOperationMsg(&errBuf, "row number in each bind param should be the same");
    }

    memParam.schema = pCol;
    getSTSRowAppendInfo(pRb->rowType, pBound, colIdx, &memParam.toffset, &memParam.colIdx);

    if (!(bind->is_null && bind->is_null[rowIdx])) {
      if (bind->buffer_type != pCol->type) {
        return buildInvalidOperationMsg(&errBuf, "column type mis-match with buffer type");
      }

      // Variable-length columns carry a per-row length; fixed ones use the schema width.
      int32_t valueLen = pCol->bytes;
      if (IS_VAR_DATA_TYPE(pCol->type)) {
        valueLen = bind->length[rowIdx];
      }

      CHECK_CODE(MemRowAppend(&errBuf, (char*)bind->buffer + bind->buffer_length * rowIdx, valueLen, &memParam));
    } else {
      if (pCol->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
        return buildInvalidOperationMsg(&errBuf, "primary timestamp should not be NULL");
      }

      CHECK_CODE(MemRowAppend(&errBuf, NULL, 0, &memParam));
    }

    if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) {
      TSKEY rowKey = TD_ROW_KEY(pRow);
      checkTimestamp(pDataBlock, (const char*)&rowKey);
    }

    // set the null value for the columns that do not assign values
    if (isLastCol && (pBound->numOfBound < pBound->numOfCols) && TD_IS_TP_ROW(pRow)) {
      for (int32_t i = 0; i < pBound->numOfCols; ++i) {
        if (VAL_STAT_NONE != pBound->cols[i].valStat) {  // the primary TS key is not VAL_STAT_NONE
          continue;
        }
        tdAppendColValToTpRow(pRb, TD_VTYPE_NONE, getNullValue(pColumns[i].type), true, pColumns[i].type, i,
                              pBound->cols[i].toffset);
      }
    }

#ifdef TD_DEBUG_PRINT_ROW
    if (isLastCol) {
      STSchema* pSTSchema = tdGetSTSChemaFromSSChema(&pColumns, pBound->numOfCols);
      tdSRowPrint(pRow, pSTSchema, __func__);
      taosMemoryFree(pSTSchema);
    }
#endif
  }

  if (!isLastCol) {
    return TSDB_CODE_SUCCESS;
  }

  pDataBlock->size += rowBytes * bind->num;

  if (TSDB_CODE_SUCCESS != setBlockInfo((SSubmitBlk*)(pDataBlock->pData), pDataBlock, bind->num)) {
    return buildInvalidOperationMsg(&errBuf, "too many rows in sql, total number of rows should be less than 32767");
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1483
/*
 * Report the bound columns described by `boundInfo` to the caller.
 *
 * boundInfo  bound-column bookkeeping; boundColumns[] holds 1-based positions into pSchema
 * pSchema    schema array the bound positions refer to
 * fieldNum   out: receives boundInfo->numOfBound on success
 * fields     optional out: when non-NULL, receives a newly allocated array with one
 *            TAOS_FIELD (name, type, bytes) per bound column; the array is allocated
 *            here and never freed in this function, so the caller owns it
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY when the array cannot be allocated (in which case
 * *fieldNum is left untouched), TSDB_CODE_SUCCESS otherwise.
 */
int32_t buildBoundFields(SParsedDataColInfo* boundInfo, SSchema* pSchema, int32_t* fieldNum, TAOS_FIELD** fields) {
  if (NULL != fields) {
    TAOS_FIELD* pOut = taosMemoryCalloc(boundInfo->numOfBound, sizeof(TAOS_FIELD));
    *fields = pOut;
    if (NULL == pOut) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    for (int32_t n = 0; n < boundInfo->numOfBound; ++n) {
      // bound positions are 1-based, the schema array is 0-based
      SSchema*    pBound = &pSchema[boundInfo->boundColumns[n] - 1];
      TAOS_FIELD* pField = &pOut[n];

      strcpy(pField->name, pBound->name);
      pField->type = pBound->type;
      pField->bytes = pBound->bytes;
    }
  }

  *fieldNum = boundInfo->numOfBound;

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1503 1504
/*
 * Describe the tags bound by a prepared statement.
 *
 * pBlock     STableDataBlocks* of the target table (its pTableMeta supplies the tag schema)
 * boundTags  SParsedDataColInfo* describing the bound tags; must not be NULL
 * fieldNum   out: number of bound tags (0 when none are bound)
 * fields     optional out: array of TAOS_FIELD built by buildBoundFields(), or NULL
 *            when no tag is bound; may itself be NULL if only the count is wanted
 *
 * Returns TSDB_CODE_QRY_APP_ERROR when boundTags is NULL, the error reported by
 * buildBoundFields() if it fails, TSDB_CODE_SUCCESS otherwise.
 */
int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD** fields) {
  STableDataBlocks*   pDataBlock = (STableDataBlocks*)pBlock;
  SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
  if (NULL == tags) {
    return TSDB_CODE_QRY_APP_ERROR;
  }

  SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta);
  if (tags->numOfBound <= 0) {
    *fieldNum = 0;
    // `fields` is optional everywhere else on this path (buildBoundFields and
    // qBuildStmtColFields both accept NULL), so it must not be dereferenced blindly here
    if (fields) {
      *fields = NULL;
    }

    return TSDB_CODE_SUCCESS;
  }

  CHECK_CODE(buildBoundFields(tags, pSchema, fieldNum, fields));

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1523 1524 1525
/*
 * Describe the columns bound by a prepared statement.
 *
 * pBlock    STableDataBlocks* of the target table; its boundColumnInfo lists the bound columns
 * fieldNum  out: number of bound columns (0 when none are bound)
 * fields    optional out: array of TAOS_FIELD built by buildBoundFields(), or NULL when
 *           no column is bound; pass NULL if only the count is wanted
 *
 * Returns the error reported by buildBoundFields() if it fails, TSDB_CODE_SUCCESS otherwise.
 */
int32_t qBuildStmtColFields(void* pBlock, int32_t* fieldNum, TAOS_FIELD** fields) {
  STableDataBlocks*   pDataBlock = (STableDataBlocks*)pBlock;
  SSchema*            pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
  SParsedDataColInfo* pBound = &pDataBlock->boundColumnInfo;

  if (pBound->numOfBound > 0) {
    CHECK_CODE(buildBoundFields(pBound, pSchema, fieldNum, fields));
    return TSDB_CODE_SUCCESS;
  }

  // nothing bound: report an empty result
  *fieldNum = 0;
  if (NULL != fields) {
    *fields = NULL;
  }

  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
1540
// schemaless logic start
D
stmt  
dapan1121 已提交
1541

wmmhello's avatar
wmmhello 已提交
1542 1543 1544 1545 1546 1547 1548 1549
/*
 * State shared by the schemaless (sml) entry points of this file.
 * Allocated by smlInitHandle() and released by smlDestroyHandle().
 */
typedef struct SmlExecHandle {
  SHashObj* pBlockHash;  // data blocks filled by smlBindData(), looked up by table uid

  // the three members below are rewritten on every smlBindData() call, i.e. once per table
  SParsedDataColInfo tags;          // bound-tag bookkeeping
  SKVRowBuilder      tagsBuilder;   // builder used to assemble the tag row
  SVCreateTbReq      createTblReq;  // create-table request handed to getDataBlockFromList()

  SQuery* pQuery;  // query object passed on to qBuildStmtOutput(); stored, not freed here
} SSmlExecHandle;
wmmhello's avatar
wmmhello 已提交
1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565

/*
 * Bind the keys found in `cols` to the columns (or tags) of `pSchema`.
 *
 * cols      array of SSmlKv*; each entry's key/keyLen names one column
 * pColList  in/out: bound-column bookkeeping. numOfCols must already be set; this
 *           function resets and refills numOfBound, boundNullLen, boundColumns[],
 *           cols[].valStat, orderStatus and, when the keys are not in schema order,
 *           colIdxInfo
 * pSchema   schema array that the keys are looked up in (numOfCols entries)
 *
 * Returns TSDB_CODE_SML_INVALID_DATA when a key matches no column or appears twice,
 * TSDB_CODE_TSC_OUT_OF_MEMORY when colIdxInfo cannot be allocated,
 * TSDB_CODE_SUCCESS otherwise.
 */
static int32_t smlBoundColumns(SArray *cols, SParsedDataColInfo* pColList, SSchema* pSchema) {
  col_id_t nCols = pColList->numOfCols;

  pColList->numOfBound = 0;
  pColList->boundNullLen = 0;
  // zeroing every slot up front also keeps the slots past numOfBound cleared on return
  memset(pColList->boundColumns, 0, sizeof(col_id_t) * nCols);
  for (col_id_t i = 0; i < nCols; ++i) {
    pColList->cols[i].valStat = VAL_STAT_NONE;
  }

  bool     isOrdered = true;
  col_id_t lastColIdx = -1;  // last column found
  int32_t  numOfKeys = (int32_t)taosArrayGetSize(cols);
  for (int32_t i = 0; i < numOfKeys; ++i) {
    SSmlKv *kv = taosArrayGetP(cols, i);
    SToken sToken = {.n=kv->keyLen, .z=(char*)kv->key};

    // look after the previous match first (the common, ordered case), then wrap around
    col_id_t t = lastColIdx + 1;
    col_id_t index = findCol(&sToken, t, nCols, pSchema);
    if (index < 0 && t > 0) {
      index = findCol(&sToken, 0, t, pSchema);
      isOrdered = false;
    }
    if (index < 0) {
      return TSDB_CODE_SML_INVALID_DATA;
    }
    if (pColList->cols[index].valStat == VAL_STAT_HAS) {
      return TSDB_CODE_SML_INVALID_DATA;  // the same column was named twice
    }
    lastColIdx = index;
    pColList->cols[index].valStat = VAL_STAT_HAS;
    pColList->boundColumns[pColList->numOfBound] = index + PRIMARYKEY_TIMESTAMP_COL_ID;
    ++pColList->numOfBound;

    // Size the NULL placeholder from the column that was actually matched. `t` is only
    // the search start: it can name a different column, and equals nCols (one past the
    // schema) when the previous match was the last column.
    switch (pSchema[index].type) {
      case TSDB_DATA_TYPE_BINARY:
        pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + CHAR_BYTES);
        break;
      case TSDB_DATA_TYPE_NCHAR:
        pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
        break;
      default:
        pColList->boundNullLen += TYPE_BYTES[pSchema[index].type];
        break;
    }
  }

  pColList->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED;

  if (!isOrdered) {
    // NOTE(review): any colIdxInfo left over from an earlier bind of the same
    // SParsedDataColInfo is overwritten here without being freed - confirm who owns it.
    pColList->colIdxInfo = taosMemoryCalloc(pColList->numOfBound, sizeof(SBoundIdxInfo));
    if (NULL == pColList->colIdxInfo) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    SBoundIdxInfo* pColIdx = pColList->colIdxInfo;
    for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
      pColIdx[i].schemaColIdx = pColList->boundColumns[i];
      pColIdx[i].boundIdx = i;
    }
    // sort by schema position to learn each bound column's final slot ...
    qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), schemaIdxCompar);
    for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
      pColIdx[i].finalIdx = i;
    }
    // ... then restore bind order so the table can be indexed by bound position
    qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), boundIdxCompar);
  }

  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
1623
/*
 * Build the tag row of one table from its schemaless tag key/value pairs.
 *
 * cols         array of SSmlKv*, in the same order as tags->boundColumns
 * tagsBuilder  scratch KV row builder; initialised and released inside this call
 * tags         bound-tag bookkeeping (boundColumns[] holds 1-based schema positions)
 * pSchema      tag schema of the table
 * row          out: the assembled tag row, sorted by column index
 * msg          buffer handed to KvRowAppend for its error text
 *
 * Returns TSDB_CODE_TSC_OUT_OF_MEMORY when the builder cannot be initialised, the
 * error reported by KvRowAppend when a tag value cannot be appended,
 * TSDB_CODE_SML_INVALID_DATA when no row could be produced, TSDB_CODE_SUCCESS otherwise.
 */
static int32_t smlBoundTags(SArray *cols, SKVRowBuilder *tagsBuilder, SParsedDataColInfo* tags, SSchema* pSchema, SKVRow *row, SMsgBuf *msg) {
  if (tdInitKVRowBuilder(tagsBuilder) < 0) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SKvParam param = {.builder = tagsBuilder};
  for (int i = 0; i < tags->numOfBound; ++i) {
    SSchema* pTagSchema = &pSchema[tags->boundColumns[i] - 1]; // colId starts with 1
    param.schema = pTagSchema;
    SSmlKv *kv = taosArrayGetP(cols, i);

    // a tag that cannot be appended must fail the bind instead of silently
    // producing a row with that tag missing
    int32_t code = KvRowAppend(msg, kv->value, kv->valueLen, &param);
    if (TSDB_CODE_SUCCESS != code) {
      tdDestroyKVRowBuilder(tagsBuilder);
      return code;
    }
  }

  *row = tdGetKVRowFromBuilder(tagsBuilder);
  // The builder is re-initialised on every call, so its buffers have to be released
  // here or they leak once per table.
  // NOTE(review): this relies on tdGetKVRowFromBuilder() returning a row that does not
  // alias the builder's buffers - confirm against its implementation.
  tdDestroyKVRowBuilder(tagsBuilder);
  if(*row == NULL){
    return TSDB_CODE_SML_INVALID_DATA;
  }
  tdSortKVRowByColIdx(*row);
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
1645
int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SHashObj *colsHash, SArray *cols, bool format,
1646
                    STableMeta *pTableMeta, char *tableName, char *msgBuf, int16_t msgBufLen) {
wmmhello's avatar
wmmhello 已提交
1647 1648
  SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};

wmmhello's avatar
wmmhello 已提交
1649
  SSmlExecHandle *smlHandle = (SSmlExecHandle *)handle;
wmmhello's avatar
wmmhello 已提交
1650
  SSchema* pTagsSchema = getTableTagSchema(pTableMeta);
wmmhello's avatar
wmmhello 已提交
1651 1652 1653 1654 1655 1656
  setBoundColumnInfo(&smlHandle->tags, pTagsSchema, getNumOfTags(pTableMeta));
  int ret = smlBoundColumns(tags, &smlHandle->tags, pTagsSchema);
  if(ret != TSDB_CODE_SUCCESS){
    buildInvalidOperationMsg(&pBuf, "bound tags error");
    return ret;
  }
wmmhello's avatar
wmmhello 已提交
1657 1658
  SKVRow row = NULL;
  ret = smlBoundTags(tags, &smlHandle->tagsBuilder, &smlHandle->tags, pTagsSchema, &row, &pBuf);
wmmhello's avatar
wmmhello 已提交
1659 1660 1661
  if(ret != TSDB_CODE_SUCCESS){
    return ret;
  }
wmmhello's avatar
wmmhello 已提交
1662

1663
  buildCreateTbReq(&smlHandle->createTblReq, tableName, row, pTableMeta->suid);
wmmhello's avatar
wmmhello 已提交
1664

wmmhello's avatar
wmmhello 已提交
1665
  STableDataBlocks* pDataBlock = NULL;
wmmhello's avatar
wmmhello 已提交
1666
  ret = getDataBlockFromList(smlHandle->pBlockHash, pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE,
wmmhello's avatar
wmmhello 已提交
1667 1668
                                  sizeof(SSubmitBlk), getTableInfo(pTableMeta).rowSize, pTableMeta,
                                  &pDataBlock, NULL, &smlHandle->createTblReq);
wmmhello's avatar
wmmhello 已提交
1669 1670 1671 1672
  if(ret != TSDB_CODE_SUCCESS){
    buildInvalidOperationMsg(&pBuf, "create data block error");
    return ret;
  }
wmmhello's avatar
wmmhello 已提交
1673 1674 1675

  SSchema* pSchema = getTableColumnSchema(pTableMeta);

wmmhello's avatar
wmmhello 已提交
1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690

  if(format){
    ret = smlBoundColumns(taosArrayGetP(colsFormat, 0), &pDataBlock->boundColumnInfo, pSchema);
  }else{
    SArray *columns = taosArrayInit(16, POINTER_BYTES);
    void **p1 = taosHashIterate(colsHash, NULL);
    while (p1) {
      SSmlKv* kv = *p1;
      taosArrayPush(columns, &kv);
      p1 = taosHashIterate(colsHash, p1);
    }
    ret = smlBoundColumns(columns, &pDataBlock->boundColumnInfo, pSchema);
    taosArrayDestroy(columns);
  }

wmmhello's avatar
wmmhello 已提交
1691 1692 1693 1694
  if(ret != TSDB_CODE_SUCCESS){
    buildInvalidOperationMsg(&pBuf, "bound cols error");
    return ret;
  }
wmmhello's avatar
wmmhello 已提交
1695 1696 1697 1698 1699 1700 1701
  int32_t extendedRowSize = getExtendedRowSize(pDataBlock);
  SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
  SRowBuilder*        pBuilder = &pDataBlock->rowBuilder;
  SMemParam param = {.rb = pBuilder};

  initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo);

wmmhello's avatar
wmmhello 已提交
1702 1703 1704 1705
  int32_t rowNum = format ? taosArrayGetSize(colsFormat) : taosArrayGetSize(cols);
  if(rowNum <= 0) {
    return buildInvalidOperationMsg(&pBuf, "cols size <= 0");
  }
wmmhello's avatar
wmmhello 已提交
1706 1707 1708 1709 1710
  ret = allocateMemForSize(pDataBlock, extendedRowSize * rowNum);
  if(ret != TSDB_CODE_SUCCESS){
    buildInvalidOperationMsg(&pBuf, "allocate memory error");
    return ret;
  }
wmmhello's avatar
wmmhello 已提交
1711 1712 1713
  for (int32_t r = 0; r < rowNum; ++r) {
    STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size);  // skip the SSubmitBlk header
    tdSRowResetBuf(pBuilder, row);
wmmhello's avatar
wmmhello 已提交
1714 1715 1716 1717 1718 1719
    void *rowData = NULL;
    if(format){
      rowData = taosArrayGetP(colsFormat, r);
    }else{
      rowData = taosArrayGetP(cols, r);
    }
wmmhello's avatar
wmmhello 已提交
1720 1721 1722 1723 1724 1725 1726 1727

    // 1. set the parsed value from sql string
    for (int c = 0; c < spd->numOfBound; ++c) {
      SSchema* pColSchema = &pSchema[spd->boundColumns[c] - 1];

      param.schema = pColSchema;
      getSTSRowAppendInfo(pBuilder->rowType, spd, c, &param.toffset, &param.colIdx);

wmmhello's avatar
wmmhello 已提交
1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739
      SSmlKv *kv = NULL;
      if(format){
        kv = taosArrayGetP(rowData, c);
        if (!kv){
          char msg[64] = {0};
          sprintf(msg, "cols num not the same like before:%d", r);
          return buildInvalidOperationMsg(&pBuf, msg);
        }
      }else{
        void **p =taosHashGet(rowData, pColSchema->name, strlen(pColSchema->name));
        kv = *p;
      }
wmmhello's avatar
wmmhello 已提交
1740

wmmhello's avatar
wmmhello 已提交
1741
      if (kv->length == 0) {
wmmhello's avatar
wmmhello 已提交
1742 1743 1744 1745
        MemRowAppend(&pBuf, NULL, 0, &param);
      } else {
        int32_t colLen = pColSchema->bytes;
        if (IS_VAR_DATA_TYPE(pColSchema->type)) {
wmmhello's avatar
wmmhello 已提交
1746
          colLen = kv->length;
wmmhello's avatar
wmmhello 已提交
1747 1748
        }

wmmhello's avatar
wmmhello 已提交
1749
        MemRowAppend(&pBuf, &(kv->value), colLen, &param);
wmmhello's avatar
wmmhello 已提交
1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 1774 1775 1776 1777 1778
      }

      if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
        TSKEY tsKey = TD_ROW_KEY(row);
        checkTimestamp(pDataBlock, (const char *)&tsKey);
      }
    }

    // set the null value for the columns that do not assign values
    if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
      for (int32_t i = 0; i < spd->numOfCols; ++i) {
        if (spd->cols[i].valStat == VAL_STAT_NONE) {  // the primary TS key is not VAL_STAT_NONE
          tdAppendColValToTpRow(pBuilder, TD_VTYPE_NONE, getNullValue(pSchema[i].type), true, pSchema[i].type, i,
                                spd->cols[i].toffset);
        }
      }
    }

    pDataBlock->size += extendedRowSize;
  }

  SSubmitBlk *pBlocks = (SSubmitBlk *)(pDataBlock->pData);
  if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, rowNum)) {
    return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767");
  }

  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
1779 1780 1781
/*
 * Create the handle used by smlBindData() / smlBuildOutput().
 *
 * pQuery  query object stored in the handle and later passed to qBuildStmtOutput()
 *
 * Returns the new handle, or NULL when either the handle or its block hash cannot be
 * allocated. Release it with smlDestroyHandle().
 */
void* smlInitHandle(SQuery *pQuery){
  SSmlExecHandle *handle = taosMemoryCalloc(1, sizeof(SSmlExecHandle));
  if(!handle) return NULL;

  handle->pBlockHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
  if (NULL == handle->pBlockHash) {
    // a handle without its block hash is unusable; fail the same way as the
    // allocation above rather than hand out a half-initialised object
    taosMemoryFree(handle);
    return NULL;
  }
  handle->pQuery = pQuery;

  return handle;
}

wmmhello's avatar
wmmhello 已提交
1788
void smlDestroyHandle(void *pHandle){
wmmhello's avatar
wmmhello 已提交
1789
  if(!pHandle) return;
wmmhello's avatar
wmmhello 已提交
1790 1791
  SSmlExecHandle *handle = (SSmlExecHandle *)pHandle;
  destroyBlockHashmap(handle->pBlockHash);
wmmhello's avatar
wmmhello 已提交
1792 1793 1794 1795
  taosMemoryFree(handle);
}

/*
 * Build the output for the blocks collected in a schemaless handle by forwarding the
 * handle's query object and block hash, together with pVgHash, to qBuildStmtOutput().
 * Returns whatever qBuildStmtOutput() returns.
 */
int32_t smlBuildOutput(void* handle, SHashObj* pVgHash) {
  SSmlExecHandle *pSml = (SSmlExecHandle *)handle;
  SQuery*         pQuery = pSml->pQuery;
  SHashObj*       pBlocks = pSml->pBlockHash;

  return qBuildStmtOutput(pQuery, pVgHash, pBlocks);
}
wmmhello's avatar
wmmhello 已提交
1799
// schemaless logic end
D
stmt  
dapan1121 已提交
1800