parInsert.c 70.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

wafwerar's avatar
wafwerar 已提交
16
#include "os.h"
X
Xiaoyu Wang 已提交
17 18 19
#include "parInsertData.h"
#include "parInt.h"
#include "parToken.h"
H
refact  
Hongze Cheng 已提交
20
#include "parUtil.h"
21 22 23 24
#include "tglobal.h"
#include "ttime.h"
#include "ttypes.h"

H
refact  
Hongze Cheng 已提交
25 26 27
#define NEXT_TOKEN(pSql, sToken)                \
  do {                                          \
    int32_t index = 0;                          \
28
    sToken = tStrGetToken(pSql, &index, false); \
H
refact  
Hongze Cheng 已提交
29
    pSql += index;                              \
30 31
  } while (0)

H
refact  
Hongze Cheng 已提交
32 33 34
#define NEXT_TOKEN_WITH_PREV(pSql, sToken)     \
  do {                                         \
    int32_t index = 0;                         \
X
Xiaoyu Wang 已提交
35
    sToken = tStrGetToken(pSql, &index, true); \
H
refact  
Hongze Cheng 已提交
36
    pSql += index;                             \
X
Xiaoyu Wang 已提交
37 38
  } while (0)

39
#define NEXT_TOKEN_KEEP_SQL(pSql, sToken, index) \
H
refact  
Hongze Cheng 已提交
40 41
  do {                                           \
    sToken = tStrGetToken(pSql, &index, false);  \
42 43
  } while (0)

44 45 46 47 48 49 50
#define NEXT_VALID_TOKEN(pSql, sToken)        \
  do {                                        \
    sToken.n = tGetToken(pSql, &sToken.type); \
    sToken.z = pSql;                          \
    pSql += sToken.n;                         \
  } while (TK_NK_SPACE == sToken.type)

51
typedef struct SInsertParseContext {
X
Xiaoyu Wang 已提交
52 53 54 55 56
  SParseContext*     pComCxt;             // input
  char*              pSql;                // input
  SMsgBuf            msg;                 // input
  STableMeta*        pTableMeta;          // each table
  SParsedDataColInfo tags;                // each table
C
Cary Xu 已提交
57
  SArray*            pTagVals;            // each table
X
Xiaoyu Wang 已提交
58 59 60 61 62
  SVCreateTbReq      createTblReq;        // each table
  SHashObj*          pVgroupsHashObj;     // global
  SHashObj*          pTableBlockHashObj;  // global
  SHashObj*          pSubTableHashObj;    // global
  SArray*            pVgDataBlocks;       // global
X
Xiaoyu Wang 已提交
63
  SHashObj*          pTableNameHashObj;   // global
X
Xiaoyu Wang 已提交
64
  int32_t            totalNum;
X
Xiaoyu Wang 已提交
65
  SVnodeModifOpStmt* pOutput;
X
Xiaoyu Wang 已提交
66
  SStmtCallback*     pStmtCb;
67
  SParseMetaCache*   pMetaCache;
68 69
} SInsertParseContext;

H
refact  
Hongze Cheng 已提交
70
typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param);
X
Xiaoyu Wang 已提交
71 72 73 74

static uint8_t TRUE_VALUE = (uint8_t)TSDB_TRUE;
static uint8_t FALSE_VALUE = (uint8_t)TSDB_FALSE;

D
stmt  
dapan1121 已提交
75
typedef struct SKvParam {
C
Cary Xu 已提交
76
  int16_t  pos;
C
Cary Xu 已提交
77
  SArray*  pTagVals;
C
Cary Xu 已提交
78 79
  SSchema* schema;
  char     buf[TSDB_MAX_TAGS_LEN];
D
stmt  
dapan1121 已提交
80 81 82 83 84 85 86 87 88
} SKvParam;

typedef struct SMemParam {
  SRowBuilder* rb;
  SSchema*     schema;
  int32_t      toffset;
  col_id_t     colIdx;
} SMemParam;

X
Xiaoyu Wang 已提交
89 90 91
#define CHECK_CODE(expr)             \
  do {                               \
    int32_t code = expr;             \
D
stmt  
dapan1121 已提交
92
    if (TSDB_CODE_SUCCESS != code) { \
X
Xiaoyu Wang 已提交
93 94
      return code;                   \
    }                                \
D
stmt  
dapan1121 已提交
95 96
  } while (0)

97
static int32_t skipInsertInto(char** pSql, SMsgBuf* pMsg) {
98
  SToken sToken;
99
  NEXT_TOKEN(*pSql, sToken);
100
  if (TK_INSERT != sToken.type) {
101
    return buildSyntaxErrMsg(pMsg, "keyword INSERT is expected", sToken.z);
102
  }
103
  NEXT_TOKEN(*pSql, sToken);
104
  if (TK_INTO != sToken.type) {
105
    return buildSyntaxErrMsg(pMsg, "keyword INTO is expected", sToken.z);
106 107 108 109
  }
  return TSDB_CODE_SUCCESS;
}

110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
static int32_t parserValidateIdToken(SToken* pToken) {
  if (pToken == NULL || pToken->z == NULL || pToken->type != TK_NK_ID) {
    return TSDB_CODE_TSC_INVALID_OPERATION;
  }

  // it is a token quoted with escape char '`'
  if (pToken->z[0] == TS_ESCAPE_CHAR && pToken->z[pToken->n - 1] == TS_ESCAPE_CHAR) {
    return TSDB_CODE_SUCCESS;
  }

  char* sep = strnchr(pToken->z, TS_PATH_DELIMITER[0], pToken->n, true);
  if (sep == NULL) {  // It is a single part token, not a complex type
    if (isNumber(pToken)) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }

    strntolower(pToken->z, pToken->z, pToken->n);
  } else {  // two part
    int32_t oldLen = pToken->n;
    char*   pStr = pToken->z;

    if (pToken->type == TK_NK_SPACE) {
      pToken->n = (uint32_t)strtrim(pToken->z);
    }

    pToken->n = tGetToken(pToken->z, &pToken->type);
    if (pToken->z[pToken->n] != TS_PATH_DELIMITER[0]) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }

    if (pToken->type != TK_NK_ID) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }

    int32_t firstPartLen = pToken->n;

    pToken->z = sep + 1;
    pToken->n = (uint32_t)(oldLen - (sep - pStr) - 1);
    int32_t len = tGetToken(pToken->z, &pToken->type);
    if (len != pToken->n || pToken->type != TK_NK_ID) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }

    // re-build the whole name string
    if (pStr[firstPartLen] == TS_PATH_DELIMITER[0]) {
      // first part do not have quote do nothing
    } else {
      pStr[firstPartLen] = TS_PATH_DELIMITER[0];
      memmove(&pStr[firstPartLen + 1], pToken->z, pToken->n);
      uint32_t offset = (uint32_t)(pToken->z - (pStr + firstPartLen + 1));
      memset(pToken->z + pToken->n - offset, ' ', offset);
    }

    pToken->n += (firstPartLen + sizeof(TS_PATH_DELIMITER[0]));
    pToken->z = pStr;

    strntolower(pToken->z, pToken->z, pToken->n);
  }

  return TSDB_CODE_SUCCESS;
}

172
static int32_t buildName(SInsertParseContext* pCxt, SToken* pStname, char* fullDbName, char* tableName) {
173
  if (parserValidateIdToken(pStname) != TSDB_CODE_SUCCESS) {
174
    return buildSyntaxErrMsg(&pCxt->msg, "invalid table name", pStname->z);
175 176
  }

177
  char* p = strnchr(pStname->z, TS_PATH_DELIMITER[0], pStname->n, false);
H
refact  
Hongze Cheng 已提交
178
  if (NULL != p) {  // db.table
H
Haojun Liao 已提交
179
    int32_t n = sprintf(fullDbName, "%d.", pCxt->pComCxt->acctId);
H
Haojun Liao 已提交
180
    strncpy(fullDbName + n, pStname->z, p - pStname->z);
181 182
    strncpy(tableName, p + 1, pStname->n - (p - pStname->z) - 1);
  } else {
H
Haojun Liao 已提交
183
    snprintf(fullDbName, TSDB_DB_FNAME_LEN, "%d.%s", pCxt->pComCxt->acctId, pCxt->pComCxt->db);
184 185
    strncpy(tableName, pStname->z, pStname->n);
  }
H
Haojun Liao 已提交
186

187 188 189
  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
190
static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, const char* dbName, SMsgBuf* pMsgBuf) {
X
Xiaoyu Wang 已提交
191 192 193
  const char* msg1 = "name too long";
  const char* msg2 = "invalid database name";
  const char* msg3 = "db is not specified";
194
  const char* msg4 = "invalid table name";
X
Xiaoyu Wang 已提交
195

H
refact  
Hongze Cheng 已提交
196 197
  int32_t code = TSDB_CODE_SUCCESS;
  char*   p = strnchr(pTableName->z, TS_PATH_DELIMITER[0], pTableName->n, true);
X
Xiaoyu Wang 已提交
198

H
refact  
Hongze Cheng 已提交
199
  if (p != NULL) {  // db has been specified in sql string so we ignore current db path
X
Xiaoyu Wang 已提交
200 201 202
    assert(*p == TS_PATH_DELIMITER[0]);

    int32_t dbLen = p - pTableName->z;
H
refact  
Hongze Cheng 已提交
203
    char    name[TSDB_DB_FNAME_LEN] = {0};
X
Xiaoyu Wang 已提交
204 205 206
    strncpy(name, pTableName->z, dbLen);
    dbLen = strdequote(name);

D
stmt  
dapan1121 已提交
207
    code = tNameSetDbName(pName, acctId, name, dbLen);
X
Xiaoyu Wang 已提交
208 209 210 211 212
    if (code != TSDB_CODE_SUCCESS) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }

    int32_t tbLen = pTableName->n - dbLen - 1;
213 214 215 216
    if (tbLen <= 0) {
      return buildInvalidOperationMsg(pMsgBuf, msg4);
    }

C
Cary Xu 已提交
217
    char tbname[TSDB_TABLE_FNAME_LEN] = {0};
X
Xiaoyu Wang 已提交
218
    strncpy(tbname, p + 1, tbLen);
H
refact  
Hongze Cheng 已提交
219
    /*tbLen = */ strdequote(tbname);
X
Xiaoyu Wang 已提交
220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235

    code = tNameFromString(pName, tbname, T_NAME_TABLE);
    if (code != 0) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }
  } else {  // get current DB name first, and then set it into path
    if (pTableName->n >= TSDB_TABLE_NAME_LEN) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }

    assert(pTableName->n < TSDB_TABLE_FNAME_LEN);

    char name[TSDB_TABLE_FNAME_LEN] = {0};
    strncpy(name, pTableName->z, pTableName->n);
    strdequote(name);

D
stmt  
dapan1121 已提交
236
    if (dbName == NULL) {
X
Xiaoyu Wang 已提交
237 238 239
      return buildInvalidOperationMsg(pMsgBuf, msg3);
    }

D
stmt  
dapan1121 已提交
240
    code = tNameSetDbName(pName, acctId, dbName, strlen(dbName));
X
Xiaoyu Wang 已提交
241 242 243 244 245 246 247 248 249 250 251 252 253 254
    if (code != TSDB_CODE_SUCCESS) {
      code = buildInvalidOperationMsg(pMsgBuf, msg2);
      return code;
    }

    code = tNameFromString(pName, name, T_NAME_TABLE);
    if (code != 0) {
      code = buildInvalidOperationMsg(pMsgBuf, msg1);
    }
  }

  return code;
}

255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276
static int32_t checkAuth(SInsertParseContext* pCxt, char* pDbFname, bool* pPass) {
  SParseContext* pBasicCtx = pCxt->pComCxt;
  if (NULL != pCxt->pMetaCache) {
    return getUserAuthFromCache(pCxt->pMetaCache, pBasicCtx->pUser, pDbFname, AUTH_TYPE_WRITE, pPass);
  }
  return catalogChkAuth(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pBasicCtx->pUser, pDbFname,
                        AUTH_TYPE_WRITE, pPass);
}

static int32_t getTableSchema(SInsertParseContext* pCxt, SName* pTbName, bool isStb, STableMeta** pTableMeta) {
  SParseContext* pBasicCtx = pCxt->pComCxt;
  if (NULL != pCxt->pMetaCache) {
    return getTableMetaFromCache(pCxt->pMetaCache, pTbName, pTableMeta);
  }
  if (isStb) {
    return catalogGetSTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pTbName,
                                pTableMeta);
  }
  return catalogGetTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pTbName, pTableMeta);
}

static int32_t getTableVgroup(SInsertParseContext* pCxt, SName* pTbName, SVgroupInfo* pVg) {
H
Haojun Liao 已提交
277
  SParseContext* pBasicCtx = pCxt->pComCxt;
278 279 280 281 282
  if (NULL != pCxt->pMetaCache) {
    return getTableVgroupFromCache(pCxt->pMetaCache, pTbName, pVg);
  }
  return catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pTbName, pVg);
}
D
dapan 已提交
283

284
static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SName* name, char* dbFname, bool isStb) {
D
dapan 已提交
285
  bool pass = false;
286
  CHECK_CODE(checkAuth(pCxt, dbFname, &pass));
D
dapan 已提交
287 288 289
  if (!pass) {
    return TSDB_CODE_PAR_PERMISSION_DENIED;
  }
290 291 292

  CHECK_CODE(getTableSchema(pCxt, name, isStb, &pCxt->pTableMeta));
  if (!isStb) {
X
Xiaoyu Wang 已提交
293
    SVgroupInfo vg;
294
    CHECK_CODE(getTableVgroup(pCxt, name, &vg));
X
Xiaoyu Wang 已提交
295
    CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
D
stmt  
dapan1121 已提交
296
  }
H
refact  
Hongze Cheng 已提交
297
  return TSDB_CODE_SUCCESS;
D
stmt  
dapan1121 已提交
298 299
}

X
Xiaoyu Wang 已提交
300 301 302
static int32_t getTableMeta(SInsertParseContext* pCxt, SName* name, char* dbFname) {
  return getTableMetaImpl(pCxt, name, dbFname, false);
}
D
stmt  
dapan1121 已提交
303

X
Xiaoyu Wang 已提交
304 305 306
static int32_t getSTableMeta(SInsertParseContext* pCxt, SName* name, char* dbFname) {
  return getTableMetaImpl(pCxt, name, dbFname, true);
}
D
stmt  
dapan1121 已提交
307

308 309 310 311 312 313 314 315 316 317
static int32_t findCol(SToken* pColname, int32_t start, int32_t end, SSchema* pSchema) {
  while (start < end) {
    if (strlen(pSchema[start].name) == pColname->n && strncmp(pColname->z, pSchema[start].name, pColname->n) == 0) {
      return start;
    }
    ++start;
  }
  return -1;
}

D
dapan1121 已提交
318
static void buildMsgHeader(STableDataBlocks* src, SVgDataBlocks* blocks) {
H
refact  
Hongze Cheng 已提交
319 320 321 322 323 324 325 326 327
  SSubmitReq* submit = (SSubmitReq*)blocks->pData;
  submit->header.vgId = htonl(blocks->vg.vgId);
  submit->header.contLen = htonl(blocks->size);
  submit->length = submit->header.contLen;
  submit->numOfBlocks = htonl(blocks->numOfTables);
  SSubmitBlk* blk = (SSubmitBlk*)(submit + 1);
  int32_t     numOfBlocks = blocks->numOfTables;
  while (numOfBlocks--) {
    int32_t dataLen = blk->dataLen;
328
    int32_t schemaLen = blk->schemaLen;
H
refact  
Hongze Cheng 已提交
329 330 331 332 333 334 335
    blk->uid = htobe64(blk->uid);
    blk->suid = htobe64(blk->suid);
    blk->padding = htonl(blk->padding);
    blk->sversion = htonl(blk->sversion);
    blk->dataLen = htonl(blk->dataLen);
    blk->schemaLen = htonl(blk->schemaLen);
    blk->numOfRows = htons(blk->numOfRows);
336
    blk = (SSubmitBlk*)(blk->data + schemaLen + dataLen);
H
refact  
Hongze Cheng 已提交
337
  }
338 339 340 341 342 343 344 345 346 347
}

static int32_t buildOutput(SInsertParseContext* pCxt) {
  size_t numOfVg = taosArrayGetSize(pCxt->pVgDataBlocks);
  pCxt->pOutput->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
  if (NULL == pCxt->pOutput->pDataBlocks) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  for (size_t i = 0; i < numOfVg; ++i) {
    STableDataBlocks* src = taosArrayGetP(pCxt->pVgDataBlocks, i);
H
refact  
Hongze Cheng 已提交
348
    SVgDataBlocks*    dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
349 350 351
    if (NULL == dst) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
H
Haojun Liao 已提交
352
    taosHashGetDup(pCxt->pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
353 354
    dst->numOfTables = src->numOfTables;
    dst->size = src->size;
wafwerar's avatar
wafwerar 已提交
355
    TSWAP(dst->pData, src->pData);
D
dapan1121 已提交
356
    buildMsgHeader(src, dst);
357 358 359 360 361
    taosArrayPush(pCxt->pOutput->pDataBlocks, &dst);
  }
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
362
int32_t checkTimestamp(STableDataBlocks* pDataBlocks, const char* start) {
363 364 365 366 367
  // once the data block is disordered, we do NOT keep previous timestamp any more
  if (!pDataBlocks->ordered) {
    return TSDB_CODE_SUCCESS;
  }

H
refact  
Hongze Cheng 已提交
368
  TSKEY k = *(TSKEY*)start;
369
  if (k <= pDataBlocks->prevTS) {
370 371 372 373 374 375 376
    pDataBlocks->ordered = false;
  }

  pDataBlocks->prevTS = k;
  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
377 378 379 380 381 382
static int parseTime(char** end, SToken* pToken, int16_t timePrec, int64_t* time, SMsgBuf* pMsgBuf) {
  int32_t index = 0;
  SToken  sToken;
  int64_t interval;
  int64_t ts = 0;
  char*   pTokenEnd = *end;
383 384

  if (pToken->type == TK_NOW) {
385
    ts = taosGetTimestamp(timePrec);
386 387
  } else if (pToken->type == TK_TODAY) {
    ts = taosGetTimestampToday(timePrec);
388
  } else if (pToken->type == TK_NK_INTEGER) {
X
Xiaoyu Wang 已提交
389
    toInteger(pToken->z, pToken->n, 10, &ts);
H
refact  
Hongze Cheng 已提交
390
  } else {  // parse the RFC-3339/ISO-8601 timestamp format string
S
os env  
Shengliang Guan 已提交
391
    if (taosParseTime(pToken->z, time, pToken->n, timePrec, tsDaylight) != TSDB_CODE_SUCCESS) {
392
      return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
393 394 395 396 397 398 399
    }

    return TSDB_CODE_SUCCESS;
  }

  for (int k = pToken->n; pToken->z[k] != '\0'; k++) {
    if (pToken->z[k] == ' ' || pToken->z[k] == '\t') continue;
H
refact  
Hongze Cheng 已提交
400
    if (pToken->z[k] == '(' && pToken->z[k + 1] == ')') {  // for insert NOW()/TODAY()
401 402 403 404
      *end = pTokenEnd = &pToken->z[k + 2];
      k++;
      continue;
    }
405
    if (pToken->z[k] == ',') {
406 407
      *end = pTokenEnd;
      *time = ts;
408 409 410 411 412 413 414 415 416 417 418 419 420 421 422
      return 0;
    }

    break;
  }

  /*
   * time expression:
   * e.g., now+12a, now-5h
   */
  SToken valueToken;
  index = 0;
  sToken = tStrGetToken(pTokenEnd, &index, false);
  pTokenEnd += index;

X
Xiaoyu Wang 已提交
423
  if (sToken.type == TK_NK_MINUS || sToken.type == TK_NK_PLUS) {
424 425 426 427 428
    index = 0;
    valueToken = tStrGetToken(pTokenEnd, &index, false);
    pTokenEnd += index;

    if (valueToken.n < 2) {
429
      return buildSyntaxErrMsg(pMsgBuf, "value expected in timestamp", sToken.z);
430 431 432 433 434 435 436
    }

    char unit = 0;
    if (parseAbsoluteDuration(valueToken.z, valueToken.n, &interval, &unit, timePrec) != TSDB_CODE_SUCCESS) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }

437
    if (sToken.type == TK_NK_PLUS) {
438
      ts += interval;
439
    } else {
440
      ts = ts - interval;
441 442
    }

443
    *end = pTokenEnd;
444 445
  }

446
  *time = ts;
447 448
  return TSDB_CODE_SUCCESS;
}
449

X
Xiaoyu Wang 已提交
450
static FORCE_INLINE int32_t checkAndTrimValue(SToken* pToken, uint32_t type, char* tmpTokenBuf, SMsgBuf* pMsgBuf) {
H
refact  
Hongze Cheng 已提交
451 452 453 454
  if ((pToken->type != TK_NOW && pToken->type != TK_TODAY && pToken->type != TK_NK_INTEGER &&
       pToken->type != TK_NK_STRING && pToken->type != TK_NK_FLOAT && pToken->type != TK_NK_BOOL &&
       pToken->type != TK_NULL && pToken->type != TK_NK_HEX && pToken->type != TK_NK_OCT &&
       pToken->type != TK_NK_BIN) ||
455
      (pToken->n == 0) || (pToken->type == TK_NK_RP)) {
X
Xiaoyu Wang 已提交
456 457 458 459
    return buildSyntaxErrMsg(pMsgBuf, "invalid data or symbol", pToken->z);
  }

  // Remove quotation marks
X
Xiaoyu Wang 已提交
460
  if (TK_NK_STRING == pToken->type) {
X
Xiaoyu Wang 已提交
461 462 463 464
    if (pToken->n >= TSDB_MAX_BYTES_PER_ROW) {
      return buildSyntaxErrMsg(pMsgBuf, "too long string", pToken->z);
    }

465
    int32_t len = trimString(pToken->z, pToken->n, tmpTokenBuf, TSDB_MAX_BYTES_PER_ROW);
X
Xiaoyu Wang 已提交
466
    pToken->z = tmpTokenBuf;
467
    pToken->n = len;
X
Xiaoyu Wang 已提交
468 469 470 471 472
  }

  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
473
static bool isNullStr(SToken* pToken) {
474
  return (pToken->type == TK_NULL) || ((pToken->type == TK_NK_STRING) && (pToken->n != 0) &&
X
Xiaoyu Wang 已提交
475 476 477
                                       (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0));
}

H
refact  
Hongze Cheng 已提交
478
static FORCE_INLINE int32_t toDouble(SToken* pToken, double* value, char** endPtr) {
X
Xiaoyu Wang 已提交
479
  errno = 0;
wafwerar's avatar
wafwerar 已提交
480
  *value = taosStr2Double(pToken->z, endPtr);
X
Xiaoyu Wang 已提交
481 482 483

  // not a valid integer number, return error
  if ((*endPtr - pToken->z) != pToken->n) {
484
    return TK_NK_ILLEGAL;
X
Xiaoyu Wang 已提交
485 486 487 488 489
  }

  return pToken->type;
}

H
refact  
Hongze Cheng 已提交
490 491
static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t timePrec, char* tmpTokenBuf,
                               _row_append_fn_t func, void* param, SMsgBuf* pMsgBuf) {
X
Xiaoyu Wang 已提交
492 493 494
  int64_t  iv;
  uint64_t uv;
  char*    endptr = NULL;
X
Xiaoyu Wang 已提交
495 496 497 498 499 500 501 502

  int32_t code = checkAndTrimValue(pToken, pSchema->type, tmpTokenBuf, pMsgBuf);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  if (isNullStr(pToken)) {
    if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
D
stmt  
dapan1121 已提交
503
      return buildSyntaxErrMsg(pMsgBuf, "primary timestamp should not be null", pToken->z);
X
Xiaoyu Wang 已提交
504 505
    }

X
Xiaoyu Wang 已提交
506
    return func(pMsgBuf, NULL, 0, param);
X
Xiaoyu Wang 已提交
507 508 509 510
  }

  switch (pSchema->type) {
    case TSDB_DATA_TYPE_BOOL: {
511
      if ((pToken->type == TK_NK_BOOL || pToken->type == TK_NK_STRING) && (pToken->n != 0)) {
X
Xiaoyu Wang 已提交
512
        if (strncmp(pToken->z, "true", pToken->n) == 0) {
X
Xiaoyu Wang 已提交
513
          return func(pMsgBuf, &TRUE_VALUE, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
514
        } else if (strncmp(pToken->z, "false", pToken->n) == 0) {
X
Xiaoyu Wang 已提交
515
          return func(pMsgBuf, &FALSE_VALUE, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
516 517 518
        } else {
          return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
        }
519
      } else if (pToken->type == TK_NK_INTEGER) {
520 521
        return func(pMsgBuf, ((taosStr2Int64(pToken->z, NULL, 10) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes,
                    param);
522
      } else if (pToken->type == TK_NK_FLOAT) {
523 524
        return func(pMsgBuf, ((taosStr2Double(pToken->z, NULL) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes,
                    param);
X
Xiaoyu Wang 已提交
525 526 527 528 529 530
      } else {
        return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
      }
    }

    case TSDB_DATA_TYPE_TINYINT: {
X
Xiaoyu Wang 已提交
531
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
X
Xiaoyu Wang 已提交
532 533 534 535 536 537
        return buildSyntaxErrMsg(pMsgBuf, "invalid tinyint data", pToken->z);
      } else if (!IS_VALID_TINYINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "tinyint data overflow", pToken->z);
      }

      uint8_t tmpVal = (uint8_t)iv;
X
Xiaoyu Wang 已提交
538
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
539 540
    }

H
refact  
Hongze Cheng 已提交
541
    case TSDB_DATA_TYPE_UTINYINT: {
X
Xiaoyu Wang 已提交
542
      if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
X
Xiaoyu Wang 已提交
543
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned tinyint data", pToken->z);
X
Xiaoyu Wang 已提交
544
      } else if (!IS_VALID_UTINYINT(uv)) {
X
Xiaoyu Wang 已提交
545 546
        return buildSyntaxErrMsg(pMsgBuf, "unsigned tinyint data overflow", pToken->z);
      }
X
Xiaoyu Wang 已提交
547
      uint8_t tmpVal = (uint8_t)uv;
X
Xiaoyu Wang 已提交
548
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
549 550 551
    }

    case TSDB_DATA_TYPE_SMALLINT: {
X
Xiaoyu Wang 已提交
552
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
X
Xiaoyu Wang 已提交
553 554 555 556 557
        return buildSyntaxErrMsg(pMsgBuf, "invalid smallint data", pToken->z);
      } else if (!IS_VALID_SMALLINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "smallint data overflow", pToken->z);
      }
      int16_t tmpVal = (int16_t)iv;
X
Xiaoyu Wang 已提交
558
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
559 560 561
    }

    case TSDB_DATA_TYPE_USMALLINT: {
X
Xiaoyu Wang 已提交
562
      if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
X
Xiaoyu Wang 已提交
563
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned smallint data", pToken->z);
X
Xiaoyu Wang 已提交
564
      } else if (!IS_VALID_USMALLINT(uv)) {
X
Xiaoyu Wang 已提交
565 566
        return buildSyntaxErrMsg(pMsgBuf, "unsigned smallint data overflow", pToken->z);
      }
X
Xiaoyu Wang 已提交
567
      uint16_t tmpVal = (uint16_t)uv;
X
Xiaoyu Wang 已提交
568
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
569 570 571
    }

    case TSDB_DATA_TYPE_INT: {
X
Xiaoyu Wang 已提交
572
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
X
Xiaoyu Wang 已提交
573 574 575 576 577
        return buildSyntaxErrMsg(pMsgBuf, "invalid int data", pToken->z);
      } else if (!IS_VALID_INT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "int data overflow", pToken->z);
      }
      int32_t tmpVal = (int32_t)iv;
X
Xiaoyu Wang 已提交
578
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
579 580 581
    }

    case TSDB_DATA_TYPE_UINT: {
X
Xiaoyu Wang 已提交
582
      if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
X
Xiaoyu Wang 已提交
583
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned int data", pToken->z);
X
Xiaoyu Wang 已提交
584
      } else if (!IS_VALID_UINT(uv)) {
X
Xiaoyu Wang 已提交
585 586
        return buildSyntaxErrMsg(pMsgBuf, "unsigned int data overflow", pToken->z);
      }
X
Xiaoyu Wang 已提交
587
      uint32_t tmpVal = (uint32_t)uv;
X
Xiaoyu Wang 已提交
588
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
589 590 591
    }

    case TSDB_DATA_TYPE_BIGINT: {
X
Xiaoyu Wang 已提交
592
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
X
Xiaoyu Wang 已提交
593 594 595 596
        return buildSyntaxErrMsg(pMsgBuf, "invalid bigint data", pToken->z);
      } else if (!IS_VALID_BIGINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "bigint data overflow", pToken->z);
      }
X
Xiaoyu Wang 已提交
597
      return func(pMsgBuf, &iv, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
598 599 600
    }

    case TSDB_DATA_TYPE_UBIGINT: {
X
Xiaoyu Wang 已提交
601
      if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
X
Xiaoyu Wang 已提交
602
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned bigint data", pToken->z);
X
Xiaoyu Wang 已提交
603
      } else if (!IS_VALID_UBIGINT(uv)) {
X
Xiaoyu Wang 已提交
604 605
        return buildSyntaxErrMsg(pMsgBuf, "unsigned bigint data overflow", pToken->z);
      }
X
Xiaoyu Wang 已提交
606
      return func(pMsgBuf, &uv, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
607 608 609 610
    }

    case TSDB_DATA_TYPE_FLOAT: {
      double dv;
611
      if (TK_NK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
X
Xiaoyu Wang 已提交
612 613
        return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
      }
H
refact  
Hongze Cheng 已提交
614 615
      if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) ||
          isnan(dv)) {
X
Xiaoyu Wang 已提交
616 617 618
        return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
      }
      float tmpVal = (float)dv;
X
Xiaoyu Wang 已提交
619
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
620 621 622 623
    }

    case TSDB_DATA_TYPE_DOUBLE: {
      double dv;
624
      if (TK_NK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
X
Xiaoyu Wang 已提交
625 626 627 628 629
        return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
      }
      if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) {
        return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
      }
X
Xiaoyu Wang 已提交
630
      return func(pMsgBuf, &dv, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
631 632 633 634 635
    }

    case TSDB_DATA_TYPE_BINARY: {
      // Too long values will raise the invalid sql error message
      if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) {
D
dapan1121 已提交
636
        return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
X
Xiaoyu Wang 已提交
637 638
      }

X
Xiaoyu Wang 已提交
639
      return func(pMsgBuf, pToken->z, pToken->n, param);
X
Xiaoyu Wang 已提交
640 641 642
    }

    case TSDB_DATA_TYPE_NCHAR: {
X
Xiaoyu Wang 已提交
643
      return func(pMsgBuf, pToken->z, pToken->n, param);
X
Xiaoyu Wang 已提交
644
    }
C
Cary Xu 已提交
645
#ifdef JSON_TAG_REFACTOR
646
    case TSDB_DATA_TYPE_JSON: {
X
Xiaoyu Wang 已提交
647
      if (pToken->n > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
648 649 650 651
        return buildSyntaxErrMsg(pMsgBuf, "json string too long than 4095", pToken->z);
      }
      return func(pMsgBuf, pToken->z, pToken->n, param);
    }
C
Cary Xu 已提交
652
#endif
X
Xiaoyu Wang 已提交
653 654 655 656 657 658
    case TSDB_DATA_TYPE_TIMESTAMP: {
      int64_t tmpVal;
      if (parseTime(end, pToken, timePrec, &tmpVal, pMsgBuf) != TSDB_CODE_SUCCESS) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp", pToken->z);
      }

X
Xiaoyu Wang 已提交
659
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
X
Xiaoyu Wang 已提交
660 661 662 663 664 665
    }
  }

  return TSDB_CODE_FAILED;
}

X
Xiaoyu Wang 已提交
666
static FORCE_INLINE int32_t MemRowAppend(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param) {
C
Cary Xu 已提交
667 668
  SMemParam*   pa = (SMemParam*)param;
  SRowBuilder* rb = pa->rb;
669 670 671 672 673 674

  if (value == NULL) {  // it is a null data
    tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NULL, value, false, pa->toffset, pa->colIdx);
    return TSDB_CODE_SUCCESS;
  }

675
  if (TSDB_DATA_TYPE_BINARY == pa->schema->type) {
C
Cary Xu 已提交
676
    const char* rowEnd = tdRowEnd(rb->pBuf);
677
    STR_WITH_SIZE_TO_VARSTR(rowEnd, value, len);
678
    tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, rowEnd, false, pa->toffset, pa->colIdx);
679 680
  } else if (TSDB_DATA_TYPE_NCHAR == pa->schema->type) {
    // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
C
Cary Xu 已提交
681 682
    int32_t     output = 0;
    const char* rowEnd = tdRowEnd(rb->pBuf);
wafwerar's avatar
wafwerar 已提交
683
    if (!taosMbsToUcs4(value, len, (TdUcs4*)varDataVal(rowEnd), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) {
D
dapan1121 已提交
684 685 686
      if (errno == E2BIG) {
        return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pa->schema->name);
      }
X
Xiaoyu Wang 已提交
687 688 689
      char buf[512] = {0};
      snprintf(buf, tListLen(buf), "%s", strerror(errno));
      return buildSyntaxErrMsg(pMsgBuf, buf, value);
690
    }
691
    varDataSetLen(rowEnd, output);
C
Cary Xu 已提交
692
    tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, rowEnd, false, pa->toffset, pa->colIdx);
693
  } else {
694
    tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, value, false, pa->toffset, pa->colIdx);
695
  }
696

697 698
  return TSDB_CODE_SUCCESS;
}
699 700 701

// pSql -> tag1_name, ...)
static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo* pColList, SSchema* pSchema) {
C
Cary Xu 已提交
702
  col_id_t nCols = pColList->numOfCols;
703

H
refact  
Hongze Cheng 已提交
704
  pColList->numOfBound = 0;
C
Cary Xu 已提交
705
  pColList->boundNullLen = 0;
C
Cary Xu 已提交
706
  memset(pColList->boundColumns, 0, sizeof(col_id_t) * nCols);
C
Cary Xu 已提交
707
  for (col_id_t i = 0; i < nCols; ++i) {
708 709 710
    pColList->cols[i].valStat = VAL_STAT_NONE;
  }

H
refact  
Hongze Cheng 已提交
711 712
  SToken   sToken;
  bool     isOrdered = true;
C
Cary Xu 已提交
713
  col_id_t lastColIdx = -1;  // last column found
714 715 716
  while (1) {
    NEXT_TOKEN(pCxt->pSql, sToken);

717
    if (TK_NK_RP == sToken.type) {
718 719 720
      break;
    }

C
Cary Xu 已提交
721 722
    col_id_t t = lastColIdx + 1;
    col_id_t index = findCol(&sToken, t, nCols, pSchema);
723 724 725 726 727
    if (index < 0 && t > 0) {
      index = findCol(&sToken, 0, t, pSchema);
      isOrdered = false;
    }
    if (index < 0) {
728
      return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMN, sToken.z);
729 730 731 732 733 734
    }
    if (pColList->cols[index].valStat == VAL_STAT_HAS) {
      return buildSyntaxErrMsg(&pCxt->msg, "duplicated column name", sToken.z);
    }
    lastColIdx = index;
    pColList->cols[index].valStat = VAL_STAT_HAS;
X
Xiaoyu Wang 已提交
735
    pColList->boundColumns[pColList->numOfBound] = index;
736
    ++pColList->numOfBound;
C
Cary Xu 已提交
737 738 739 740 741 742 743 744 745 746 747
    switch (pSchema[t].type) {
      case TSDB_DATA_TYPE_BINARY:
        pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + CHAR_BYTES);
        break;
      case TSDB_DATA_TYPE_NCHAR:
        pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
        break;
      default:
        pColList->boundNullLen += TYPE_BYTES[pSchema[t].type];
        break;
    }
748 749 750 751 752
  }

  pColList->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED;

  if (!isOrdered) {
wafwerar's avatar
wafwerar 已提交
753
    pColList->colIdxInfo = taosMemoryCalloc(pColList->numOfBound, sizeof(SBoundIdxInfo));
754 755 756 757
    if (NULL == pColList->colIdxInfo) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    SBoundIdxInfo* pColIdx = pColList->colIdxInfo;
C
Cary Xu 已提交
758
    for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
759
      pColIdx[i].schemaColIdx = pColList->boundColumns[i];
760 761 762
      pColIdx[i].boundIdx = i;
    }
    qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), schemaIdxCompar);
C
Cary Xu 已提交
763
    for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
764 765 766 767 768
      pColIdx[i].finalIdx = i;
    }
    qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), boundIdxCompar);
  }

X
Xiaoyu Wang 已提交
769
  if (pColList->numOfCols > pColList->numOfBound) {
770 771 772
    memset(&pColList->boundColumns[pColList->numOfBound], 0,
           sizeof(col_id_t) * (pColList->numOfCols - pColList->numOfBound));
  }
773 774 775 776

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
777 778
static int32_t KvRowAppend(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param) {
  SKvParam* pa = (SKvParam*)param;
779

780 781
  int8_t  type = pa->schema->type;
  int16_t colId = pa->schema->colId;
782

C
Cary Xu 已提交
783
#ifdef JSON_TAG_REFACTOR
X
Xiaoyu Wang 已提交
784
  if (TSDB_DATA_TYPE_JSON == type) {
785 786
    return parseJsontoTagData(value, pa->builder, pMsgBuf, colId);
  }
C
Cary Xu 已提交
787
#endif
788 789

  if (value == NULL) {  // it is a null data
X
Xiaoyu Wang 已提交
790 791
    // tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NULL, value, false, pa->toffset,
    // pa->colIdx);
792 793 794
    return TSDB_CODE_SUCCESS;
  }

795
  if (TSDB_DATA_TYPE_BINARY == type) {
C
Cary Xu 已提交
796
    memcpy(pa->buf + pa->pos, value, len);
C
Cary Xu 已提交
797
    tTagValPush(pa->pTagVals, &colId, type, (uint8_t*)(pa->buf + pa->pos), len, false);
C
Cary Xu 已提交
798
    pa->pos += len;
799 800
  } else if (TSDB_DATA_TYPE_NCHAR == type) {
    // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
C
Cary Xu 已提交
801 802 803

    ASSERT((pa->pos + pa->schema->bytes - VARSTR_HEADER_SIZE) <= TSDB_MAX_TAGS_LEN);

804
    int32_t output = 0;
C
Cary Xu 已提交
805
    if (!taosMbsToUcs4(value, len, (TdUcs4*)(pa->buf + pa->pos), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) {
D
dapan1121 已提交
806 807 808
      if (errno == E2BIG) {
        return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pa->schema->name);
      }
X
Xiaoyu Wang 已提交
809
      char buf[512] = {0};
810
      snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(errno));
811
      return buildSyntaxErrMsg(pMsgBuf, buf, value);
812
    }
C
Cary Xu 已提交
813
    tTagValPush(pa->pTagVals, &colId, type, (uint8_t*)(pa->buf + pa->pos), output, false);
C
Cary Xu 已提交
814
    pa->pos += output;
815
  } else {
C
Cary Xu 已提交
816
    memcpy(pa->buf + pa->pos, value, TYPE_BYTES[type]);
C
Cary Xu 已提交
817
    tTagValPush(pa->pTagVals, &colId, type, (uint8_t*)(pa->buf + pa->pos), TYPE_BYTES[type], false);
C
Cary Xu 已提交
818
    pa->pos + TYPE_BYTES[type];
819
  }
C
Cary Xu 已提交
820
  ASSERT(pa->pos <= TSDB_MAX_TAGS_LEN);
821 822 823 824

  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
825
static int32_t buildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid) {
D
stmt  
dapan1121 已提交
826
  pTbReq->type = TD_CHILD_TABLE;
wmmhello's avatar
wmmhello 已提交
827
  pTbReq->name = strdup(tname);
H
Hongze Cheng 已提交
828
  pTbReq->ctb.suid = suid;
C
Cary Xu 已提交
829
  pTbReq->ctb.pTag = (uint8_t*)pTag;
X
Xiaoyu Wang 已提交
830 831 832 833

  return TSDB_CODE_SUCCESS;
}

834
// pSql -> tag1_value, ...)
wmmhello's avatar
wmmhello 已提交
835
static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint8_t precision, const char* tName) {
C
Cary Xu 已提交
836
  ASSERT(!pCxt->pTagVals);
C
Cary Xu 已提交
837
  if (!(pCxt->pTagVals = taosArrayInit(pCxt->tags.numOfBound, sizeof(STagVal)))) {
838 839 840
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

C
Cary Xu 已提交
841
  SKvParam param = {.pTagVals = pCxt->pTagVals, .pos = 0};
X
Xiaoyu Wang 已提交
842 843 844
  SToken   sToken;
  bool     isParseBindParam = false;
  char     tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0};  // used for deleting Escape character: \\, \', \"
C
Cary Xu 已提交
845
  // TODO: JSON_TAG_REFACTOR => here would have json tag?
846
  for (int i = 0; i < pCxt->tags.numOfBound; ++i) {
X
Xiaoyu Wang 已提交
847
    NEXT_TOKEN_WITH_PREV(pCxt->pSql, sToken);
D
stmt  
dapan1121 已提交
848 849 850 851 852 853 854 855 856 857 858 859 860

    if (sToken.type == TK_NK_QUESTION) {
      isParseBindParam = true;
      if (NULL == pCxt->pStmtCb) {
        return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", sToken.z);
      }

      continue;
    }

    if (isParseBindParam) {
      return buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and tag values");
    }
X
Xiaoyu Wang 已提交
861

X
Xiaoyu Wang 已提交
862
    SSchema* pTagSchema = &pSchema[pCxt->tags.boundColumns[i]];
X
Xiaoyu Wang 已提交
863
    param.schema = pTagSchema;
H
refact  
Hongze Cheng 已提交
864 865
    CHECK_CODE(
        parseValueToken(&pCxt->pSql, &sToken, pTagSchema, precision, tmpTokenBuf, KvRowAppend, &param, &pCxt->msg));
866 867
  }

D
stmt  
dapan1121 已提交
868 869 870 871
  if (isParseBindParam) {
    return TSDB_CODE_SUCCESS;
  }

C
Cary Xu 已提交
872 873
  // TODO: JSON_TAG_REFACTOR (would be JSON tag or normal tag)
  STag* pTag = NULL;
C
Cary Xu 已提交
874
  if (tTagNew(param.pTagVals, 1, false, &pTag) != 0) {
875
    return buildInvalidOperationMsg(&pCxt->msg, "out of memory");
876 877
  }

C
Cary Xu 已提交
878
  return buildCreateTbReq(&pCxt->createTblReq, tName, pTag, pCxt->pTableMeta->suid);
X
Xiaoyu Wang 已提交
879
}
880

X
Xiaoyu Wang 已提交
881 882 883 884 885 886 887 888
static int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
  *pDst = taosMemoryMalloc(TABLE_META_SIZE(pSrc));
  if (NULL == *pDst) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  memcpy(*pDst, pSrc, TABLE_META_SIZE(pSrc));
  return TSDB_CODE_SUCCESS;
}
889

X
Xiaoyu Wang 已提交
890 891
static int32_t storeTableMeta(SInsertParseContext* pCxt, SHashObj* pHash, SName* pTableName, const char* pName,
                              int32_t len, STableMeta* pMeta) {
892 893
  SVgroupInfo vg;
  CHECK_CODE(getTableVgroup(pCxt, pTableName, &vg));
X
Xiaoyu Wang 已提交
894 895
  CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));

D
dapan 已提交
896
  pMeta->uid = 0;
X
Xiaoyu Wang 已提交
897
  pMeta->vgId = vg.vgId;
D
dapan 已提交
898
  pMeta->tableType = TSDB_CHILD_TABLE;
X
Xiaoyu Wang 已提交
899

X
Xiaoyu Wang 已提交
900 901 902 903 904
  STableMeta* pBackup = NULL;
  if (TSDB_CODE_SUCCESS != cloneTableMeta(pMeta, &pBackup)) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  return taosHashPut(pHash, pName, len, &pBackup, POINTER_BYTES);
905 906 907
}

// pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
D
dapan 已提交
908
static int32_t parseUsingClause(SInsertParseContext* pCxt, SName* name, char* tbFName) {
H
refact  
Hongze Cheng 已提交
909
  int32_t      len = strlen(tbFName);
X
Xiaoyu Wang 已提交
910 911 912 913
  STableMeta** pMeta = taosHashGet(pCxt->pSubTableHashObj, tbFName, len);
  if (NULL != pMeta) {
    return cloneTableMeta(*pMeta, &pCxt->pTableMeta);
  }
914

X
Xiaoyu Wang 已提交
915
  SToken sToken;
916 917
  // pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
  NEXT_TOKEN(pCxt->pSql, sToken);
D
dapan 已提交
918 919 920 921 922

  SName sname;
  createSName(&sname, &sToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg);
  char stbFName[TSDB_TABLE_FNAME_LEN];
  tNameExtractFullName(&sname, stbFName);
X
Xiaoyu Wang 已提交
923

D
dapan 已提交
924
  CHECK_CODE(getSTableMeta(pCxt, &sname, stbFName));
925 926 927
  if (TSDB_SUPER_TABLE != pCxt->pTableMeta->tableType) {
    return buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed");
  }
D
dapan 已提交
928
  CHECK_CODE(storeTableMeta(pCxt, pCxt->pSubTableHashObj, name, tbFName, len, pCxt->pTableMeta));
929 930

  SSchema* pTagsSchema = getTableTagSchema(pCxt->pTableMeta);
931
  setBoundColumnInfo(&pCxt->tags, pTagsSchema, getNumOfTags(pCxt->pTableMeta));
932 933 934

  // pSql -> [(tag1_name, ...)] TAGS (tag1_value, ...)
  NEXT_TOKEN(pCxt->pSql, sToken);
935
  if (TK_NK_LP == sToken.type) {
936
    CHECK_CODE(parseBoundColumns(pCxt, &pCxt->tags, pTagsSchema));
937 938 939 940 941 942 943 944
    NEXT_TOKEN(pCxt->pSql, sToken);
  }

  if (TK_TAGS != sToken.type) {
    return buildSyntaxErrMsg(&pCxt->msg, "TAGS is expected", sToken.z);
  }
  // pSql -> (tag1_value, ...)
  NEXT_TOKEN(pCxt->pSql, sToken);
945
  if (TK_NK_LP != sToken.type) {
946 947
    return buildSyntaxErrMsg(&pCxt->msg, "( is expected", sToken.z);
  }
X
Xiaoyu Wang 已提交
948
  CHECK_CODE(parseTagsClause(pCxt, pTagsSchema, getTableInfo(pCxt->pTableMeta).precision, name->tname));
949 950 951 952
  NEXT_VALID_TOKEN(pCxt->pSql, sToken);
  if (TK_NK_COMMA == sToken.type) {
    return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_TAGS_NOT_MATCHED);
  } else if (TK_NK_RP != sToken.type) {
X
Xiaoyu Wang 已提交
953 954
    return buildSyntaxErrMsg(&pCxt->msg, ") is expected", sToken.z);
  }
955 956 957 958

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
959 960
static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, int16_t timePrec, bool* gotRow,
                       char* tmpTokenBuf) {
961
  SParsedDataColInfo* spd = &pDataBlocks->boundColumnInfo;
C
Cary Xu 已提交
962 963 964 965
  SRowBuilder*        pBuilder = &pDataBlocks->rowBuilder;
  STSRow*             row = (STSRow*)(pDataBlocks->pData + pDataBlocks->size);  // skip the SSubmitBlk header

  tdSRowResetBuf(pBuilder, row);
966

H
refact  
Hongze Cheng 已提交
967 968
  bool      isParseBindParam = false;
  SSchema*  schema = getTableColumnSchema(pDataBlocks->pTableMeta);
C
Cary Xu 已提交
969
  SMemParam param = {.rb = pBuilder};
H
refact  
Hongze Cheng 已提交
970
  SToken    sToken = {0};
971 972
  // 1. set the parsed value from sql string
  for (int i = 0; i < spd->numOfBound; ++i) {
X
Xiaoyu Wang 已提交
973
    NEXT_TOKEN_WITH_PREV(pCxt->pSql, sToken);
X
Xiaoyu Wang 已提交
974
    SSchema* pSchema = &schema[spd->boundColumns[i]];
D
stmt  
dapan1121 已提交
975 976 977 978 979 980 981 982 983 984 985 986 987

    if (sToken.type == TK_NK_QUESTION) {
      isParseBindParam = true;
      if (NULL == pCxt->pStmtCb) {
        return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", sToken.z);
      }

      continue;
    }

    if (isParseBindParam) {
      return buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and values");
    }
X
Xiaoyu Wang 已提交
988

989
    param.schema = pSchema;
D
stmt  
dapan1121 已提交
990
    getSTSRowAppendInfo(pBuilder->rowType, spd, i, &param.toffset, &param.colIdx);
991
    CHECK_CODE(parseValueToken(&pCxt->pSql, &sToken, pSchema, timePrec, tmpTokenBuf, MemRowAppend, &param, &pCxt->msg));
992 993

    if (PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
C
Cary Xu 已提交
994
      TSKEY tsKey = TD_ROW_KEY(row);
995
      checkTimestamp(pDataBlocks, (const char*)&tsKey);
996 997 998 999
    }
  }

  if (!isParseBindParam) {
C
Cary Xu 已提交
1000
    // set the null value for the columns that do not assign values
C
Cary Xu 已提交
1001
    if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
1002
      for (int32_t i = 0; i < spd->numOfCols; ++i) {
C
Cary Xu 已提交
1003 1004 1005
        if (spd->cols[i].valStat == VAL_STAT_NONE) {  // the primary TS key is not VAL_STAT_NONE
          tdAppendColValToTpRow(pBuilder, TD_VTYPE_NONE, getNullValue(schema[i].type), true, schema[i].type, i,
                                spd->cols[i].toffset);
1006 1007 1008
        }
      }
    }
D
stmt  
dapan1121 已提交
1009 1010

    *gotRow = true;
C
Cary Xu 已提交
1011 1012
#ifdef TD_DEBUG_PRINT_ROW
    STSchema* pSTSchema = tdGetSTSChemaFromSSChema(&schema, spd->numOfCols);
C
Cary Xu 已提交
1013
    tdSRowPrint(row, pSTSchema, __func__);
C
Cary Xu 已提交
1014 1015
    taosMemoryFree(pSTSchema);
#endif
1016 1017
  }

C
Cary Xu 已提交
1018
  // *len = pBuilder->extendedRowSize;
1019 1020 1021 1022
  return TSDB_CODE_SUCCESS;
}

// pSql -> (field1_value, ...) [(field1_value2, ...) ...]
1023
static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlock, int maxRows, int32_t* numOfRows) {
1024
  STableComInfo tinfo = getTableInfo(pDataBlock->pTableMeta);
H
refact  
Hongze Cheng 已提交
1025
  int32_t       extendedRowSize = getExtendedRowSize(pDataBlock);
C
Cary Xu 已提交
1026
  CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));
1027 1028

  (*numOfRows) = 0;
H
refact  
Hongze Cheng 已提交
1029
  char   tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0};  // used for deleting Escape character: \\, \', \"
1030 1031
  SToken sToken;
  while (1) {
1032 1033
    int32_t index = 0;
    NEXT_TOKEN_KEEP_SQL(pCxt->pSql, sToken, index);
1034
    if (TK_NK_LP != sToken.type) {
1035 1036
      break;
    }
1037
    pCxt->pSql += index;
1038 1039 1040 1041 1042 1043 1044 1045

    if ((*numOfRows) >= maxRows || pDataBlock->size + extendedRowSize >= pDataBlock->nAllocSize) {
      int32_t tSize;
      CHECK_CODE(allocateMemIfNeed(pDataBlock, extendedRowSize, &tSize));
      ASSERT(tSize >= maxRows);
      maxRows = tSize;
    }

D
stmt  
dapan1121 已提交
1046 1047 1048
    bool gotRow = false;
    CHECK_CODE(parseOneRow(pCxt, pDataBlock, tinfo.precision, &gotRow, tmpTokenBuf));
    if (gotRow) {
X
Xiaoyu Wang 已提交
1049
      pDataBlock->size += extendedRowSize;  // len;
D
stmt  
dapan1121 已提交
1050
    }
1051

1052 1053 1054 1055
    NEXT_VALID_TOKEN(pCxt->pSql, sToken);
    if (TK_NK_COMMA == sToken.type) {
      return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM);
    } else if (TK_NK_RP != sToken.type) {
1056 1057 1058
      return buildSyntaxErrMsg(&pCxt->msg, ") expected", sToken.z);
    }

D
stmt  
dapan1121 已提交
1059 1060 1061
    if (gotRow) {
      (*numOfRows)++;
    }
1062 1063
  }

D
stmt  
dapan1121 已提交
1064
  if (0 == (*numOfRows) && (!TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT))) {
X
Xiaoyu Wang 已提交
1065
    return buildSyntaxErrMsg(&pCxt->msg, "no any data points", NULL);
1066 1067 1068 1069
  }
  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
1070
static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* dataBuf) {
1071 1072 1073 1074
  int32_t maxNumOfRows;
  CHECK_CODE(allocateMemIfNeed(dataBuf, getExtendedRowSize(dataBuf), &maxNumOfRows));

  int32_t numOfRows = 0;
1075
  CHECK_CODE(parseValues(pCxt, dataBuf, maxNumOfRows, &numOfRows));
1076

H
refact  
Hongze Cheng 已提交
1077
  SSubmitBlk* pBlocks = (SSubmitBlk*)(dataBuf->pData);
D
dapan1121 已提交
1078
  if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, dataBuf, numOfRows)) {
1079 1080 1081 1082 1083 1084 1085 1086
    return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than 32767");
  }

  dataBuf->numOfTables = 1;
  pCxt->totalNum += numOfRows;
  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
1087
void destroyCreateSubTbReq(SVCreateTbReq* pReq) {
X
Xiaoyu Wang 已提交
1088
  taosMemoryFreeClear(pReq->name);
H
Hongze Cheng 已提交
1089
  taosMemoryFreeClear(pReq->ctb.pTag);
X
Xiaoyu Wang 已提交
1090 1091
}

X
Xiaoyu Wang 已提交
1092
static void destroyInsertParseContextForTable(SInsertParseContext* pCxt) {
wafwerar's avatar
wafwerar 已提交
1093
  taosMemoryFreeClear(pCxt->pTableMeta);
X
Xiaoyu Wang 已提交
1094
  destroyBoundColumnInfo(&pCxt->tags);
C
Cary Xu 已提交
1095
  taosArrayDestroy(pCxt->pTagVals);
X
Xiaoyu Wang 已提交
1096
  destroyCreateSubTbReq(&pCxt->createTblReq);
X
Xiaoyu Wang 已提交
1097 1098 1099 1100 1101
}

static void destroyInsertParseContext(SInsertParseContext* pCxt) {
  destroyInsertParseContextForTable(pCxt);
  taosHashCleanup(pCxt->pVgroupsHashObj);
1102
  taosHashCleanup(pCxt->pSubTableHashObj);
D
dapan1121 已提交
1103
  taosHashCleanup(pCxt->pTableNameHashObj);
1104 1105

  destroyBlockHashmap(pCxt->pTableBlockHashObj);
X
Xiaoyu Wang 已提交
1106 1107 1108
  destroyBlockArrayList(pCxt->pVgDataBlocks);
}

1109 1110 1111 1112 1113 1114
//   tb_name
//       [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
//       [(field1_name, ...)]
//       VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
//   [...];
static int32_t parseInsertBody(SInsertParseContext* pCxt) {
C
Cary Xu 已提交
1115 1116 1117
  int32_t tbNum = 0;
  char    tbFName[TSDB_TABLE_FNAME_LEN];
  bool    autoCreateTbl = false;
X
Xiaoyu Wang 已提交
1118

X
Xiaoyu Wang 已提交
1119
  // for each table
1120 1121
  while (1) {
    SToken sToken;
X
Xiaoyu Wang 已提交
1122
    char*  tbName = NULL;
D
stmt  
dapan1121 已提交
1123

1124 1125 1126 1127 1128
    // pSql -> tb_name ...
    NEXT_TOKEN(pCxt->pSql, sToken);

    // no data in the sql string anymore.
    if (sToken.n == 0) {
D
dapan1121 已提交
1129 1130 1131
      if (sToken.type && pCxt->pSql[0]) {
        return buildSyntaxErrMsg(&pCxt->msg, "invalid charactor in SQL", sToken.z);
      }
X
Xiaoyu Wang 已提交
1132

D
stmt  
dapan1121 已提交
1133
      if (0 == pCxt->totalNum && (!TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT))) {
H
refact  
Hongze Cheng 已提交
1134
        return buildInvalidOperationMsg(&pCxt->msg, "no data in sql");
1135 1136 1137 1138
      }
      break;
    }

D
stmt  
dapan1121 已提交
1139
    if (TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT) && tbNum > 0) {
X
Xiaoyu Wang 已提交
1140
      return buildInvalidOperationMsg(&pCxt->msg, "single table allowed in one stmt");
D
stmt  
dapan1121 已提交
1141 1142
    }

D
stmt  
dapan1121 已提交
1143 1144
    destroyInsertParseContextForTable(pCxt);

D
stmt  
dapan1121 已提交
1145 1146 1147
    if (TK_NK_QUESTION == sToken.type) {
      if (pCxt->pStmtCb) {
        CHECK_CODE((*pCxt->pStmtCb->getTbNameFn)(pCxt->pStmtCb->pStmt, &tbName));
X
Xiaoyu Wang 已提交
1148

D
stmt  
dapan1121 已提交
1149 1150 1151 1152 1153 1154
        sToken.z = tbName;
        sToken.n = strlen(tbName);
      } else {
        return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", sToken.z);
      }
    }
X
Xiaoyu Wang 已提交
1155

1156 1157 1158
    SToken tbnameToken = sToken;
    NEXT_TOKEN(pCxt->pSql, sToken);

D
dapan 已提交
1159
    SName name;
1160
    CHECK_CODE(createSName(&name, &tbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
D
dapan 已提交
1161

1162
    tNameExtractFullName(&name, tbFName);
X
Xiaoyu Wang 已提交
1163 1164
    CHECK_CODE(taosHashPut(pCxt->pTableNameHashObj, tbFName, strlen(tbFName), &name, sizeof(SName)));

1165
    // USING clause
1166
    if (TK_USING == sToken.type) {
D
dapan 已提交
1167
      CHECK_CODE(parseUsingClause(pCxt, &name, tbFName));
1168
      NEXT_TOKEN(pCxt->pSql, sToken);
D
dapan 已提交
1169
      autoCreateTbl = true;
1170
    } else {
D
dapan1121 已提交
1171 1172 1173
      char dbFName[TSDB_DB_FNAME_LEN];
      tNameGetFullDbName(&name, dbFName);
      CHECK_CODE(getTableMeta(pCxt, &name, dbFName));
1174 1175
    }

H
refact  
Hongze Cheng 已提交
1176
    STableDataBlocks* dataBuf = NULL;
D
dapan 已提交
1177
    CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, tbFName, strlen(tbFName), TSDB_DEFAULT_PAYLOAD_SIZE,
H
refact  
Hongze Cheng 已提交
1178 1179
                                    sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta,
                                    &dataBuf, NULL, &pCxt->createTblReq));
1180

1181
    if (TK_NK_LP == sToken.type) {
1182
      // pSql -> field1_name, ...)
D
dapan1121 已提交
1183
      CHECK_CODE(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(pCxt->pTableMeta)));
1184 1185 1186 1187 1188 1189
      NEXT_TOKEN(pCxt->pSql, sToken);
    }

    if (TK_VALUES == sToken.type) {
      // pSql -> (field1_value, ...) [(field1_value2, ...) ...]
      CHECK_CODE(parseValuesClause(pCxt, dataBuf));
D
stmt  
dapan1121 已提交
1190
      TSDB_QUERY_SET_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_INSERT);
D
stmt  
dapan1121 已提交
1191 1192

      tbNum++;
1193 1194 1195 1196
      continue;
    }

    // FILE csv_file_path
X
Xiaoyu Wang 已提交
1197
    if (TK_FILE == sToken.type) {
1198 1199
      // pSql -> csv_file_path
      NEXT_TOKEN(pCxt->pSql, sToken);
1200
      if (0 == sToken.n || (TK_NK_STRING != sToken.type && TK_NK_ID != sToken.type)) {
1201 1202 1203
        return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", sToken.z);
      }
      // todo
1204
      pCxt->pOutput->insertType = TSDB_QUERY_TYPE_FILE_INSERT;
D
stmt  
dapan1121 已提交
1205 1206

      tbNum++;
1207 1208 1209 1210 1211
      continue;
    }

    return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", sToken.z);
  }
X
Xiaoyu Wang 已提交
1212

D
stmt  
dapan1121 已提交
1213
  if (TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT)) {
X
Xiaoyu Wang 已提交
1214
    SParsedDataColInfo* tags = taosMemoryMalloc(sizeof(pCxt->tags));
D
stmt  
dapan1121 已提交
1215 1216 1217 1218
    if (NULL == tags) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    memcpy(tags, &pCxt->tags, sizeof(pCxt->tags));
C
Cary Xu 已提交
1219 1220
    (*pCxt->pStmtCb->setInfoFn)(pCxt->pStmtCb->pStmt, pCxt->pTableMeta, tags, tbFName, autoCreateTbl,
                                pCxt->pVgroupsHashObj, pCxt->pTableBlockHashObj);
D
stmt  
dapan1121 已提交
1221

D
dapan 已提交
1222
    memset(&pCxt->tags, 0, sizeof(pCxt->tags));
D
stmt  
dapan1121 已提交
1223 1224
    pCxt->pVgroupsHashObj = NULL;
    pCxt->pTableBlockHashObj = NULL;
D
dapan1121 已提交
1225
    pCxt->pTableMeta = NULL;
X
Xiaoyu Wang 已提交
1226

D
stmt  
dapan1121 已提交
1227 1228
    return TSDB_CODE_SUCCESS;
  }
X
Xiaoyu Wang 已提交
1229

1230
  // merge according to vgId
D
stmt  
dapan1121 已提交
1231
  if (taosHashGetSize(pCxt->pTableBlockHashObj) > 0) {
X
Xiaoyu Wang 已提交
1232
    CHECK_CODE(mergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->payloadType, &pCxt->pVgDataBlocks));
1233
  }
1234
  return buildOutput(pCxt);
1235 1236 1237 1238 1239 1240 1241 1242
}

// INSERT INTO
//   tb_name
//       [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
//       [(field1_name, ...)]
//       VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
//   [...];
X
Xiaoyu Wang 已提交
1243
int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
1244
  SInsertParseContext context = {
X
Xiaoyu Wang 已提交
1245 1246 1247 1248
      .pComCxt = pContext,
      .pSql = (char*)pContext->pSql,
      .msg = {.buf = pContext->pMsg, .len = pContext->msgLen},
      .pTableMeta = NULL,
X
Xiaoyu Wang 已提交
1249 1250
      .pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK),
      .pTableNameHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK),
X
Xiaoyu Wang 已提交
1251
      .totalNum = 0,
C
Cary Xu 已提交
1252
      .pTagVals = NULL,
X
Xiaoyu Wang 已提交
1253 1254
      .pOutput = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT),
      .pStmtCb = pContext->pStmtCb};
1255

D
stmt  
dapan1121 已提交
1256
  if (pContext->pStmtCb && *pQuery) {
X
Xiaoyu Wang 已提交
1257 1258
    (*pContext->pStmtCb->getExecInfoFn)(pContext->pStmtCb->pStmt, &context.pVgroupsHashObj,
                                        &context.pTableBlockHashObj);
D
stmt  
dapan1121 已提交
1259
  } else {
X
Xiaoyu Wang 已提交
1260 1261 1262
    context.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
    context.pTableBlockHashObj =
        taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
D
stmt  
dapan1121 已提交
1263
  }
X
Xiaoyu Wang 已提交
1264 1265

  if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pSubTableHashObj ||
X
Xiaoyu Wang 已提交
1266
      NULL == context.pTableNameHashObj || NULL == context.pOutput) {
X
Xiaoyu Wang 已提交
1267
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
1268 1269
  }

D
stmt  
dapan1121 已提交
1270 1271 1272 1273
  if (pContext->pStmtCb) {
    TSDB_QUERY_SET_TYPE(context.pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT);
  }

1274
  if (NULL == *pQuery) {
D
stmt  
dapan1121 已提交
1275 1276 1277 1278
    *pQuery = taosMemoryCalloc(1, sizeof(SQuery));
    if (NULL == *pQuery) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
1279
  }
1280 1281 1282 1283
  (*pQuery)->execMode = QUERY_EXEC_MODE_SCHEDULE;
  (*pQuery)->haveResultSet = false;
  (*pQuery)->msgType = TDMT_VND_SUBMIT;
  (*pQuery)->pRoot = (SNode*)context.pOutput;
X
Xiaoyu Wang 已提交
1284

D
dapan1121 已提交
1285 1286 1287 1288 1289 1290
  if (NULL == (*pQuery)->pTableList) {
    (*pQuery)->pTableList = taosArrayInit(taosHashGetSize(context.pTableNameHashObj), sizeof(SName));
    if (NULL == (*pQuery)->pTableList) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }
1291

1292
  context.pOutput->payloadType = PAYLOAD_TYPE_KV;
1293

1294
  int32_t code = skipInsertInto(&context.pSql, &context.msg);
1295 1296 1297
  if (TSDB_CODE_SUCCESS == code) {
    code = parseInsertBody(&context);
  }
1298
  if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) {
X
Xiaoyu Wang 已提交
1299 1300 1301 1302 1303 1304
    SName* pTable = taosHashIterate(context.pTableNameHashObj, NULL);
    while (NULL != pTable) {
      taosArrayPush((*pQuery)->pTableList, pTable);
      pTable = taosHashIterate(context.pTableNameHashObj, pTable);
    }
  }
1305
  destroyInsertParseContext(&context);
X
Xiaoyu Wang 已提交
1306
  return code;
1307
}
D
stmt  
dapan1121 已提交
1308

1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473
typedef struct SInsertParseSyntaxCxt {
  SParseContext*   pComCxt;
  char*            pSql;
  SMsgBuf          msg;
  SParseMetaCache* pMetaCache;
} SInsertParseSyntaxCxt;

static int32_t skipParentheses(SInsertParseSyntaxCxt* pCxt) {
  SToken sToken;
  while (1) {
    NEXT_TOKEN(pCxt->pSql, sToken);
    if (TK_NK_RP == sToken.type) {
      break;
    }
    if (0 == sToken.n) {
      return buildSyntaxErrMsg(&pCxt->msg, ") expected", NULL);
    }
  }
  return TSDB_CODE_SUCCESS;
}

static int32_t skipBoundColumns(SInsertParseSyntaxCxt* pCxt) { return skipParentheses(pCxt); }

// pSql -> (field1_value, ...) [(field1_value2, ...) ...]
static int32_t skipValuesClause(SInsertParseSyntaxCxt* pCxt) {
  int32_t numOfRows = 0;
  SToken  sToken;
  while (1) {
    int32_t index = 0;
    NEXT_TOKEN_KEEP_SQL(pCxt->pSql, sToken, index);
    if (TK_NK_LP != sToken.type) {
      break;
    }
    pCxt->pSql += index;

    CHECK_CODE(skipParentheses(pCxt));
    ++numOfRows;
  }
  if (0 == numOfRows) {
    return buildSyntaxErrMsg(&pCxt->msg, "no any data points", NULL);
  }
  return TSDB_CODE_SUCCESS;
}

static int32_t skipTagsClause(SInsertParseSyntaxCxt* pCxt) { return skipParentheses(pCxt); }

// pSql -> [(tag1_name, ...)] TAGS (tag1_value, ...)
static int32_t skipUsingClause(SInsertParseSyntaxCxt* pCxt) {
  SToken sToken;
  NEXT_TOKEN(pCxt->pSql, sToken);
  if (TK_NK_LP == sToken.type) {
    CHECK_CODE(skipBoundColumns(pCxt));
    NEXT_TOKEN(pCxt->pSql, sToken);
  }

  if (TK_TAGS != sToken.type) {
    return buildSyntaxErrMsg(&pCxt->msg, "TAGS is expected", sToken.z);
  }
  // pSql -> (tag1_value, ...)
  NEXT_TOKEN(pCxt->pSql, sToken);
  if (TK_NK_LP != sToken.type) {
    return buildSyntaxErrMsg(&pCxt->msg, "( is expected", sToken.z);
  }
  CHECK_CODE(skipTagsClause(pCxt));

  return TSDB_CODE_SUCCESS;
}

static int32_t collectTableMetaKey(SInsertParseSyntaxCxt* pCxt, SToken* pTbToken) {
  SName name;
  CHECK_CODE(createSName(&name, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
  CHECK_CODE(reserveUserAuthInCacheExt(pCxt->pComCxt->pUser, &name, AUTH_TYPE_WRITE, pCxt->pMetaCache));
  CHECK_CODE(reserveTableMetaInCacheExt(&name, pCxt->pMetaCache));
  CHECK_CODE(reserveTableVgroupInCacheExt(&name, pCxt->pMetaCache));
  return TSDB_CODE_SUCCESS;
}

static int32_t parseInsertBodySyntax(SInsertParseSyntaxCxt* pCxt) {
  bool hasData = false;
  // for each table
  while (1) {
    SToken sToken;

    // pSql -> tb_name ...
    NEXT_TOKEN(pCxt->pSql, sToken);

    // no data in the sql string anymore.
    if (sToken.n == 0) {
      if (sToken.type && pCxt->pSql[0]) {
        return buildSyntaxErrMsg(&pCxt->msg, "invalid charactor in SQL", sToken.z);
      }

      if (!hasData) {
        return buildInvalidOperationMsg(&pCxt->msg, "no data in sql");
      }
      break;
    }

    hasData = false;

    SToken tbnameToken = sToken;
    NEXT_TOKEN(pCxt->pSql, sToken);

    // USING clause
    if (TK_USING == sToken.type) {
      NEXT_TOKEN(pCxt->pSql, sToken);
      CHECK_CODE(collectTableMetaKey(pCxt, &sToken));
      CHECK_CODE(skipUsingClause(pCxt));
      NEXT_TOKEN(pCxt->pSql, sToken);
    } else {
      CHECK_CODE(collectTableMetaKey(pCxt, &tbnameToken));
    }

    if (TK_NK_LP == sToken.type) {
      // pSql -> field1_name, ...)
      CHECK_CODE(skipBoundColumns(pCxt));
      NEXT_TOKEN(pCxt->pSql, sToken);
    }

    if (TK_VALUES == sToken.type) {
      // pSql -> (field1_value, ...) [(field1_value2, ...) ...]
      CHECK_CODE(skipValuesClause(pCxt));
      hasData = true;
      continue;
    }

    // FILE csv_file_path
    if (TK_FILE == sToken.type) {
      // pSql -> csv_file_path
      NEXT_TOKEN(pCxt->pSql, sToken);
      if (0 == sToken.n || (TK_NK_STRING != sToken.type && TK_NK_ID != sToken.type)) {
        return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", sToken.z);
      }
      hasData = true;
      continue;
    }

    return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", sToken.z);
  }

  return TSDB_CODE_SUCCESS;
}

int32_t parseInsertSyntax(SParseContext* pContext, SQuery** pQuery) {
  SInsertParseSyntaxCxt context = {.pComCxt = pContext,
                                   .pSql = (char*)pContext->pSql,
                                   .msg = {.buf = pContext->pMsg, .len = pContext->msgLen},
                                   .pMetaCache = taosMemoryCalloc(1, sizeof(SParseMetaCache))};
  if (NULL == context.pMetaCache) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  int32_t code = skipInsertInto(&context.pSql, &context.msg);
  if (TSDB_CODE_SUCCESS == code) {
    code = parseInsertBodySyntax(&context);
  }
  if (TSDB_CODE_SUCCESS == code) {
    *pQuery = taosMemoryCalloc(1, sizeof(SQuery));
    if (NULL == *pQuery) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    TSWAP((*pQuery)->pMetaCache, context.pMetaCache);
  }
  return code;
}

X
Xiaoyu Wang 已提交
1474 1475 1476 1477
int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf,
                     int32_t msgBufLen) {
  SMsgBuf msg = {.buf = msgBuf, .len = msgBufLen};
  SToken  sToken;
D
stmt  
dapan1121 已提交
1478
  int32_t code = 0;
X
Xiaoyu Wang 已提交
1479 1480
  char*   tbName = NULL;

D
stmt  
dapan1121 已提交
1481
  NEXT_TOKEN(pTableName, sToken);
X
Xiaoyu Wang 已提交
1482

D
stmt  
dapan1121 已提交
1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493
  if (sToken.n == 0) {
    return buildInvalidOperationMsg(&msg, "empty table name");
  }

  code = createSName(pName, &sToken, acctId, dbName, &msg);
  if (code) {
    return code;
  }

  NEXT_TOKEN(pTableName, sToken);

D
stmt  
dapan1121 已提交
1494
  if (sToken.n > 0) {
D
stmt  
dapan1121 已提交
1495 1496 1497 1498 1499 1500 1501
    return buildInvalidOperationMsg(&msg, "table name format is wrong");
  }

  return TSDB_CODE_SUCCESS;
}

int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) {
X
Xiaoyu Wang 已提交
1502 1503
  SVnodeModifOpStmt*  modifyNode = (SVnodeModifOpStmt*)pQuery->pRoot;
  int32_t             code = 0;
D
stmt  
dapan1121 已提交
1504
  SInsertParseContext insertCtx = {
X
Xiaoyu Wang 已提交
1505 1506 1507
      .pVgroupsHashObj = pVgHash,
      .pTableBlockHashObj = pBlockHash,
      .pOutput = (SVnodeModifOpStmt*)pQuery->pRoot,
D
stmt  
dapan1121 已提交
1508
  };
X
Xiaoyu Wang 已提交
1509

D
stmt  
dapan1121 已提交
1510 1511
  // merge according to vgId
  if (taosHashGetSize(insertCtx.pTableBlockHashObj) > 0) {
D
stmt  
dapan1121 已提交
1512
    CHECK_CODE(mergeTableDataBlocks(insertCtx.pTableBlockHashObj, modifyNode->payloadType, &insertCtx.pVgDataBlocks));
D
stmt  
dapan1121 已提交
1513 1514 1515 1516
  }

  CHECK_CODE(buildOutput(&insertCtx));

wmmhello's avatar
wmmhello 已提交
1517
  destroyBlockArrayList(insertCtx.pVgDataBlocks);
D
stmt  
dapan1121 已提交
1518 1519 1520
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1521 1522 1523 1524
int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, char* tName, TAOS_MULTI_BIND* bind,
                           char* msgBuf, int32_t msgBufLen) {
  STableDataBlocks*   pDataBlock = (STableDataBlocks*)pBlock;
  SMsgBuf             pBuf = {.buf = msgBuf, .len = msgBufLen};
D
stmt  
dapan1121 已提交
1525 1526 1527 1528 1529
  SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
  if (NULL == tags) {
    return TSDB_CODE_QRY_APP_ERROR;
  }

C
Cary Xu 已提交
1530 1531
  SArray* pTagArray = taosArrayInit(tags->numOfBound, sizeof(STagVal));
  if (!pTagArray) {
C
Cary Xu 已提交
1532
    return buildInvalidOperationMsg(&pBuf, "out of memory");
D
stmt  
dapan1121 已提交
1533 1534
  }

D
dapan1121 已提交
1535
  SSchema* pSchema = pDataBlock->pTableMeta->schema;
C
Cary Xu 已提交
1536
  SKvParam param = {.pTagVals = pTagArray, .pos = 0};
D
stmt  
dapan1121 已提交
1537 1538 1539 1540 1541 1542

  for (int c = 0; c < tags->numOfBound; ++c) {
    if (bind[c].is_null && bind[c].is_null[0]) {
      KvRowAppend(&pBuf, NULL, 0, &param);
      continue;
    }
X
Xiaoyu Wang 已提交
1543

X
Xiaoyu Wang 已提交
1544
    SSchema* pTagSchema = &pSchema[tags->boundColumns[c]];
D
stmt  
dapan1121 已提交
1545 1546 1547 1548 1549 1550
    param.schema = pTagSchema;

    int32_t colLen = pTagSchema->bytes;
    if (IS_VAR_DATA_TYPE(pTagSchema->type)) {
      colLen = bind[c].length[0];
    }
X
Xiaoyu Wang 已提交
1551 1552

    CHECK_CODE(KvRowAppend(&pBuf, (char*)bind[c].buffer, colLen, &param));
D
stmt  
dapan1121 已提交
1553 1554
  }

C
Cary Xu 已提交
1555 1556 1557
  STag* pTag = NULL;

  // TODO: JSON_TAG_REFACTOR (if is json or not)?
C
Cary Xu 已提交
1558
  if (0 != tTagNew(pTagArray, 1, false, &pTag)) {
1559
    return buildInvalidOperationMsg(&pBuf, "out of memory");
D
stmt  
dapan1121 已提交
1560 1561 1562
  }

  SVCreateTbReq tbReq = {0};
C
Cary Xu 已提交
1563
  CHECK_CODE(buildCreateTbReq(&tbReq, tName, pTag, suid));
D
stmt  
dapan1121 已提交
1564 1565 1566
  CHECK_CODE(buildCreateTbMsg(pDataBlock, &tbReq));

  destroyCreateSubTbReq(&tbReq);
C
Cary Xu 已提交
1567
  taosArrayDestroy(pTagArray);
D
stmt  
dapan1121 已提交
1568 1569 1570 1571

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1572 1573 1574 1575
int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) {
  STableDataBlocks*   pDataBlock = (STableDataBlocks*)pBlock;
  SSchema*            pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
  int32_t             extendedRowSize = getExtendedRowSize(pDataBlock);
D
stmt  
dapan1121 已提交
1576 1577
  SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
  SRowBuilder*        pBuilder = &pDataBlock->rowBuilder;
X
Xiaoyu Wang 已提交
1578 1579 1580 1581
  SMemParam           param = {.rb = pBuilder};
  SMsgBuf             pBuf = {.buf = msgBuf, .len = msgBufLen};
  int32_t             rowNum = bind->num;

D
stmt  
dapan1121 已提交
1582 1583
  CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));

D
stmt  
dapan1121 已提交
1584
  CHECK_CODE(allocateMemForSize(pDataBlock, extendedRowSize * bind->num));
X
Xiaoyu Wang 已提交
1585

D
stmt  
dapan1121 已提交
1586 1587 1588
  for (int32_t r = 0; r < bind->num; ++r) {
    STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size);  // skip the SSubmitBlk header
    tdSRowResetBuf(pBuilder, row);
X
Xiaoyu Wang 已提交
1589

D
stmt  
dapan1121 已提交
1590
    for (int c = 0; c < spd->numOfBound; ++c) {
X
Xiaoyu Wang 已提交
1591
      SSchema* pColSchema = &pSchema[spd->boundColumns[c]];
D
stmt  
dapan1121 已提交
1592 1593 1594 1595

      if (bind[c].num != rowNum) {
        return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
      }
X
Xiaoyu Wang 已提交
1596

D
stmt  
dapan1121 已提交
1597 1598 1599 1600
      param.schema = pColSchema;
      getSTSRowAppendInfo(pBuilder->rowType, spd, c, &param.toffset, &param.colIdx);

      if (bind[c].is_null && bind[c].is_null[r]) {
D
stmt  
dapan1121 已提交
1601 1602 1603
        if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
          return buildInvalidOperationMsg(&pBuf, "primary timestamp should not be NULL");
        }
X
Xiaoyu Wang 已提交
1604

D
stmt  
dapan1121 已提交
1605 1606
        CHECK_CODE(MemRowAppend(&pBuf, NULL, 0, &param));
      } else {
D
dapan1121 已提交
1607 1608 1609 1610
        if (bind[c].buffer_type != pColSchema->type) {
          return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
        }

D
stmt  
dapan1121 已提交
1611 1612 1613 1614
        int32_t colLen = pColSchema->bytes;
        if (IS_VAR_DATA_TYPE(pColSchema->type)) {
          colLen = bind[c].length[r];
        }
X
Xiaoyu Wang 已提交
1615 1616

        CHECK_CODE(MemRowAppend(&pBuf, (char*)bind[c].buffer + bind[c].buffer_length * r, colLen, &param));
D
stmt  
dapan1121 已提交
1617
      }
X
Xiaoyu Wang 已提交
1618

D
stmt  
dapan1121 已提交
1619 1620
      if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
        TSKEY tsKey = TD_ROW_KEY(row);
X
Xiaoyu Wang 已提交
1621
        checkTimestamp(pDataBlock, (const char*)&tsKey);
D
stmt  
dapan1121 已提交
1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632
      }
    }
    // set the null value for the columns that do not assign values
    if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
      for (int32_t i = 0; i < spd->numOfCols; ++i) {
        if (spd->cols[i].valStat == VAL_STAT_NONE) {  // the primary TS key is not VAL_STAT_NONE
          tdAppendColValToTpRow(pBuilder, TD_VTYPE_NONE, getNullValue(pSchema[i].type), true, pSchema[i].type, i,
                                spd->cols[i].toffset);
        }
      }
    }
C
Cary Xu 已提交
1633 1634
#ifdef TD_DEBUG_PRINT_ROW
    STSchema* pSTSchema = tdGetSTSChemaFromSSChema(&pSchema, spd->numOfCols);
C
Cary Xu 已提交
1635
    tdSRowPrint(row, pSTSchema, __func__);
C
Cary Xu 已提交
1636 1637
    taosMemoryFree(pSTSchema);
#endif
D
stmt  
dapan1121 已提交
1638 1639
    pDataBlock->size += extendedRowSize;
  }
D
stmt  
dapan1121 已提交
1640

X
Xiaoyu Wang 已提交
1641
  SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
D
stmt  
dapan1121 已提交
1642 1643 1644 1645 1646 1647 1648
  if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, bind->num)) {
    return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767");
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1649 1650 1651 1652 1653
int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx,
                                int32_t rowNum) {
  STableDataBlocks*   pDataBlock = (STableDataBlocks*)pBlock;
  SSchema*            pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
  int32_t             extendedRowSize = getExtendedRowSize(pDataBlock);
D
stmt  
dapan1121 已提交
1654 1655
  SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
  SRowBuilder*        pBuilder = &pDataBlock->rowBuilder;
X
Xiaoyu Wang 已提交
1656 1657 1658 1659
  SMemParam           param = {.rb = pBuilder};
  SMsgBuf             pBuf = {.buf = msgBuf, .len = msgBufLen};
  bool                rowStart = (0 == colIdx);
  bool                rowEnd = ((colIdx + 1) == spd->numOfBound);
D
stmt  
dapan1121 已提交
1660 1661 1662 1663 1664

  if (rowStart) {
    CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));
    CHECK_CODE(allocateMemForSize(pDataBlock, extendedRowSize * bind->num));
  }
X
Xiaoyu Wang 已提交
1665

D
stmt  
dapan1121 已提交
1666 1667 1668 1669 1670 1671 1672
  for (int32_t r = 0; r < bind->num; ++r) {
    STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size + extendedRowSize * r);  // skip the SSubmitBlk header
    if (rowStart) {
      tdSRowResetBuf(pBuilder, row);
    } else {
      tdSRowGetBuf(pBuilder, row);
    }
D
stmt  
dapan1121 已提交
1673

X
Xiaoyu Wang 已提交
1674
    SSchema* pColSchema = &pSchema[spd->boundColumns[colIdx]];
D
stmt  
dapan1121 已提交
1675 1676 1677 1678

    if (bind->num != rowNum) {
      return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
    }
X
Xiaoyu Wang 已提交
1679

D
stmt  
dapan1121 已提交
1680 1681 1682 1683 1684 1685 1686
    param.schema = pColSchema;
    getSTSRowAppendInfo(pBuilder->rowType, spd, colIdx, &param.toffset, &param.colIdx);

    if (bind->is_null && bind->is_null[r]) {
      if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
        return buildInvalidOperationMsg(&pBuf, "primary timestamp should not be NULL");
      }
X
Xiaoyu Wang 已提交
1687

D
stmt  
dapan1121 已提交
1688 1689
      CHECK_CODE(MemRowAppend(&pBuf, NULL, 0, &param));
    } else {
D
dapan1121 已提交
1690 1691 1692 1693
      if (bind->buffer_type != pColSchema->type) {
        return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
      }

D
stmt  
dapan1121 已提交
1694 1695 1696 1697
      int32_t colLen = pColSchema->bytes;
      if (IS_VAR_DATA_TYPE(pColSchema->type)) {
        colLen = bind->length[r];
      }
X
Xiaoyu Wang 已提交
1698 1699

      CHECK_CODE(MemRowAppend(&pBuf, (char*)bind->buffer + bind->buffer_length * r, colLen, &param));
D
stmt  
dapan1121 已提交
1700
    }
X
Xiaoyu Wang 已提交
1701

D
stmt  
dapan1121 已提交
1702 1703
    if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
      TSKEY tsKey = TD_ROW_KEY(row);
X
Xiaoyu Wang 已提交
1704
      checkTimestamp(pDataBlock, (const char*)&tsKey);
D
stmt  
dapan1121 已提交
1705
    }
X
Xiaoyu Wang 已提交
1706

D
stmt  
dapan1121 已提交
1707 1708 1709 1710 1711 1712 1713 1714
    // set the null value for the columns that do not assign values
    if (rowEnd && (spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
      for (int32_t i = 0; i < spd->numOfCols; ++i) {
        if (spd->cols[i].valStat == VAL_STAT_NONE) {  // the primary TS key is not VAL_STAT_NONE
          tdAppendColValToTpRow(pBuilder, TD_VTYPE_NONE, getNullValue(pSchema[i].type), true, pSchema[i].type, i,
                                spd->cols[i].toffset);
        }
      }
X
Xiaoyu Wang 已提交
1715
    }
C
Cary Xu 已提交
1716 1717

#ifdef TD_DEBUG_PRINT_ROW
X
Xiaoyu Wang 已提交
1718
    if (rowEnd) {
C
Cary Xu 已提交
1719
      STSchema* pSTSchema = tdGetSTSChemaFromSSChema(&pSchema, spd->numOfCols);
C
Cary Xu 已提交
1720
      tdSRowPrint(row, pSTSchema, __func__);
C
Cary Xu 已提交
1721 1722 1723
      taosMemoryFree(pSTSchema);
    }
#endif
D
stmt  
dapan1121 已提交
1724 1725
  }

D
stmt  
dapan1121 已提交
1726 1727 1728
  if (rowEnd) {
    pDataBlock->size += extendedRowSize * bind->num;

X
Xiaoyu Wang 已提交
1729
    SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
D
stmt  
dapan1121 已提交
1730 1731 1732 1733 1734 1735 1736 1737
    if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, bind->num)) {
      return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767");
    }
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1738
int32_t buildBoundFields(SParsedDataColInfo* boundInfo, SSchema* pSchema, int32_t* fieldNum, TAOS_FIELD** fields) {
D
stmt  
dapan1121 已提交
1739 1740 1741 1742 1743 1744 1745
  if (fields) {
    *fields = taosMemoryCalloc(boundInfo->numOfBound, sizeof(TAOS_FIELD));
    if (NULL == *fields) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    for (int32_t i = 0; i < boundInfo->numOfBound; ++i) {
X
Xiaoyu Wang 已提交
1746
      SSchema* pTagSchema = &pSchema[boundInfo->boundColumns[i]];
D
stmt  
dapan1121 已提交
1747 1748 1749 1750
      strcpy((*fields)[i].name, pTagSchema->name);
      (*fields)[i].type = pTagSchema->type;
      (*fields)[i].bytes = pTagSchema->bytes;
    }
D
stmt  
dapan1121 已提交
1751 1752 1753 1754 1755 1756 1757
  }

  *fieldNum = boundInfo->numOfBound;

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1758 1759
int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD** fields) {
  STableDataBlocks*   pDataBlock = (STableDataBlocks*)pBlock;
D
stmt  
dapan1121 已提交
1760 1761 1762 1763
  SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
  if (NULL == tags) {
    return TSDB_CODE_QRY_APP_ERROR;
  }
X
Xiaoyu Wang 已提交
1764 1765

  SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta);
D
stmt  
dapan1121 已提交
1766 1767 1768 1769 1770 1771 1772 1773
  if (tags->numOfBound <= 0) {
    *fieldNum = 0;
    *fields = NULL;

    return TSDB_CODE_SUCCESS;
  }

  CHECK_CODE(buildBoundFields(tags, pSchema, fieldNum, fields));
X
Xiaoyu Wang 已提交
1774

D
stmt  
dapan1121 已提交
1775 1776 1777
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1778 1779 1780
int32_t qBuildStmtColFields(void* pBlock, int32_t* fieldNum, TAOS_FIELD** fields) {
  STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;
  SSchema*          pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
D
stmt  
dapan1121 已提交
1781 1782
  if (pDataBlock->boundColumnInfo.numOfBound <= 0) {
    *fieldNum = 0;
D
stmt  
dapan1121 已提交
1783 1784 1785
    if (fields) {
      *fields = NULL;
    }
D
stmt  
dapan1121 已提交
1786 1787 1788 1789 1790

    return TSDB_CODE_SUCCESS;
  }

  CHECK_CODE(buildBoundFields(&pDataBlock->boundColumnInfo, pSchema, fieldNum, fields));
X
Xiaoyu Wang 已提交
1791

D
stmt  
dapan1121 已提交
1792 1793 1794
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
1795
// schemaless logic start
D
stmt  
dapan1121 已提交
1796

wmmhello's avatar
wmmhello 已提交
1797
typedef struct SmlExecTableHandle {
X
Xiaoyu Wang 已提交
1798 1799
  SParsedDataColInfo tags;          // each table
  SVCreateTbReq      createTblReq;  // each table
wmmhello's avatar
wmmhello 已提交
1800
} SmlExecTableHandle;
wmmhello's avatar
wmmhello 已提交
1801

wmmhello's avatar
wmmhello 已提交
1802
typedef struct SmlExecHandle {
1803 1804 1805
  SHashObj*          pBlockHash;
  SmlExecTableHandle tableExecHandle;
  SQuery*            pQuery;
wmmhello's avatar
wmmhello 已提交
1806
} SSmlExecHandle;
wmmhello's avatar
wmmhello 已提交
1807

wmmhello's avatar
wmmhello 已提交
1808 1809 1810 1811 1812 1813
static void smlDestroyTableHandle(void* pHandle) {
  SmlExecTableHandle* handle = (SmlExecTableHandle*)pHandle;
  destroyBoundColumnInfo(&handle->tags);
  destroyCreateSubTbReq(&handle->createTblReq);
}

X
Xiaoyu Wang 已提交
1814
static int32_t smlBoundColumnData(SArray* cols, SParsedDataColInfo* pColList, SSchema* pSchema) {
wmmhello's avatar
wmmhello 已提交
1815 1816 1817 1818 1819 1820 1821 1822 1823 1824 1825 1826
  col_id_t nCols = pColList->numOfCols;

  pColList->numOfBound = 0;
  pColList->boundNullLen = 0;
  memset(pColList->boundColumns, 0, sizeof(col_id_t) * nCols);
  for (col_id_t i = 0; i < nCols; ++i) {
    pColList->cols[i].valStat = VAL_STAT_NONE;
  }

  bool     isOrdered = true;
  col_id_t lastColIdx = -1;  // last column found
  for (int i = 0; i < taosArrayGetSize(cols); ++i) {
X
Xiaoyu Wang 已提交
1827 1828
    SSmlKv*  kv = taosArrayGetP(cols, i);
    SToken   sToken = {.n = kv->keyLen, .z = (char*)kv->key};
wmmhello's avatar
wmmhello 已提交
1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839 1840 1841 1842
    col_id_t t = lastColIdx + 1;
    col_id_t index = findCol(&sToken, t, nCols, pSchema);
    if (index < 0 && t > 0) {
      index = findCol(&sToken, 0, t, pSchema);
      isOrdered = false;
    }
    if (index < 0) {
      return TSDB_CODE_SML_INVALID_DATA;
    }
    if (pColList->cols[index].valStat == VAL_STAT_HAS) {
      return TSDB_CODE_SML_INVALID_DATA;
    }
    lastColIdx = index;
    pColList->cols[index].valStat = VAL_STAT_HAS;
X
Xiaoyu Wang 已提交
1843
    pColList->boundColumns[pColList->numOfBound] = index;
wmmhello's avatar
wmmhello 已提交
1844 1845 1846 1847 1848 1849 1850 1851 1852 1853 1854 1855 1856 1857 1858 1859 1860 1861 1862 1863 1864 1865 1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876
    ++pColList->numOfBound;
    switch (pSchema[t].type) {
      case TSDB_DATA_TYPE_BINARY:
        pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + CHAR_BYTES);
        break;
      case TSDB_DATA_TYPE_NCHAR:
        pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
        break;
      default:
        pColList->boundNullLen += TYPE_BYTES[pSchema[t].type];
        break;
    }
  }

  pColList->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED;

  if (!isOrdered) {
    pColList->colIdxInfo = taosMemoryCalloc(pColList->numOfBound, sizeof(SBoundIdxInfo));
    if (NULL == pColList->colIdxInfo) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    SBoundIdxInfo* pColIdx = pColList->colIdxInfo;
    for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
      pColIdx[i].schemaColIdx = pColList->boundColumns[i];
      pColIdx[i].boundIdx = i;
    }
    qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), schemaIdxCompar);
    for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
      pColIdx[i].finalIdx = i;
    }
    qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), boundIdxCompar);
  }

X
Xiaoyu Wang 已提交
1877
  if (pColList->numOfCols > pColList->numOfBound) {
wmmhello's avatar
wmmhello 已提交
1878 1879 1880 1881 1882 1883 1884
    memset(&pColList->boundColumns[pColList->numOfBound], 0,
           sizeof(col_id_t) * (pColList->numOfCols - pColList->numOfBound));
  }

  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
1885 1886 1887 1888 1889 1890 1891 1892 1893 1894 1895
/**
 * @brief No json tag for schemaless
 *
 * @param cols
 * @param tags
 * @param pSchema
 * @param ppTag
 * @param msg
 * @return int32_t
 */
static int32_t smlBuildTagRow(SArray* cols, SParsedDataColInfo* tags, SSchema* pSchema, STag** ppTag, SMsgBuf* msg) {
C
Cary Xu 已提交
1896 1897
  SArray* pTagArray = taosArrayInit(tags->numOfBound, sizeof(STagVal));
  if (!pTagArray) {
wmmhello's avatar
wmmhello 已提交
1898 1899 1900
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

C
Cary Xu 已提交
1901
  SKvParam param = {.pTagVals = pTagArray, .pos = 0};
wmmhello's avatar
wmmhello 已提交
1902
  for (int i = 0; i < tags->numOfBound; ++i) {
X
Xiaoyu Wang 已提交
1903
    SSchema* pTagSchema = &pSchema[tags->boundColumns[i]];
wmmhello's avatar
wmmhello 已提交
1904
    param.schema = pTagSchema;
X
Xiaoyu Wang 已提交
1905
    SSmlKv* kv = taosArrayGetP(cols, i);
1906
    if (IS_VAR_DATA_TYPE(kv->type)) {
wmmhello's avatar
wmmhello 已提交
1907
      KvRowAppend(msg, kv->value, kv->length, &param);
1908
    } else {
wmmhello's avatar
wmmhello 已提交
1909 1910
      KvRowAppend(msg, &(kv->value), kv->length, &param);
    }
wmmhello's avatar
wmmhello 已提交
1911 1912
  }

C
Cary Xu 已提交
1913 1914
  if (tTagNew(pTagArray, 1, false, ppTag) != 0) {
    taosArrayDestroy(pTagArray);
1915
    return TSDB_CODE_OUT_OF_MEMORY;
wmmhello's avatar
wmmhello 已提交
1916
  }
C
Cary Xu 已提交
1917

C
Cary Xu 已提交
1918
  taosArrayDestroy(pTagArray);
wmmhello's avatar
wmmhello 已提交
1919 1920 1921
  return TSDB_CODE_SUCCESS;
}

1922 1923
int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta,
                    char* tableName, char* msgBuf, int16_t msgBufLen) {
wmmhello's avatar
wmmhello 已提交
1924 1925
  SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};

X
Xiaoyu Wang 已提交
1926
  SSmlExecHandle* smlHandle = (SSmlExecHandle*)handle;
1927 1928
  smlDestroyTableHandle(&smlHandle->tableExecHandle);  // free for each table
  SSchema* pTagsSchema = getTableTagSchema(pTableMeta);
wmmhello's avatar
wmmhello 已提交
1929 1930
  setBoundColumnInfo(&smlHandle->tableExecHandle.tags, pTagsSchema, getNumOfTags(pTableMeta));
  int ret = smlBoundColumnData(tags, &smlHandle->tableExecHandle.tags, pTagsSchema);
X
Xiaoyu Wang 已提交
1931
  if (ret != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
1932 1933 1934
    buildInvalidOperationMsg(&pBuf, "bound tags error");
    return ret;
  }
C
Cary Xu 已提交
1935 1936
  STag* pTag = NULL;
  ret = smlBuildTagRow(tags, &smlHandle->tableExecHandle.tags, pTagsSchema, &pTag, &pBuf);
X
Xiaoyu Wang 已提交
1937
  if (ret != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
1938 1939
    return ret;
  }
wmmhello's avatar
wmmhello 已提交
1940

C
Cary Xu 已提交
1941
  buildCreateTbReq(&smlHandle->tableExecHandle.createTblReq, tableName, pTag, pTableMeta->suid);
wmmhello's avatar
wmmhello 已提交
1942

wmmhello's avatar
wmmhello 已提交
1943
  STableDataBlocks* pDataBlock = NULL;
X
Xiaoyu Wang 已提交
1944 1945
  ret = getDataBlockFromList(smlHandle->pBlockHash, &pTableMeta->uid, sizeof(pTableMeta->uid),
                             TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk), getTableInfo(pTableMeta).rowSize,
wmmhello's avatar
wmmhello 已提交
1946
                             pTableMeta, &pDataBlock, NULL, &smlHandle->tableExecHandle.createTblReq);
X
Xiaoyu Wang 已提交
1947
  if (ret != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
1948 1949 1950
    buildInvalidOperationMsg(&pBuf, "create data block error");
    return ret;
  }
wmmhello's avatar
wmmhello 已提交
1951 1952 1953

  SSchema* pSchema = getTableColumnSchema(pTableMeta);

1954
  ret = smlBoundColumnData(colsSchema, &pDataBlock->boundColumnInfo, pSchema);
X
Xiaoyu Wang 已提交
1955
  if (ret != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
1956 1957 1958
    buildInvalidOperationMsg(&pBuf, "bound cols error");
    return ret;
  }
X
Xiaoyu Wang 已提交
1959
  int32_t             extendedRowSize = getExtendedRowSize(pDataBlock);
wmmhello's avatar
wmmhello 已提交
1960 1961
  SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
  SRowBuilder*        pBuilder = &pDataBlock->rowBuilder;
X
Xiaoyu Wang 已提交
1962
  SMemParam           param = {.rb = pBuilder};
wmmhello's avatar
wmmhello 已提交
1963 1964 1965

  initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo);

1966
  int32_t rowNum = taosArrayGetSize(cols);
1967
  if (rowNum <= 0) {
wmmhello's avatar
wmmhello 已提交
1968 1969
    return buildInvalidOperationMsg(&pBuf, "cols size <= 0");
  }
wmmhello's avatar
wmmhello 已提交
1970
  ret = allocateMemForSize(pDataBlock, extendedRowSize * rowNum);
X
Xiaoyu Wang 已提交
1971
  if (ret != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
1972 1973 1974
    buildInvalidOperationMsg(&pBuf, "allocate memory error");
    return ret;
  }
wmmhello's avatar
wmmhello 已提交
1975 1976 1977
  for (int32_t r = 0; r < rowNum; ++r) {
    STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size);  // skip the SSubmitBlk header
    tdSRowResetBuf(pBuilder, row);
1978
    void*  rowData = taosArrayGetP(cols, r);
1979
    size_t rowDataSize = 0;
1980
    if (format) {
1981
      rowDataSize = taosArrayGetSize(rowData);
wmmhello's avatar
wmmhello 已提交
1982
    }
wmmhello's avatar
wmmhello 已提交
1983 1984

    // 1. set the parsed value from sql string
1985
    for (int c = 0, j = 0; c < spd->numOfBound; ++c) {
1986
      SSchema* pColSchema = &pSchema[spd->boundColumns[c]];
wmmhello's avatar
wmmhello 已提交
1987 1988 1989 1990

      param.schema = pColSchema;
      getSTSRowAppendInfo(pBuilder->rowType, spd, c, &param.toffset, &param.colIdx);

X
Xiaoyu Wang 已提交
1991 1992 1993
      SSmlKv* kv = NULL;
      if (format) {
        if (j < rowDataSize) {
1994
          kv = taosArrayGetP(rowData, j);
X
Xiaoyu Wang 已提交
1995 1996
          if (rowDataSize != spd->numOfBound &&
              (kv->keyLen != strlen(pColSchema->name) || strncmp(kv->key, pColSchema->name, kv->keyLen) != 0)) {
1997
            kv = NULL;
X
Xiaoyu Wang 已提交
1998
          } else {
1999
            j++;
2000
          }
wmmhello's avatar
wmmhello 已提交
2001
        }
X
Xiaoyu Wang 已提交
2002 2003 2004
      } else {
        void** p = taosHashGet(rowData, pColSchema->name, strlen(pColSchema->name));
        if (p) kv = *p;
wmmhello's avatar
wmmhello 已提交
2005
      }
wmmhello's avatar
wmmhello 已提交
2006

2007
      if (!kv || kv->length == 0) {
wmmhello's avatar
wmmhello 已提交
2008 2009
        MemRowAppend(&pBuf, NULL, 0, &param);
      } else {
wmmhello's avatar
wmmhello 已提交
2010 2011
        int32_t colLen = kv->length;
        if (pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP) {
2012
          kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision);
wmmhello's avatar
wmmhello 已提交
2013 2014
        }

2015
        if (IS_VAR_DATA_TYPE(kv->type)) {
wmmhello's avatar
wmmhello 已提交
2016
          MemRowAppend(&pBuf, kv->value, colLen, &param);
2017
        } else {
wmmhello's avatar
wmmhello 已提交
2018 2019
          MemRowAppend(&pBuf, &(kv->value), colLen, &param);
        }
wmmhello's avatar
wmmhello 已提交
2020 2021 2022 2023
      }

      if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
        TSKEY tsKey = TD_ROW_KEY(row);
X
Xiaoyu Wang 已提交
2024
        checkTimestamp(pDataBlock, (const char*)&tsKey);
wmmhello's avatar
wmmhello 已提交
2025 2026 2027 2028 2029 2030 2031 2032 2033 2034 2035 2036 2037 2038 2039 2040
      }
    }

    // set the null value for the columns that do not assign values
    if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
      for (int32_t i = 0; i < spd->numOfCols; ++i) {
        if (spd->cols[i].valStat == VAL_STAT_NONE) {  // the primary TS key is not VAL_STAT_NONE
          tdAppendColValToTpRow(pBuilder, TD_VTYPE_NONE, getNullValue(pSchema[i].type), true, pSchema[i].type, i,
                                spd->cols[i].toffset);
        }
      }
    }

    pDataBlock->size += extendedRowSize;
  }

X
Xiaoyu Wang 已提交
2041
  SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
wmmhello's avatar
wmmhello 已提交
2042 2043 2044 2045 2046 2047 2048
  if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, rowNum)) {
    return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767");
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
2049 2050 2051
void* smlInitHandle(SQuery* pQuery) {
  SSmlExecHandle* handle = taosMemoryCalloc(1, sizeof(SSmlExecHandle));
  if (!handle) return NULL;
wmmhello's avatar
wmmhello 已提交
2052 2053 2054 2055 2056 2057
  handle->pBlockHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
  handle->pQuery = pQuery;

  return handle;
}

X
Xiaoyu Wang 已提交
2058 2059 2060
void smlDestroyHandle(void* pHandle) {
  if (!pHandle) return;
  SSmlExecHandle* handle = (SSmlExecHandle*)pHandle;
wmmhello's avatar
wmmhello 已提交
2061
  destroyBlockHashmap(handle->pBlockHash);
wmmhello's avatar
wmmhello 已提交
2062
  smlDestroyTableHandle(&handle->tableExecHandle);
wmmhello's avatar
wmmhello 已提交
2063 2064 2065 2066
  taosMemoryFree(handle);
}

int32_t smlBuildOutput(void* handle, SHashObj* pVgHash) {
X
Xiaoyu Wang 已提交
2067
  SSmlExecHandle* smlHandle = (SSmlExecHandle*)handle;
wmmhello's avatar
wmmhello 已提交
2068 2069
  return qBuildStmtOutput(smlHandle->pQuery, pVgHash, smlHandle->pBlockHash);
}
wmmhello's avatar
wmmhello 已提交
2070
// schemaless logic end