parInsert.c 64.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

wafwerar's avatar
wafwerar 已提交
16
#include "os.h"
X
Xiaoyu Wang 已提交
17 18 19
#include "parInsertData.h"
#include "parInt.h"
#include "parToken.h"
H
refact  
Hongze Cheng 已提交
20
#include "parUtil.h"
21 22 23 24
#include "tglobal.h"
#include "ttime.h"
#include "ttypes.h"

H
refact  
Hongze Cheng 已提交
25 26 27
/*
 * Fetch the next token from pSql with tStrGetToken(..., false), store it in sToken and
 * advance pSql by the number of characters tStrGetToken reports as consumed.
 */
#define NEXT_TOKEN(pSql, sToken)                            \
  do {                                                      \
    int32_t tkConsumed_ = 0;                                \
    (sToken) = tStrGetToken((pSql), &tkConsumed_, false);   \
    (pSql) += tkConsumed_;                                  \
  } while (0)

H
refact  
Hongze Cheng 已提交
32 33 34
/*
 * Same as NEXT_TOKEN but calls tStrGetToken(..., true): fetch the next token into sToken
 * and advance pSql past the consumed characters.
 */
#define NEXT_TOKEN_WITH_PREV(pSql, sToken)                 \
  do {                                                     \
    int32_t tkConsumed_ = 0;                               \
    (sToken) = tStrGetToken((pSql), &tkConsumed_, true);   \
    (pSql) += tkConsumed_;                                 \
  } while (0)

39
/*
 * Peek at the next token: store it in sToken and the consumed length in `index`, but leave
 * pSql where it is so the caller decides whether to advance.
 */
#define NEXT_TOKEN_KEEP_SQL(pSql, sToken, index)          \
  do {                                                    \
    (sToken) = tStrGetToken((pSql), &(index), false);     \
  } while (0)

44 45 46 47 48 49 50
/*
 * Read tokens with tGetToken until one that is not TK_NK_SPACE is found. sToken.z/n/type
 * describe that token, and pSql ends up just past it.
 */
#define NEXT_VALID_TOKEN(pSql, sToken)                          \
  do {                                                          \
    (sToken).z = (pSql);                                        \
    (sToken).n = tGetToken((sToken).z, &(sToken).type);         \
    (pSql) += (sToken).n;                                       \
  } while (TK_NK_SPACE == (sToken).type)

51
typedef struct SInsertParseContext {
X
Xiaoyu Wang 已提交
52 53 54 55 56 57 58 59 60 61 62
  SParseContext*     pComCxt;             // input
  char*              pSql;                // input
  SMsgBuf            msg;                 // input
  STableMeta*        pTableMeta;          // each table
  SParsedDataColInfo tags;                // each table
  SKVRowBuilder      tagsBuilder;         // each table
  SVCreateTbReq      createTblReq;        // each table
  SHashObj*          pVgroupsHashObj;     // global
  SHashObj*          pTableBlockHashObj;  // global
  SHashObj*          pSubTableHashObj;    // global
  SArray*            pVgDataBlocks;       // global
X
Xiaoyu Wang 已提交
63
  SHashObj*          pTableNameHashObj;   // global
X
Xiaoyu Wang 已提交
64
  int32_t            totalNum;
X
Xiaoyu Wang 已提交
65
  SVnodeModifOpStmt* pOutput;
X
Xiaoyu Wang 已提交
66
  SStmtCallback*     pStmtCb;
67 68
} SInsertParseContext;

H
refact  
Hongze Cheng 已提交
69
// Callback through which parseValueToken stores one parsed value. `value == NULL` (with len 0)
// denotes SQL NULL. For variable-length types `len` is the byte length of the text, otherwise
// parseValueToken passes the schema's byte width. `param` is the callback's own state
// (SMemParam / SKvParam).
typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param);
X
Xiaoyu Wang 已提交
70 71 72 73

// Addressable storage for the two boolean values: parseValueToken hands &TRUE_VALUE /
// &FALSE_VALUE to the row-append callback when it parses a BOOL column.
static uint8_t TRUE_VALUE = (uint8_t)TSDB_TRUE;
static uint8_t FALSE_VALUE = (uint8_t)TSDB_FALSE;

D
stmt  
dapan1121 已提交
74
typedef struct SKvParam {
X
Xiaoyu Wang 已提交
75 76
  SKVRowBuilder* builder;
  SSchema*       schema;
D
stmt  
dapan1121 已提交
77 78 79 80 81 82 83 84 85 86
  char           buf[TSDB_MAX_TAGS_LEN];
} SKvParam;

// Callback state for MemRowAppend (column values appended to an SRowBuilder row).
typedef struct SMemParam {
  SRowBuilder* rb;       // row being built
  SSchema*     schema;   // schema entry of the column being appended
  int32_t      toffset;  // forwarded unchanged to tdAppendColValToRow
  col_id_t     colIdx;   // forwarded unchanged to tdAppendColValToRow
} SMemParam;

X
Xiaoyu Wang 已提交
87 88 89
/*
 * Evaluate `expr` exactly once. If the result is not TSDB_CODE_SUCCESS, return it from the
 * enclosing function.
 *
 * The temporary deliberately does not use the name `code`. A local declared as
 * `int32_t code = expr;` is already in scope inside its own initializer, so an `expr` that
 * mentions the caller's `code` would silently read the new, uninitialized local instead.
 */
#define CHECK_CODE(expr)                      \
  do {                                        \
    int32_t checkCodeRet_ = (expr);           \
    if (TSDB_CODE_SUCCESS != checkCodeRet_) { \
      return checkCodeRet_;                   \
    }                                         \
  } while (0)

95 96 97 98 99 100 101 102 103 104 105 106 107
/*
 * Consume the leading "INSERT INTO" keywords from pCxt->pSql.
 * Returns TSDB_CODE_SUCCESS, or a syntax error naming the first missing keyword.
 */
static int32_t skipInsertInto(SInsertParseContext* pCxt) {
  static const struct {
    int32_t     type;
    const char* errMsg;
  } kExpected[] = {
      {TK_INSERT, "keyword INSERT is expected"},
      {TK_INTO, "keyword INTO is expected"},
  };

  for (size_t i = 0; i < sizeof(kExpected) / sizeof(kExpected[0]); ++i) {
    SToken kw;
    NEXT_TOKEN(pCxt->pSql, kw);
    if (kExpected[i].type != kw.type) {
      return buildSyntaxErrMsg(&pCxt->msg, kExpected[i].errMsg, kw.z);
    }
  }
  return TSDB_CODE_SUCCESS;
}

108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
/*
 * Validate an identifier token that names a table, either "name" or "db.name", and
 * normalize it in place with strntolower.
 *
 * A token that starts and ends with the escape char '`' is accepted as-is. Purely numeric
 * single-part names are rejected. For a two-part name, each part must tokenize as TK_NK_ID.
 *
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_TSC_INVALID_OPERATION. While checking a two-part
 * name, pToken->n / pToken->type / pToken->z are rewritten, including on error paths.
 */
static int32_t parserValidateIdToken(SToken* pToken) {
  // n == 0 is rejected up front: the escape-char test below reads z[n - 1].
  if (pToken == NULL || pToken->z == NULL || pToken->type != TK_NK_ID || pToken->n == 0) {
    return TSDB_CODE_TSC_INVALID_OPERATION;
  }

  // it is a token quoted with escape char '`'
  if (pToken->z[0] == TS_ESCAPE_CHAR && pToken->z[pToken->n - 1] == TS_ESCAPE_CHAR) {
    return TSDB_CODE_SUCCESS;
  }

  char* sep = strnchr(pToken->z, TS_PATH_DELIMITER[0], pToken->n, true);
  if (sep == NULL) {  // It is a single part token, not a complex type
    if (isNumber(pToken)) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }

    strntolower(pToken->z, pToken->z, pToken->n);
  } else {  // two part
    int32_t oldLen = pToken->n;
    char*   pStr = pToken->z;

    // pToken->type is known to be TK_NK_ID here (checked above), so no whitespace
    // trimming is required before re-tokenizing the first part.
    pToken->n = tGetToken(pToken->z, &pToken->type);
    if (pToken->z[pToken->n] != TS_PATH_DELIMITER[0]) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }

    if (pToken->type != TK_NK_ID) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }

    int32_t firstPartLen = pToken->n;

    // the second part must be a single identifier spanning the rest of the token
    pToken->z = sep + 1;
    pToken->n = (uint32_t)(oldLen - (sep - pStr) - 1);
    int32_t len = tGetToken(pToken->z, &pToken->type);
    if (len != pToken->n || pToken->type != TK_NK_ID) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }

    // re-build the whole name string
    if (pStr[firstPartLen] == TS_PATH_DELIMITER[0]) {
      // first part do not have quote do nothing
    } else {
      pStr[firstPartLen] = TS_PATH_DELIMITER[0];
      memmove(&pStr[firstPartLen + 1], pToken->z, pToken->n);
      uint32_t offset = (uint32_t)(pToken->z - (pStr + firstPartLen + 1));
      memset(pToken->z + pToken->n - offset, ' ', offset);
    }

    pToken->n += (firstPartLen + sizeof(TS_PATH_DELIMITER[0]));
    pToken->z = pStr;

    strntolower(pToken->z, pToken->z, pToken->n);
  }

  return TSDB_CODE_SUCCESS;
}

170
/*
 * Split a validated "tb" or "db.tb" token into a full db name ("<acctId>.<db>") and a table
 * name. Without a db prefix, the session's current db (pCxt->pComCxt->db) is used.
 *
 * fullDbName is written with snprintf bounded by TSDB_DB_FNAME_LEN, so it is always
 * NUL-terminated. tableName is filled with strncpy, which does not add a terminator.
 * NOTE(review): this assumes the caller passes a zero-initialized buffer large enough for
 * the table part; confirm at the call sites.
 */
static int32_t buildName(SInsertParseContext* pCxt, SToken* pStname, char* fullDbName, char* tableName) {
  if (parserValidateIdToken(pStname) != TSDB_CODE_SUCCESS) {
    return buildSyntaxErrMsg(&pCxt->msg, "invalid table name", pStname->z);
  }

  char* p = strnchr(pStname->z, TS_PATH_DELIMITER[0], pStname->n, false);
  if (NULL != p) {  // db.table
    int32_t dbLen = (int32_t)(p - pStname->z);
    // Bounded and always terminated. The previous sprintf + strncpy pair could overrun
    // fullDbName and left it unterminated.
    snprintf(fullDbName, TSDB_DB_FNAME_LEN, "%d.%.*s", pCxt->pComCxt->acctId, dbLen, pStname->z);
    strncpy(tableName, p + 1, (size_t)(pStname->n - dbLen - 1));
  } else {
    snprintf(fullDbName, TSDB_DB_FNAME_LEN, "%d.%s", pCxt->pComCxt->acctId, pCxt->pComCxt->db);
    strncpy(tableName, pStname->z, pStname->n);
  }

  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
188
/*
 * Fill pName from a table-name token of the form "tb" or "db.tb". Either part may be quoted;
 * quotes are removed with strdequote. Without a db prefix, dbName is used as the database.
 *
 * Errors are reported through pMsgBuf:
 *   "name too long"         - a part does not fit its buffer, or tName* rejects it
 *   "invalid database name" - dbName is rejected by tNameSetDbName
 *   "db is not specified"   - no db prefix and dbName == NULL
 */
static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, const char* dbName, SMsgBuf* pMsgBuf) {
  const char* msg1 = "name too long";
  const char* msg2 = "invalid database name";
  const char* msg3 = "db is not specified";

  int32_t code = TSDB_CODE_SUCCESS;
  char*   p = strnchr(pTableName->z, TS_PATH_DELIMITER[0], pTableName->n, true);

  if (p != NULL) {  // db has been specified in sql string so we ignore current db path
    assert(*p == TS_PATH_DELIMITER[0]);

    // Raw (possibly quoted) lengths of both parts as they appear in the token. These must be
    // computed before dequoting: strdequote() returns the shorter, dequoted length. Deriving
    // the table length from that value over-copies into tbname whenever the db part is quoted.
    int32_t rawDbLen = (int32_t)(p - pTableName->z);
    int32_t rawTbLen = (int32_t)pTableName->n - rawDbLen - 1;

    // keep room for the terminator in both fixed-size buffers below
    if (rawDbLen >= TSDB_DB_FNAME_LEN || rawTbLen >= TSDB_TABLE_FNAME_LEN) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }

    char name[TSDB_DB_FNAME_LEN] = {0};
    memcpy(name, pTableName->z, rawDbLen);
    int32_t dbLen = strdequote(name);

    code = tNameSetDbName(pName, acctId, name, dbLen);
    if (code != TSDB_CODE_SUCCESS) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }

    char tbname[TSDB_TABLE_FNAME_LEN] = {0};
    memcpy(tbname, p + 1, rawTbLen);
    strdequote(tbname);

    code = tNameFromString(pName, tbname, T_NAME_TABLE);
    if (code != 0) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }
  } else {  // get current DB name first, and then set it into path
    if (pTableName->n >= TSDB_TABLE_NAME_LEN) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }

    assert(pTableName->n < TSDB_TABLE_FNAME_LEN);

    char name[TSDB_TABLE_FNAME_LEN] = {0};
    strncpy(name, pTableName->z, pTableName->n);
    strdequote(name);

    if (dbName == NULL) {
      return buildInvalidOperationMsg(pMsgBuf, msg3);
    }

    code = tNameSetDbName(pName, acctId, dbName, strlen(dbName));
    if (code != TSDB_CODE_SUCCESS) {
      code = buildInvalidOperationMsg(pMsgBuf, msg2);
      return code;
    }

    code = tNameFromString(pName, name, T_NAME_TABLE);
    if (code != 0) {
      code = buildInvalidOperationMsg(pMsgBuf, msg1);
    }
  }

  return code;
}

X
Xiaoyu Wang 已提交
248
/*
 * Check write permission on dbFname, then load the meta of table `name` into pCxt->pTableMeta.
 * For a non-super table, also look up the vgroup that owns it and record it in
 * pCxt->pVgroupsHashObj (keyed by vgId).
 * Returns TSDB_CODE_PAR_PERMISSION_DENIED when not authorized, or the first failing catalog/hash code.
 */
static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SName* name, char* dbFname, bool isStb) {
  SParseContext* pComCxt = pCxt->pComCxt;

  bool authorized = false;
  CHECK_CODE(catalogChkAuth(pComCxt->pCatalog, pComCxt->pTransporter, &pComCxt->mgmtEpSet, pComCxt->pUser, dbFname,
                            AUTH_TYPE_WRITE, &authorized));
  if (!authorized) {
    return TSDB_CODE_PAR_PERMISSION_DENIED;
  }

  if (isStb) {
    CHECK_CODE(catalogGetSTableMeta(pComCxt->pCatalog, pComCxt->pTransporter, &pComCxt->mgmtEpSet, name,
                                    &pCxt->pTableMeta));
    return TSDB_CODE_SUCCESS;
  }

  CHECK_CODE(
      catalogGetTableMeta(pComCxt->pCatalog, pComCxt->pTransporter, &pComCxt->mgmtEpSet, name, &pCxt->pTableMeta));
  ASSERT(pCxt->pTableMeta->tableInfo.rowSize > 0);

  // remember which vgroup owns this table
  SVgroupInfo vgInfo;
  CHECK_CODE(
      catalogGetTableHashVgroup(pComCxt->pCatalog, pComCxt->pTransporter, &pComCxt->mgmtEpSet, name, &vgInfo));
  CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo,
                         sizeof(vgInfo)));
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
272 273 274
/* Load the meta of a normal/child table; the owning vgroup is recorded as well. */
static int32_t getTableMeta(SInsertParseContext* pCxt, SName* name, char* dbFname) {
  const bool isSuperTable = false;
  return getTableMetaImpl(pCxt, name, dbFname, isSuperTable);
}
D
stmt  
dapan1121 已提交
275

X
Xiaoyu Wang 已提交
276 277 278
/* Load the meta of a super table (no vgroup lookup is done for super tables). */
static int32_t getSTableMeta(SInsertParseContext* pCxt, SName* name, char* dbFname) {
  const bool isSuperTable = true;
  return getTableMetaImpl(pCxt, name, dbFname, isSuperTable);
}
D
stmt  
dapan1121 已提交
279

280 281 282 283 284 285 286 287 288 289
/*
 * Search pSchema[start, end) for the entry whose name equals the token text exactly
 * (case-sensitive, same length). Returns its index, or -1 if there is none.
 */
static int32_t findCol(SToken* pColname, int32_t start, int32_t end, SSchema* pSchema) {
  for (int32_t i = start; i < end; ++i) {
    const char* colName = pSchema[i].name;
    if (pColname->n == strlen(colName) && 0 == strncmp(colName, pColname->z, pColname->n)) {
      return i;
    }
  }
  return -1;
}

D
dapan1121 已提交
290
/*
 * Fill in the SSubmitReq header at the start of blocks->pData. Then walk the
 * blocks->numOfTables SSubmitBlk headers that follow and convert each one to network byte
 * order in place. `src` is not used.
 */
static void buildMsgHeader(STableDataBlocks* src, SVgDataBlocks* blocks) {
  SSubmitReq* pReq = (SSubmitReq*)blocks->pData;
  pReq->header.vgId = htonl(blocks->vg.vgId);
  pReq->header.contLen = htonl(blocks->size);
  pReq->length = pReq->header.contLen;  // copies the already-converted value
  pReq->numOfBlocks = htonl(blocks->numOfTables);

  SSubmitBlk* pBlk = (SSubmitBlk*)(pReq + 1);
  for (int32_t remaining = blocks->numOfTables; remaining != 0; --remaining) {
    // the host-order lengths locate the next block, so capture them before converting
    int32_t dataLen = pBlk->dataLen;
    int32_t schemaLen = pBlk->schemaLen;

    pBlk->uid = htobe64(pBlk->uid);
    pBlk->suid = htobe64(pBlk->suid);
    pBlk->padding = htonl(pBlk->padding);
    pBlk->sversion = htonl(pBlk->sversion);
    pBlk->dataLen = htonl(pBlk->dataLen);
    pBlk->schemaLen = htonl(pBlk->schemaLen);
    pBlk->numOfRows = htons(pBlk->numOfRows);

    pBlk = (SSubmitBlk*)(pBlk->data + schemaLen + dataLen);
  }
}

/*
 * Turn every per-vgroup STableDataBlocks in pCxt->pVgDataBlocks into a newly allocated
 * SVgDataBlocks and collect them in pCxt->pOutput->pDataBlocks.
 *
 * The payload buffer is moved (TSWAP) from the source block to the new one, and its headers
 * are then converted by buildMsgHeader. Returns TSDB_CODE_TSC_OUT_OF_MEMORY if an
 * allocation fails.
 */
static int32_t buildOutput(SInsertParseContext* pCxt) {
  const size_t numOfVg = taosArrayGetSize(pCxt->pVgDataBlocks);

  SArray* pOut = taosArrayInit(numOfVg, POINTER_BYTES);
  pCxt->pOutput->pDataBlocks = pOut;
  if (pOut == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  for (size_t vg = 0; vg < numOfVg; ++vg) {
    STableDataBlocks* src = taosArrayGetP(pCxt->pVgDataBlocks, vg);

    SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
    if (dst == NULL) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    taosHashGetDup(pCxt->pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
    dst->numOfTables = src->numOfTables;
    dst->size = src->size;

    TSWAP(dst->pData, src->pData);  // take over the payload
    buildMsgHeader(src, dst);

    taosArrayPush(pCxt->pOutput->pDataBlocks, &dst);
  }
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
334
/*
 * Track whether rows arrive in strictly increasing timestamp order. `start` points at the
 * row's leading TSKEY. A timestamp that is not greater than the previous one clears
 * pDataBlocks->ordered. Once cleared, prevTS is no longer maintained. Always returns
 * TSDB_CODE_SUCCESS.
 */
int32_t checkTimestamp(STableDataBlocks* pDataBlocks, const char* start) {
  if (pDataBlocks->ordered) {
    const TSKEY ts = *(const TSKEY*)start;
    if (ts <= pDataBlocks->prevTS) {
      pDataBlocks->ordered = false;  // equal or decreasing timestamp breaks the ordering
    }
    pDataBlocks->prevTS = ts;
  }
  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
349 350 351 352 353 354
/*
 * Parse a timestamp value token into *time, using precision timePrec.
 *
 * Accepted forms:
 *   - NOW / TODAY, optionally written as NOW() / TODAY()
 *   - an integer literal, taken as-is
 *   - a date-time string, parsed by taosParseTime
 * NOW/TODAY/integer may be followed by "+<duration>" or "-<duration>" (e.g. now+12a, now-5h).
 *
 * *end is the SQL cursor. It is advanced past a consumed "()" and past a duration
 * expression; it is left alone for string timestamps.
 * Returns TSDB_CODE_SUCCESS (0) or an error code; syntax errors are written to pMsgBuf.
 */
static int parseTime(char** end, SToken* pToken, int16_t timePrec, int64_t* time, SMsgBuf* pMsgBuf) {
  int32_t index = 0;
  SToken  sToken;
  int64_t interval;
  int64_t ts = 0;
  char*   pTokenEnd = *end;

  if (pToken->type == TK_NOW) {
    ts = taosGetTimestamp(timePrec);
  } else if (pToken->type == TK_TODAY) {
    ts = taosGetTimestampToday(timePrec);
  } else if (pToken->type == TK_NK_INTEGER) {
    // Reject unparsable/out-of-range integers instead of silently using whatever value
    // toInteger left behind (parseValueToken checks the same call the same way).
    if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &ts)) {
      return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
    }
  } else {  // parse the RFC-3339/ISO-8601 timestamp format string
    if (taosParseTime(pToken->z, time, pToken->n, timePrec, tsDaylight) != TSDB_CODE_SUCCESS) {
      return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
    }

    return TSDB_CODE_SUCCESS;
  }

  for (int k = pToken->n; pToken->z[k] != '\0'; k++) {
    if (pToken->z[k] == ' ' || pToken->z[k] == '\t') continue;
    if (pToken->z[k] == '(' && pToken->z[k + 1] == ')') {  // for insert NOW()/TODAY()
      *end = pTokenEnd = &pToken->z[k + 2];
      k++;
      continue;
    }
    if (pToken->z[k] == ',') {
      *end = pTokenEnd;
      *time = ts;
      return 0;
    }

    break;
  }

  /*
   * time expression:
   * e.g., now+12a, now-5h
   */
  SToken valueToken;
  index = 0;
  sToken = tStrGetToken(pTokenEnd, &index, false);
  pTokenEnd += index;

  if (sToken.type == TK_NK_MINUS || sToken.type == TK_NK_PLUS) {
    index = 0;
    valueToken = tStrGetToken(pTokenEnd, &index, false);
    pTokenEnd += index;

    if (valueToken.n < 2) {
      return buildSyntaxErrMsg(pMsgBuf, "value expected in timestamp", sToken.z);
    }

    char unit = 0;
    if (parseAbsoluteDuration(valueToken.z, valueToken.n, &interval, &unit, timePrec) != TSDB_CODE_SUCCESS) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }

    if (sToken.type == TK_NK_PLUS) {
      ts += interval;
    } else {
      ts = ts - interval;
    }

    *end = pTokenEnd;
  }

  *time = ts;
  return TSDB_CODE_SUCCESS;
}
421

X
Xiaoyu Wang 已提交
422
/*
 * Reject tokens that cannot be a literal value: wrong token type or empty text.
 *
 * For string literals, the text is copied through trimString into tmpTokenBuf (up to
 * TSDB_MAX_BYTES_PER_ROW bytes), and pToken is repointed at the copy. Per the original
 * comment, this removes the quotation marks. `type` (the target column type) is not used.
 */
static FORCE_INLINE int32_t checkAndTrimValue(SToken* pToken, uint32_t type, char* tmpTokenBuf, SMsgBuf* pMsgBuf) {
  bool isValueToken;
  switch (pToken->type) {
    case TK_NOW:
    case TK_TODAY:
    case TK_NK_INTEGER:
    case TK_NK_STRING:
    case TK_NK_FLOAT:
    case TK_NK_BOOL:
    case TK_NULL:
    case TK_NK_HEX:
    case TK_NK_OCT:
    case TK_NK_BIN:
      isValueToken = true;
      break;
    default:  // includes TK_NK_RP
      isValueToken = false;
      break;
  }
  if (!isValueToken || 0 == pToken->n) {
    return buildSyntaxErrMsg(pMsgBuf, "invalid data or symbol", pToken->z);
  }

  if (TK_NK_STRING != pToken->type) {
    return TSDB_CODE_SUCCESS;
  }

  if (pToken->n >= TSDB_MAX_BYTES_PER_ROW) {
    return buildSyntaxErrMsg(pMsgBuf, "too long string", pToken->z);
  }
  pToken->n = trimString(pToken->z, pToken->n, tmpTokenBuf, TSDB_MAX_BYTES_PER_ROW);
  pToken->z = tmpTokenBuf;
  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
445
/*
 * True if the token denotes SQL NULL: the NULL keyword, or a string literal whose text is
 * exactly "null" (compared case-insensitively).
 */
static bool isNullStr(SToken* pToken) {
  if (pToken->type == TK_NULL) {
    return true;
  }
  // The length must match exactly. A bare strncasecmp over pToken->n bytes is a prefix test,
  // which would also treat the strings 'n', 'nu' and 'nul' as NULL.
  return (pToken->type == TK_NK_STRING) && (pToken->n == strlen(TSDB_DATA_NULL_STR_L)) &&
         (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0);
}

H
refact  
Hongze Cheng 已提交
450
/*
 * Parse the token text as a floating-point number into *value (errno is cleared first, so the
 * caller can inspect ERANGE). *endPtr receives the end of the parsed prefix.
 * Returns pToken->type if the whole token was consumed, otherwise TK_NK_ILLEGAL.
 */
static FORCE_INLINE int32_t toDouble(SToken* pToken, double* value, char** endPtr) {
  errno = 0;
  *value = taosStr2Double(pToken->z, endPtr);

  // the token is a valid number only if the parse consumed all of it
  const bool consumedWholeToken = (*endPtr - pToken->z) == pToken->n;
  return consumedWholeToken ? pToken->type : TK_NK_ILLEGAL;
}

H
refact  
Hongze Cheng 已提交
462 463
/*
 * Convert one literal value token to the binary form required by pSchema->type and hand it
 * to `func` (e.g. MemRowAppend or KvRowAppend) together with `param`.
 *
 *   end          SQL cursor; passed only to parseTime, which may advance it past a
 *                timestamp expression such as now()+1s
 *   pToken       the value token; string literals are copied into tmpTokenBuf by
 *                checkAndTrimValue first
 *   tmpTokenBuf  scratch buffer handed to checkAndTrimValue (used with TSDB_MAX_BYTES_PER_ROW)
 *
 * SQL NULL is forwarded as func(pMsgBuf, NULL, 0, param), except for the primary timestamp
 * column, where it is an error.
 * Returns the callback's result, a syntax error built in pMsgBuf, or TSDB_CODE_FAILED for an
 * unhandled column type.
 */
static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t timePrec, char* tmpTokenBuf,
                               _row_append_fn_t func, void* param, SMsgBuf* pMsgBuf) {
  int64_t  iv;
  uint64_t uv;
  char*    endptr = NULL;

  int32_t code = checkAndTrimValue(pToken, pSchema->type, tmpTokenBuf, pMsgBuf);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  if (isNullStr(pToken)) {
    if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
      return buildSyntaxErrMsg(pMsgBuf, "primary timestamp should not be null", pToken->z);
    }

    return func(pMsgBuf, NULL, 0, param);
  }

  switch (pSchema->type) {
    case TSDB_DATA_TYPE_BOOL: {
      if ((pToken->type == TK_NK_BOOL || pToken->type == TK_NK_STRING) && (pToken->n != 0)) {
        // Match the whole literal, case-insensitively. strncmp over pToken->n bytes is a
        // prefix test (accepting 't', 'tr', 'f', ...), and its case sensitivity rejected
        // upper-case spellings such as TRUE.
        if (pToken->n == strlen("true") && strncasecmp(pToken->z, "true", pToken->n) == 0) {
          return func(pMsgBuf, &TRUE_VALUE, pSchema->bytes, param);
        } else if (pToken->n == strlen("false") && strncasecmp(pToken->z, "false", pToken->n) == 0) {
          return func(pMsgBuf, &FALSE_VALUE, pSchema->bytes, param);
        } else {
          return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
        }
      } else if (pToken->type == TK_NK_INTEGER) {
        return func(pMsgBuf, ((taosStr2Int64(pToken->z, NULL, 10) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes,
                    param);
      } else if (pToken->type == TK_NK_FLOAT) {
        return func(pMsgBuf, ((taosStr2Double(pToken->z, NULL) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes,
                    param);
      } else {
        return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
      }
    }

    case TSDB_DATA_TYPE_TINYINT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid tinyint data", pToken->z);
      } else if (!IS_VALID_TINYINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "tinyint data overflow", pToken->z);
      }

      int8_t tmpVal = (int8_t)iv;  // signed, matching the column type
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_UTINYINT: {
      if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned tinyint data", pToken->z);
      } else if (!IS_VALID_UTINYINT(uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "unsigned tinyint data overflow", pToken->z);
      }
      uint8_t tmpVal = (uint8_t)uv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_SMALLINT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid smallint data", pToken->z);
      } else if (!IS_VALID_SMALLINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "smallint data overflow", pToken->z);
      }
      int16_t tmpVal = (int16_t)iv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_USMALLINT: {
      if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned smallint data", pToken->z);
      } else if (!IS_VALID_USMALLINT(uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "unsigned smallint data overflow", pToken->z);
      }
      uint16_t tmpVal = (uint16_t)uv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_INT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid int data", pToken->z);
      } else if (!IS_VALID_INT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "int data overflow", pToken->z);
      }
      int32_t tmpVal = (int32_t)iv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_UINT: {
      if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned int data", pToken->z);
      } else if (!IS_VALID_UINT(uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "unsigned int data overflow", pToken->z);
      }
      uint32_t tmpVal = (uint32_t)uv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_BIGINT: {
      if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid bigint data", pToken->z);
      } else if (!IS_VALID_BIGINT(iv)) {
        return buildSyntaxErrMsg(pMsgBuf, "bigint data overflow", pToken->z);
      }
      return func(pMsgBuf, &iv, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_UBIGINT: {
      if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned bigint data", pToken->z);
      } else if (!IS_VALID_UBIGINT(uv)) {
        return buildSyntaxErrMsg(pMsgBuf, "unsigned bigint data overflow", pToken->z);
      }
      return func(pMsgBuf, &uv, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_FLOAT: {
      double dv;
      if (TK_NK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
        return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
      }
      if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) ||
          isnan(dv)) {
        return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
      }
      float tmpVal = (float)dv;
      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_DOUBLE: {
      double dv;
      if (TK_NK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
        return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
      }
      if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) {
        return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
      }
      return func(pMsgBuf, &dv, pSchema->bytes, param);
    }

    case TSDB_DATA_TYPE_BINARY: {
      // Too long values will raise the invalid sql error message
      if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) {
        return buildSyntaxErrMsg(pMsgBuf, "string data overflow", pToken->z);
      }

      return func(pMsgBuf, pToken->z, pToken->n, param);
    }

    case TSDB_DATA_TYPE_NCHAR: {
      // the length limit is enforced by the callback during UCS-4 conversion
      return func(pMsgBuf, pToken->z, pToken->n, param);
    }

    case TSDB_DATA_TYPE_JSON: {
      if (pToken->n > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
        return buildSyntaxErrMsg(pMsgBuf, "json string too long than 4095", pToken->z);
      }
      return func(pMsgBuf, pToken->z, pToken->n, param);
    }

    case TSDB_DATA_TYPE_TIMESTAMP: {
      int64_t tmpVal;
      if (parseTime(end, pToken, timePrec, &tmpVal, pMsgBuf) != TSDB_CODE_SUCCESS) {
        return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp", pToken->z);
      }

      return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
    }

    default:
      break;
  }

  return TSDB_CODE_FAILED;
}

X
Xiaoyu Wang 已提交
638
/*
 * _row_append_fn_t callback that appends one column value to the row in ((SMemParam*)param)->rb.
 *
 * value == NULL appends a NULL (TD_VTYPE_NULL). BINARY values are first encoded as a var-string
 * at tdRowEnd(rb->pBuf). NCHAR values are first converted to UCS-4 there (bounded by
 * schema->bytes - VARSTR_HEADER_SIZE). All other types are appended from `value` directly.
 * Returns a syntax error (strerror text) if the NCHAR conversion fails.
 */
static FORCE_INLINE int32_t MemRowAppend(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param) {
  SMemParam*   pa = (SMemParam*)param;
  SRowBuilder* rb = pa->rb;
  SSchema*     pCol = pa->schema;

  if (NULL == value) {  // it is a null data
    tdAppendColValToRow(rb, pCol->colId, pCol->type, TD_VTYPE_NULL, value, false, pa->toffset, pa->colIdx);
    return TSDB_CODE_SUCCESS;
  }

  const void* pVal = value;
  switch (pCol->type) {
    case TSDB_DATA_TYPE_BINARY: {
      const char* rowEnd = tdRowEnd(rb->pBuf);
      STR_WITH_SIZE_TO_VARSTR(rowEnd, value, len);
      pVal = rowEnd;
      break;
    }
    case TSDB_DATA_TYPE_NCHAR: {
      // fails if the converted text does not fit into the column's byte budget
      int32_t     output = 0;
      const char* rowEnd = tdRowEnd(rb->pBuf);
      if (!taosMbsToUcs4(value, len, (TdUcs4*)varDataVal(rowEnd), pCol->bytes - VARSTR_HEADER_SIZE, &output)) {
        char buf[512] = {0};
        snprintf(buf, tListLen(buf), "%s", strerror(errno));
        return buildSyntaxErrMsg(pMsgBuf, buf, value);
      }
      varDataSetLen(rowEnd, output);
      pVal = rowEnd;
      break;
    }
    default:
      break;
  }

  tdAppendColValToRow(rb, pCol->colId, pCol->type, TD_VTYPE_NORM, pVal, false, pa->toffset, pa->colIdx);
  return TSDB_CODE_SUCCESS;
}
668 669 670

// pSql -> tag1_name, ...)
/*
 * Parse a parenthesized list of names bound to entries of pSchema[0, pColList->numOfCols).
 * pCxt->pSql must point just past the opening '('. Parsing consumes up to and including ')'.
 *
 * Effects on pColList:
 *   - boundColumns[0..numOfBound) = schema index + PRIMARYKEY_TIMESTAMP_COL_ID, in list order;
 *     the remaining slots are zeroed
 *   - cols[i].valStat = VAL_STAT_HAS for every bound entry
 *   - boundNullLen accumulates the NULL-encoding size of every bound entry's type
 *   - orderStatus records whether the names appeared in schema order; if not, colIdxInfo is
 *     allocated and filled (sorted by schema index to assign finalIdx, then re-sorted by bound
 *     position)
 * Errors: unknown name, duplicated name, or out of memory.
 */
static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo* pColList, SSchema* pSchema) {
  col_id_t nCols = pColList->numOfCols;

  pColList->numOfBound = 0;
  pColList->boundNullLen = 0;
  memset(pColList->boundColumns, 0, sizeof(col_id_t) * nCols);
  for (col_id_t i = 0; i < nCols; ++i) {
    pColList->cols[i].valStat = VAL_STAT_NONE;
  }

  SToken   sToken;
  bool     isOrdered = true;
  col_id_t lastColIdx = -1;  // last column found
  while (1) {
    NEXT_TOKEN(pCxt->pSql, sToken);

    if (TK_NK_RP == sToken.type) {
      break;
    }

    // search after the previous match first, then wrap around (which marks the list unordered)
    col_id_t t = lastColIdx + 1;
    col_id_t index = findCol(&sToken, t, nCols, pSchema);
    if (index < 0 && t > 0) {
      index = findCol(&sToken, 0, t, pSchema);
      isOrdered = false;
    }
    if (index < 0) {
      return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMN, sToken.z);
    }
    if (pColList->cols[index].valStat == VAL_STAT_HAS) {
      return buildSyntaxErrMsg(&pCxt->msg, "duplicated column name", sToken.z);
    }
    lastColIdx = index;
    pColList->cols[index].valStat = VAL_STAT_HAS;
    pColList->boundColumns[pColList->numOfBound] = index + PRIMARYKEY_TIMESTAMP_COL_ID;
    ++pColList->numOfBound;

    // Account for the matched entry's type. Using pSchema[t] (the search start) here was wrong:
    // it picks another column's type whenever names skip or reorder columns, and reads
    // pSchema[nCols] when the match wrapped around from the end.
    switch (pSchema[index].type) {
      case TSDB_DATA_TYPE_BINARY:
        pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + CHAR_BYTES);
        break;
      case TSDB_DATA_TYPE_NCHAR:
        pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
        break;
      default:
        pColList->boundNullLen += TYPE_BYTES[pSchema[index].type];
        break;
    }
  }

  pColList->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED;

  if (!isOrdered) {
    pColList->colIdxInfo = taosMemoryCalloc(pColList->numOfBound, sizeof(SBoundIdxInfo));
    if (NULL == pColList->colIdxInfo) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    SBoundIdxInfo* pColIdx = pColList->colIdxInfo;
    for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
      pColIdx[i].schemaColIdx = pColList->boundColumns[i];
      pColIdx[i].boundIdx = i;
    }
    qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), schemaIdxCompar);
    for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
      pColIdx[i].finalIdx = i;
    }
    qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), boundIdxCompar);
  }

  if (pColList->numOfCols > pColList->numOfBound) {
    memset(&pColList->boundColumns[pColList->numOfBound], 0,
           sizeof(col_id_t) * (pColList->numOfCols - pColList->numOfBound));
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
746 747
/*
 * Appends one tag value to the KV row builder carried in `param` (an SKvParam*).
 *
 * value/len : raw value bytes and their length; value == NULL means a NULL tag.
 * Returns TSDB_CODE_SUCCESS, or an error built into pMsgBuf.
 */
static int32_t KvRowAppend(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param) {
  SKvParam* pKvParam = (SKvParam*)param;
  int8_t    colType = pKvParam->schema->type;
  int16_t   colId = pKvParam->schema->colId;

  // JSON tags are delegated entirely to the JSON parser (this happens before the NULL check).
  if (TSDB_DATA_TYPE_JSON == colType) {
    return parseJsontoTagData(value, pKvParam->builder, pMsgBuf, colId);
  }

  // For a NULL value nothing is added to the builder.
  if (NULL == value) {
    return TSDB_CODE_SUCCESS;
  }

  switch (colType) {
    case TSDB_DATA_TYPE_BINARY:
      // Wrap the raw bytes as a var-string in the scratch buffer before adding them.
      STR_WITH_SIZE_TO_VARSTR(pKvParam->buf, value, len);
      tdAddColToKVRow(pKvParam->builder, colId, pKvParam->buf, varDataTLen(pKvParam->buf));
      break;

    case TSDB_DATA_TYPE_NCHAR: {
      // Convert to UCS-4; the output may not exceed the column width minus the var-string header.
      int32_t ucs4Len = 0;
      if (!taosMbsToUcs4(value, len, (TdUcs4*)varDataVal(pKvParam->buf), pKvParam->schema->bytes - VARSTR_HEADER_SIZE,
                         &ucs4Len)) {
        char errBuf[512] = {0};
        snprintf(errBuf, tListLen(errBuf), " taosMbsToUcs4 error:%s", strerror(errno));
        return buildSyntaxErrMsg(pMsgBuf, errBuf, value);
      }
      varDataSetLen(pKvParam->buf, ucs4Len);
      tdAddColToKVRow(pKvParam->builder, colId, pKvParam->buf, varDataTLen(pKvParam->buf));
      break;
    }

    default:
      // Fixed-width types are copied with their natural size.
      tdAddColToKVRow(pKvParam->builder, colId, value, TYPE_BYTES[colType]);
      break;
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
783
/*
 * Fills a create-child-table request.
 *
 * pTbReq : request to fill; it takes ownership of `row` (destroyCreateSubTbReq frees
 *          ctb.pTag) and of a heap copy of `tname` (freed as `name`).
 * Returns TSDB_CODE_TSC_OUT_OF_MEMORY if the name cannot be duplicated.
 */
static int32_t buildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, SKVRow row, int64_t suid) {
  pTbReq->type = TD_CHILD_TABLE;
  pTbReq->ctb.suid = suid;
  // Attach the tag row first so it is owned by the request (and released with it) even
  // if duplicating the name fails below.
  pTbReq->ctb.pTag = row;

  pTbReq->name = strdup(tname);
  if (NULL == pTbReq->name) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

792
// pSql -> tag1_value, ...)
wmmhello's avatar
wmmhello 已提交
793
/*
 * Parses "tag1_value, ...)" for the bound tags in pCxt->tags and builds pCxt->createTblReq.
 *
 * Either every tag is a literal or every tag is a stmt placeholder "?". Mixing the two
 * is rejected in both orders. In placeholder mode no create request is built here.
 */
static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint8_t precision, const char* tName) {
  if (tdInitKVRowBuilder(&pCxt->tagsBuilder) < 0) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SKvParam param = {.builder = &pCxt->tagsBuilder};
  SToken   sToken;
  bool     isParseBindParam = false;  // saw at least one "?"
  bool     hasTagValue = false;       // saw at least one literal tag value
  char     tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0};  // used for deleting Escape character: \\, \', \"
  for (int i = 0; i < pCxt->tags.numOfBound; ++i) {
    NEXT_TOKEN_WITH_PREV(pCxt->pSql, sToken);

    if (sToken.type == TK_NK_QUESTION) {
      if (NULL == pCxt->pStmtCb) {
        return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", sToken.z);
      }
      // A literal followed by "?" would otherwise be silently discarded.
      if (hasTagValue) {
        return buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and tag values");
      }
      isParseBindParam = true;
      continue;
    }

    if (isParseBindParam) {
      return buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and tag values");
    }
    hasTagValue = true;

    SSchema* pTagSchema = &pSchema[pCxt->tags.boundColumns[i] - 1];  // colId starts with 1
    param.schema = pTagSchema;
    CHECK_CODE(
        parseValueToken(&pCxt->pSql, &sToken, pTagSchema, precision, tmpTokenBuf, KvRowAppend, &param, &pCxt->msg));
  }

  if (isParseBindParam) {
    return TSDB_CODE_SUCCESS;
  }

  SKVRow row = tdGetKVRowFromBuilder(&pCxt->tagsBuilder);
  if (NULL == row) {
    return buildInvalidOperationMsg(&pCxt->msg, "out of memory");
  }
  tdSortKVRowByColIdx(row);

  return buildCreateTbReq(&pCxt->createTblReq, tName, row, pCxt->pTableMeta->suid);
}
836

X
Xiaoyu Wang 已提交
837 838 839 840 841 842 843 844
/*
 * Makes a heap copy of a table meta (TABLE_META_SIZE bytes) and stores it in *pDst.
 * On allocation failure *pDst is set to NULL and TSDB_CODE_TSC_OUT_OF_MEMORY is returned.
 */
static int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
  STableMeta* pCopy = taosMemoryMalloc(TABLE_META_SIZE(pSrc));
  *pDst = pCopy;
  if (NULL != pCopy) {
    memcpy(pCopy, pSrc, TABLE_META_SIZE(pSrc));
    return TSDB_CODE_SUCCESS;
  }
  return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
845

X
Xiaoyu Wang 已提交
846 847 848 849 850 851 852 853
/*
 * Resolves the vgroup of a to-be-created child table and caches its meta.
 *
 * The vgroup is recorded in pCxt->pVgroupsHashObj. pMeta is patched in place to describe
 * the child table (uid 0, resolved vgId, TSDB_CHILD_TABLE). A heap copy of it is stored
 * in pHash under key pName/len; the hash then holds the only pointer to that copy.
 */
static int32_t storeTableMeta(SInsertParseContext* pCxt, SHashObj* pHash, SName* pTableName, const char* pName,
                              int32_t len, STableMeta* pMeta) {
  SVgroupInfo    vg;
  SParseContext* pBasicCtx = pCxt->pComCxt;
  CHECK_CODE(
      catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pTableName, &vg));
  CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));

  pMeta->uid = 0;
  pMeta->vgId = vg.vgId;
  pMeta->tableType = TSDB_CHILD_TABLE;

  STableMeta* pBackup = NULL;
  if (TSDB_CODE_SUCCESS != cloneTableMeta(pMeta, &pBackup)) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  int32_t code = taosHashPut(pHash, pName, len, &pBackup, POINTER_BYTES);
  if (TSDB_CODE_SUCCESS != code) {
    // The copy was not stored, so nothing else references it: release it here.
    taosMemoryFree(pBackup);
  }
  return code;
}

// pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
D
dapan 已提交
866
static int32_t parseUsingClause(SInsertParseContext* pCxt, SName* name, char* tbFName) {
H
refact  
Hongze Cheng 已提交
867
  int32_t      len = strlen(tbFName);
X
Xiaoyu Wang 已提交
868 869 870 871
  STableMeta** pMeta = taosHashGet(pCxt->pSubTableHashObj, tbFName, len);
  if (NULL != pMeta) {
    return cloneTableMeta(*pMeta, &pCxt->pTableMeta);
  }
872

X
Xiaoyu Wang 已提交
873
  SToken sToken;
874 875
  // pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
  NEXT_TOKEN(pCxt->pSql, sToken);
D
dapan 已提交
876 877 878 879 880

  SName sname;
  createSName(&sname, &sToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg);
  char stbFName[TSDB_TABLE_FNAME_LEN];
  tNameExtractFullName(&sname, stbFName);
X
Xiaoyu Wang 已提交
881

D
dapan 已提交
882
  CHECK_CODE(getSTableMeta(pCxt, &sname, stbFName));
883 884 885
  if (TSDB_SUPER_TABLE != pCxt->pTableMeta->tableType) {
    return buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed");
  }
D
dapan 已提交
886
  CHECK_CODE(storeTableMeta(pCxt, pCxt->pSubTableHashObj, name, tbFName, len, pCxt->pTableMeta));
887 888

  SSchema* pTagsSchema = getTableTagSchema(pCxt->pTableMeta);
889
  setBoundColumnInfo(&pCxt->tags, pTagsSchema, getNumOfTags(pCxt->pTableMeta));
890 891 892

  // pSql -> [(tag1_name, ...)] TAGS (tag1_value, ...)
  NEXT_TOKEN(pCxt->pSql, sToken);
893
  if (TK_NK_LP == sToken.type) {
894
    CHECK_CODE(parseBoundColumns(pCxt, &pCxt->tags, pTagsSchema));
895 896 897 898 899 900 901 902
    NEXT_TOKEN(pCxt->pSql, sToken);
  }

  if (TK_TAGS != sToken.type) {
    return buildSyntaxErrMsg(&pCxt->msg, "TAGS is expected", sToken.z);
  }
  // pSql -> (tag1_value, ...)
  NEXT_TOKEN(pCxt->pSql, sToken);
903
  if (TK_NK_LP != sToken.type) {
904 905
    return buildSyntaxErrMsg(&pCxt->msg, "( is expected", sToken.z);
  }
D
dapan 已提交
906
  CHECK_CODE(parseTagsClause(pCxt, pCxt->pTableMeta->schema, getTableInfo(pCxt->pTableMeta).precision, name->tname));
907 908 909 910
  NEXT_VALID_TOKEN(pCxt->pSql, sToken);
  if (TK_NK_COMMA == sToken.type) {
    return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_TAGS_NOT_MATCHED);
  } else if (TK_NK_RP != sToken.type) {
X
Xiaoyu Wang 已提交
911 912
    return buildSyntaxErrMsg(&pCxt->msg, ") is expected", sToken.z);
  }
913 914 915 916

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
917 918
/*
 * Parses one "(field1_value, ...)" row body (the "(" is already consumed) into the row
 * at the end of pDataBlocks->pData.
 *
 * Either every value is a literal or every value is a stmt placeholder "?". Mixing the
 * two is rejected in both orders. *gotRow is set to true only when literal values were
 * written, i.e. not for a placeholder row.
 */
static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, int16_t timePrec, bool* gotRow,
                       char* tmpTokenBuf) {
  SParsedDataColInfo* spd = &pDataBlocks->boundColumnInfo;
  SRowBuilder*        pBuilder = &pDataBlocks->rowBuilder;
  STSRow*             row = (STSRow*)(pDataBlocks->pData + pDataBlocks->size);  // skip the SSubmitBlk header

  tdSRowResetBuf(pBuilder, row);

  bool      isParseBindParam = false;  // saw at least one "?"
  bool      hasLiteralValue = false;   // saw at least one literal value
  SSchema*  schema = getTableColumnSchema(pDataBlocks->pTableMeta);
  SMemParam param = {.rb = pBuilder};
  SToken    sToken = {0};
  // 1. set the parsed value from sql string
  for (int i = 0; i < spd->numOfBound; ++i) {
    NEXT_TOKEN_WITH_PREV(pCxt->pSql, sToken);
    SSchema* pSchema = &schema[spd->boundColumns[i] - 1];

    if (sToken.type == TK_NK_QUESTION) {
      if (NULL == pCxt->pStmtCb) {
        return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", sToken.z);
      }
      // A literal followed by "?" would otherwise leave a half-filled row behind silently.
      if (hasLiteralValue) {
        return buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and values");
      }
      isParseBindParam = true;
      continue;
    }

    if (isParseBindParam) {
      return buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and values");
    }
    hasLiteralValue = true;

    param.schema = pSchema;
    getSTSRowAppendInfo(pBuilder->rowType, spd, i, &param.toffset, &param.colIdx);
    CHECK_CODE(parseValueToken(&pCxt->pSql, &sToken, pSchema, timePrec, tmpTokenBuf, MemRowAppend, &param, &pCxt->msg));

    if (PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
      TSKEY tsKey = TD_ROW_KEY(row);
      checkTimestamp(pDataBlocks, (const char*)&tsKey);
    }
  }

  if (!isParseBindParam) {
    // set the null value for the columns that do not assign values
    if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
      for (int32_t i = 0; i < spd->numOfCols; ++i) {
        if (spd->cols[i].valStat == VAL_STAT_NONE) {  // the primary TS key is not VAL_STAT_NONE
          tdAppendColValToTpRow(pBuilder, TD_VTYPE_NONE, getNullValue(schema[i].type), true, schema[i].type, i,
                                spd->cols[i].toffset);
        }
      }
    }

    *gotRow = true;
#ifdef TD_DEBUG_PRINT_ROW
    STSchema* pSTSchema = tdGetSTSChemaFromSSChema(&schema, spd->numOfCols);
    tdSRowPrint(row, pSTSchema, __func__);
    taosMemoryFree(pSTSchema);
#endif
  }

  return TSDB_CODE_SUCCESS;
}

// pSql -> (field1_value, ...) [(field1_value2, ...) ...]
981
/*
 * Parses "(field1_value, ...) [(field1_value2, ...) ...]" into pDataBlock.
 *
 * maxRows is the row capacity currently allocated. The buffer is grown through
 * allocateMemIfNeed when either the row count or the byte size would overflow it.
 * *numOfRows counts rows that produced data; placeholder rows are not counted.
 * An empty result is an error unless this is a stmt insert.
 */
static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlock, int maxRows, int32_t* numOfRows) {
  STableComInfo tableInfo = getTableInfo(pDataBlock->pTableMeta);
  int32_t       rowSize = getExtendedRowSize(pDataBlock);
  CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));

  *numOfRows = 0;
  // Scratch buffer for value parsing, used for deleting Escape character: \\, \', \"
  char   escapeBuf[TSDB_MAX_BYTES_PER_ROW] = {0};
  SToken token;
  for (;;) {
    // Peek at the next token; only consume it when it opens another "(...)" row.
    int32_t peekLen = 0;
    NEXT_TOKEN_KEEP_SQL(pCxt->pSql, token, peekLen);
    if (TK_NK_LP != token.type) {
      break;
    }
    pCxt->pSql += peekLen;

    bool outOfRows = (*numOfRows) >= maxRows;
    bool outOfBytes = pDataBlock->size + rowSize >= pDataBlock->nAllocSize;
    if (outOfRows || outOfBytes) {
      int32_t newCapacity;
      CHECK_CODE(allocateMemIfNeed(pDataBlock, rowSize, &newCapacity));
      ASSERT(newCapacity >= maxRows);
      maxRows = newCapacity;
    }

    bool gotRow = false;
    CHECK_CODE(parseOneRow(pCxt, pDataBlock, tableInfo.precision, &gotRow, escapeBuf));
    if (gotRow) {
      pDataBlock->size += rowSize;
    }

    NEXT_VALID_TOKEN(pCxt->pSql, token);
    if (TK_NK_COMMA == token.type) {
      // More values than bound columns.
      return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM);
    }
    if (TK_NK_RP != token.type) {
      return buildSyntaxErrMsg(&pCxt->msg, ") expected", token.z);
    }

    if (gotRow) {
      ++(*numOfRows);
    }
  }

  bool isStmtInsert = TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT);
  if (0 == (*numOfRows) && !isStmtInsert) {
    return buildSyntaxErrMsg(&pCxt->msg, "no any data points", NULL);
  }
  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
1028
/*
 * Handles the part after VALUES for one table. It reserves space, parses all rows, and
 * fills the SSubmitBlk header at the start of dataBuf->pData. The parsed row count is
 * added to pCxt->totalNum.
 */
static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* dataBuf) {
  int32_t capacity;
  CHECK_CODE(allocateMemIfNeed(dataBuf, getExtendedRowSize(dataBuf), &capacity));

  int32_t parsedRows = 0;
  CHECK_CODE(parseValues(pCxt, dataBuf, capacity, &parsedRows));

  if (TSDB_CODE_SUCCESS != setBlockInfo((SSubmitBlk*)(dataBuf->pData), dataBuf, parsedRows)) {
    return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than 32767");
  }

  dataBuf->numOfTables = 1;
  pCxt->totalNum += parsedRows;
  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
1045
/*
 * Releases the heap members of a create-child-table request: its tag row (ctb.pTag)
 * and its table name. taosMemoryFreeClear presumably also NULLs each pointer, which
 * would make a repeated call harmless (TODO confirm against the macro definition).
 */
void destroyCreateSubTbReq(SVCreateTbReq* pReq) {
  taosMemoryFreeClear(pReq->ctb.pTag);
  taosMemoryFreeClear(pReq->name);
}

X
Xiaoyu Wang 已提交
1050
/*
 * Releases the per-table parse state. parseInsertBody calls this before parsing each
 * table of the statement.
 */
static void destroyInsertParseContextForTable(SInsertParseContext* pCxt) {
  // meta of the table currently being parsed
  taosMemoryFreeClear(pCxt->pTableMeta);

  // tag state produced by a USING clause
  destroyBoundColumnInfo(&pCxt->tags);
  tdDestroyKVRowBuilder(&pCxt->tagsBuilder);
  destroyCreateSubTbReq(&pCxt->createTblReq);
}

/*
 * Releases everything owned by an insert parse context, including the per-table state.
 *
 * pSubTableHashObj stores heap copies of STableMeta (made by storeTableMeta via
 * cloneTableMeta) as pointer values. parseInsertSql installs no free callback on that
 * hash, so the copies are freed here before the hash itself is cleaned up.
 */
static void destroyInsertParseContext(SInsertParseContext* pCxt) {
  destroyInsertParseContextForTable(pCxt);
  taosHashCleanup(pCxt->pVgroupsHashObj);

  if (NULL != pCxt->pSubTableHashObj) {
    STableMeta** ppMeta = taosHashIterate(pCxt->pSubTableHashObj, NULL);
    while (NULL != ppMeta) {
      taosMemoryFree(*ppMeta);
      ppMeta = taosHashIterate(pCxt->pSubTableHashObj, ppMeta);
    }
  }
  taosHashCleanup(pCxt->pSubTableHashObj);
  taosHashCleanup(pCxt->pTableNameHashObj);

  destroyBlockHashmap(pCxt->pTableBlockHashObj);
  destroyBlockArrayList(pCxt->pVgDataBlocks);
}

1067 1068 1069 1070 1071 1072
//   tb_name
//       [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
//       [(field1_name, ...)]
//       VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
//   [...];
/*
 * Parses every "tb_name [USING ...] [(cols)] VALUES ... | FILE path" group of the statement.
 *
 * In stmt mode only a single table is allowed. The collected meta, tags and hashes are
 * handed to the stmt via setInfoFn. Otherwise the per-table blocks are merged per vgroup
 * and the output is built.
 */
static int32_t parseInsertBody(SInsertParseContext* pCxt) {
  int32_t     tbNum = 0;
  char        tbFName[TSDB_TABLE_FNAME_LEN];
  bool        autoCreateTbl = false;
  STableMeta* pMeta = NULL;

  // for each table
  while (1) {
    SToken sToken;
    char*  tbName = NULL;

    // pSql -> tb_name ...
    NEXT_TOKEN(pCxt->pSql, sToken);

    // no data in the sql string anymore.
    if (sToken.n == 0) {
      // An empty token with a non-zero type while SQL text remains is reported as invalid input.
      if (sToken.type && pCxt->pSql[0]) {
        return buildSyntaxErrMsg(&pCxt->msg, "invalid charactor in SQL", sToken.z);
      }

      if (0 == pCxt->totalNum && (!TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT))) {
        return buildInvalidOperationMsg(&pCxt->msg, "no data in sql");
      }
      break;
    }

    if (TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT) && tbNum > 0) {
      return buildInvalidOperationMsg(&pCxt->msg, "single table allowed in one stmt");
    }

    destroyInsertParseContextForTable(pCxt);

    // "?" as table name: the stmt supplies the real name.
    if (TK_NK_QUESTION == sToken.type) {
      if (pCxt->pStmtCb) {
        CHECK_CODE((*pCxt->pStmtCb->getTbNameFn)(pCxt->pStmtCb->pStmt, &tbName));

        sToken.z = tbName;
        sToken.n = strlen(tbName);
      } else {
        return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", sToken.z);
      }
    }

    SToken tbnameToken = sToken;
    NEXT_TOKEN(pCxt->pSql, sToken);

    SName name;
    // An invalid table name must stop parsing here instead of continuing with an unset SName.
    CHECK_CODE(createSName(&name, &tbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
    tNameExtractFullName(&name, tbFName);

    CHECK_CODE(taosHashPut(pCxt->pTableNameHashObj, tbFName, strlen(tbFName), &name, sizeof(SName)));

    // USING clause
    if (TK_USING == sToken.type) {
      CHECK_CODE(parseUsingClause(pCxt, &name, tbFName));
      NEXT_TOKEN(pCxt->pSql, sToken);
      autoCreateTbl = true;
    } else {
      char dbFName[TSDB_DB_FNAME_LEN];
      tNameGetFullDbName(&name, dbFName);
      CHECK_CODE(getTableMeta(pCxt, &name, dbFName));
    }

    STableDataBlocks* dataBuf = NULL;
    CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, tbFName, strlen(tbFName), TSDB_DEFAULT_PAYLOAD_SIZE,
                                    sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta,
                                    &dataBuf, NULL, &pCxt->createTblReq));
    // Detach the meta from pCxt so the per-table cleanup does not free it. Presumably the
    // data block now references it (TODO confirm ownership in getDataBlockFromList).
    pMeta = pCxt->pTableMeta;
    pCxt->pTableMeta = NULL;

    if (TK_NK_LP == sToken.type) {
      // pSql -> field1_name, ...)
      CHECK_CODE(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(pMeta)));
      NEXT_TOKEN(pCxt->pSql, sToken);
    }

    if (TK_VALUES == sToken.type) {
      // pSql -> (field1_value, ...) [(field1_value2, ...) ...]
      CHECK_CODE(parseValuesClause(pCxt, dataBuf));
      TSDB_QUERY_SET_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_INSERT);

      tbNum++;
      continue;
    }

    // FILE csv_file_path
    if (TK_FILE == sToken.type) {
      // pSql -> csv_file_path
      NEXT_TOKEN(pCxt->pSql, sToken);
      if (0 == sToken.n || (TK_NK_STRING != sToken.type && TK_NK_ID != sToken.type)) {
        return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", sToken.z);
      }
      // todo
      pCxt->pOutput->insertType = TSDB_QUERY_TYPE_FILE_INSERT;

      tbNum++;
      continue;
    }

    return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", sToken.z);
  }

  if (TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT)) {
    SParsedDataColInfo* tags = taosMemoryMalloc(sizeof(pCxt->tags));
    if (NULL == tags) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    memcpy(tags, &pCxt->tags, sizeof(pCxt->tags));
    (*pCxt->pStmtCb->setInfoFn)(pCxt->pStmtCb->pStmt, pMeta, tags, tbFName, autoCreateTbl, pCxt->pVgroupsHashObj,
                                pCxt->pTableBlockHashObj);

    // These objects were handed to the stmt; clear them so the context cleanup skips them.
    memset(&pCxt->tags, 0, sizeof(pCxt->tags));
    pCxt->pVgroupsHashObj = NULL;
    pCxt->pTableBlockHashObj = NULL;
    pCxt->pTableMeta = NULL;

    return TSDB_CODE_SUCCESS;
  }

  // merge according to vgId
  if (taosHashGetSize(pCxt->pTableBlockHashObj) > 0) {
    CHECK_CODE(mergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->payloadType, &pCxt->pVgDataBlocks));
  }
  return buildOutput(pCxt);
}

// INSERT INTO
//   tb_name
//       [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
//       [(field1_name, ...)]
//       VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
//   [...];
X
Xiaoyu Wang 已提交
1204
/*
 * Entry point for INSERT parsing.
 *
 * When a stmt callback is present and *pQuery already exists, the vgroup and block hashes
 * are taken from the stmt via getExecInfoFn; otherwise fresh hashes are created.
 * *pQuery is allocated when NULL. Its pTableList receives every table name seen, both on
 * success and on errors that satisfy NEED_CLIENT_HANDLE_ERROR.
 */
int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
  SInsertParseContext context = {
      .pComCxt = pContext,
      .pSql = (char*)pContext->pSql,
      .msg = {.buf = pContext->pMsg, .len = pContext->msgLen},
      .pTableMeta = NULL,
      .pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK),
      .pTableNameHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK),
      .totalNum = 0,
      .pOutput = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT),
      .pStmtCb = pContext->pStmtCb};

  bool reuseStmtHashes = (NULL != pContext->pStmtCb) && (NULL != *pQuery);
  if (reuseStmtHashes) {
    (*pContext->pStmtCb->getExecInfoFn)(pContext->pStmtCb->pStmt, &context.pVgroupsHashObj,
                                        &context.pTableBlockHashObj);
  } else {
    context.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
    context.pTableBlockHashObj =
        taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  }

  bool anyMissing = NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj ||
                    NULL == context.pSubTableHashObj || NULL == context.pTableNameHashObj || NULL == context.pOutput;
  if (anyMissing) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  if (NULL != pContext->pStmtCb) {
    TSDB_QUERY_SET_TYPE(context.pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT);
  }

  if (NULL == *pQuery) {
    SQuery* pNewQuery = taosMemoryCalloc(1, sizeof(SQuery));
    *pQuery = pNewQuery;
    if (NULL == pNewQuery) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    pNewQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
    pNewQuery->haveResultSet = false;
    pNewQuery->msgType = TDMT_VND_SUBMIT;
    pNewQuery->pRoot = (SNode*)context.pOutput;
  }

  SQuery* pQ = *pQuery;
  if (NULL == pQ->pTableList) {
    pQ->pTableList = taosArrayInit(taosHashGetSize(context.pTableNameHashObj), sizeof(SName));
    if (NULL == pQ->pTableList) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  context.pOutput->payloadType = PAYLOAD_TYPE_KV;

  int32_t code = skipInsertInto(&context);
  if (TSDB_CODE_SUCCESS == code) {
    code = parseInsertBody(&context);
  }

  if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) {
    for (SName* pTable = taosHashIterate(context.pTableNameHashObj, NULL); NULL != pTable;
         pTable = taosHashIterate(context.pTableNameHashObj, pTable)) {
      taosArrayPush(pQ->pTableList, pTable);
    }
  }

  destroyInsertParseContext(&context);
  return code;
}
D
stmt  
dapan1121 已提交
1269

X
Xiaoyu Wang 已提交
1270 1271 1272 1273
/*
 * Builds an SName from a textual table name.
 *
 * Exactly one token is accepted. An empty input and any trailing token are both rejected
 * with a message written to msgBuf.
 */
int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf,
                     int32_t msgBufLen) {
  SMsgBuf msg = {.buf = msgBuf, .len = msgBufLen};
  SToken  sToken;

  NEXT_TOKEN(pTableName, sToken);
  if (sToken.n == 0) {
    return buildInvalidOperationMsg(&msg, "empty table name");
  }

  int32_t code = createSName(pName, &sToken, acctId, dbName, &msg);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  // Anything left after the name token makes the input malformed.
  NEXT_TOKEN(pTableName, sToken);
  if (sToken.n > 0) {
    return buildInvalidOperationMsg(&msg, "table name format is wrong");
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Builds the submit output for a bound stmt from its vgroup and block hashes.
 *
 * The blocks are merged per vgroup into a temporary list. That list is released on every
 * path, including when the merge or buildOutput fails.
 */
int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) {
  SVnodeModifOpStmt*  modifyNode = (SVnodeModifOpStmt*)pQuery->pRoot;
  int32_t             code = TSDB_CODE_SUCCESS;
  SInsertParseContext insertCtx = {
      .pVgroupsHashObj = pVgHash,
      .pTableBlockHashObj = pBlockHash,
      .pOutput = (SVnodeModifOpStmt*)pQuery->pRoot,
  };

  // merge according to vgId
  if (taosHashGetSize(insertCtx.pTableBlockHashObj) > 0) {
    code = mergeTableDataBlocks(insertCtx.pTableBlockHashObj, modifyNode->payloadType, &insertCtx.pVgDataBlocks);
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = buildOutput(&insertCtx);
  }

  destroyBlockArrayList(insertCtx.pVgDataBlocks);
  return code;
}

X
Xiaoyu Wang 已提交
1317 1318 1319 1320
/*
 * Builds the tag row for a stmt from bound tag values and attaches the resulting
 * create-child-table message to pBlock.
 *
 * boundTags : SParsedDataColInfo* of the bound tag columns (NULL -> TSDB_CODE_QRY_APP_ERROR)
 * bind      : one TAOS_MULTI_BIND per bound tag; only the first element of each is used.
 * The temporary builder and request are released on every path.
 */
int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, char* tName, TAOS_MULTI_BIND* bind,
                           char* msgBuf, int32_t msgBufLen) {
  STableDataBlocks*   pDataBlock = (STableDataBlocks*)pBlock;
  SMsgBuf             pBuf = {.buf = msgBuf, .len = msgBufLen};
  SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
  if (NULL == tags) {
    return TSDB_CODE_QRY_APP_ERROR;
  }

  SKVRowBuilder tagBuilder;
  if (tdInitKVRowBuilder(&tagBuilder) < 0) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  int32_t       code = TSDB_CODE_SUCCESS;
  SVCreateTbReq tbReq = {0};
  SSchema*      pSchema = pDataBlock->pTableMeta->schema;
  SKvParam      param = {.builder = &tagBuilder};

  for (int c = 0; TSDB_CODE_SUCCESS == code && c < tags->numOfBound; ++c) {
    // The schema must be set before KvRowAppend, including for NULL values, because
    // KvRowAppend reads param.schema unconditionally.
    SSchema* pTagSchema = &pSchema[tags->boundColumns[c] - 1];  // colId starts with 1
    param.schema = pTagSchema;

    if (bind[c].is_null && bind[c].is_null[0]) {
      code = KvRowAppend(&pBuf, NULL, 0, &param);
      continue;
    }

    int32_t colLen = pTagSchema->bytes;
    if (IS_VAR_DATA_TYPE(pTagSchema->type)) {
      colLen = bind[c].length[0];
    }

    code = KvRowAppend(&pBuf, (char*)bind[c].buffer, colLen, &param);
  }

  if (TSDB_CODE_SUCCESS == code) {
    SKVRow row = tdGetKVRowFromBuilder(&tagBuilder);
    if (NULL == row) {
      code = buildInvalidOperationMsg(&pBuf, "out of memory");
    } else {
      tdSortKVRowByColIdx(row);
      // tbReq takes the row; destroyCreateSubTbReq below frees it together with the name.
      code = buildCreateTbReq(&tbReq, tName, row, suid);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = buildCreateTbMsg(pDataBlock, &tbReq);
  }

  destroyCreateSubTbReq(&tbReq);
  tdDestroyKVRowBuilder(&tagBuilder);
  return code;
}

X
Xiaoyu Wang 已提交
1368 1369 1370 1371
/*
 * Appends bind->num rows of bound column values to pBlock, then fills its SSubmitBlk header.
 *
 * bind : one TAOS_MULTI_BIND per bound column. Every entry must report the same row count
 *        and a buffer_type equal to the column type. A NULL primary timestamp is rejected.
 * In TP-layout rows, columns that were not bound are filled with a NONE value.
 */
int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) {
  STableDataBlocks*   pDataBlock = (STableDataBlocks*)pBlock;
  SSchema*            pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
  int32_t             extendedRowSize = getExtendedRowSize(pDataBlock);
  SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
  SRowBuilder*        pBuilder = &pDataBlock->rowBuilder;
  SMemParam           param = {.rb = pBuilder};
  SMsgBuf             pBuf = {.buf = msgBuf, .len = msgBufLen};
  int32_t             rowNum = bind->num;

  CHECK_CODE(initRowBuilder(pBuilder, pDataBlock->pTableMeta->sversion, spd));
  CHECK_CODE(allocateMemForSize(pDataBlock, extendedRowSize * rowNum));

  for (int32_t rowIdx = 0; rowIdx < rowNum; ++rowIdx) {
    STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size);  // skip the SSubmitBlk header
    tdSRowResetBuf(pBuilder, row);

    for (int colIdx = 0; colIdx < spd->numOfBound; ++colIdx) {
      TAOS_MULTI_BIND* pColBind = &bind[colIdx];
      SSchema*         pColSchema = &pSchema[spd->boundColumns[colIdx] - 1];

      if (pColBind->num != rowNum) {
        return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
      }

      param.schema = pColSchema;
      getSTSRowAppendInfo(pBuilder->rowType, spd, colIdx, &param.toffset, &param.colIdx);

      bool isNull = pColBind->is_null && pColBind->is_null[rowIdx];
      if (isNull) {
        if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
          return buildInvalidOperationMsg(&pBuf, "primary timestamp should not be NULL");
        }
        CHECK_CODE(MemRowAppend(&pBuf, NULL, 0, &param));
      } else {
        if (pColBind->buffer_type != pColSchema->type) {
          return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
        }

        // Var-length values carry their own per-row length; fixed-width ones use the column width.
        int32_t colLen = pColSchema->bytes;
        if (IS_VAR_DATA_TYPE(pColSchema->type)) {
          colLen = pColBind->length[rowIdx];
        }

        char* pValue = (char*)pColBind->buffer + pColBind->buffer_length * rowIdx;
        CHECK_CODE(MemRowAppend(&pBuf, pValue, colLen, &param));
      }

      if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
        TSKEY tsKey = TD_ROW_KEY(row);
        checkTimestamp(pDataBlock, (const char*)&tsKey);
      }
    }

    // set the null value for the columns that do not assign values
    if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
      for (int32_t i = 0; i < spd->numOfCols; ++i) {
        if (spd->cols[i].valStat != VAL_STAT_NONE) {  // the primary TS key is not VAL_STAT_NONE
          continue;
        }
        tdAppendColValToTpRow(pBuilder, TD_VTYPE_NONE, getNullValue(pSchema[i].type), true, pSchema[i].type, i,
                              spd->cols[i].toffset);
      }
    }

#ifdef TD_DEBUG_PRINT_ROW
    STSchema* pSTSchema = tdGetSTSChemaFromSSChema(&pSchema, spd->numOfCols);
    tdSRowPrint(row, pSTSchema, __func__);
    taosMemoryFree(pSTSchema);
#endif
    pDataBlock->size += extendedRowSize;
  }

  if (TSDB_CODE_SUCCESS != setBlockInfo((SSubmitBlk*)(pDataBlock->pData), pDataBlock, rowNum)) {
    return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767");
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1445 1446 1447 1448 1449
/*
 * Binds a single column of a TAOS_MULTI_BIND batch into pBlock (an STableDataBlocks*).
 *
 * The bound columns of one batch are bound by successive calls with colIdx = 0 .. numOfBound - 1:
 *  - colIdx == 0 initializes the row builder, reserves room for bind->num rows and resets each row buffer;
 *  - later calls address the same rows again (the block size is only advanced after the last column);
 *  - colIdx == numOfBound - 1 marks columns that were not bound as NONE (tuple rows only), advances the
 *    block size by bind->num rows and updates the submit block header.
 *
 * bind    values of the column at position colIdx of the bound-column list; bind->num rows
 * msgBuf  receives an error message (msgBufLen bytes)
 * colIdx  position of the column in the bound-column list
 * rowNum  row count every bind param of the batch must have
 *
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx,
                                int32_t rowNum) {
  STableDataBlocks*   pDataBlock = (STableDataBlocks*)pBlock;
  SSchema*            pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
  int32_t             extendedRowSize = getExtendedRowSize(pDataBlock);
  SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
  SRowBuilder*        pBuilder = &pDataBlock->rowBuilder;
  SMemParam           param = {.rb = pBuilder};
  SMsgBuf             pBuf = {.buf = msgBuf, .len = msgBufLen};
  bool                rowStart = (0 == colIdx);
  bool                rowEnd = ((colIdx + 1) == spd->numOfBound);

  // The row count does not depend on the row being written: validate it once, before the row builder or the
  // block buffer is touched, so a rejected bind leaves the block unchanged. A per-row check would also never
  // run for an empty bind (bind->num == 0).
  if (bind->num != rowNum) {
    return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
  }

  if (rowStart) {
    CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));
    CHECK_CODE(allocateMemForSize(pDataBlock, extendedRowSize * bind->num));
  }

  for (int32_t r = 0; r < bind->num; ++r) {
    STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size + extendedRowSize * r);  // skip the SSubmitBlk header
    if (rowStart) {
      tdSRowResetBuf(pBuilder, row);
    } else {
      tdSRowGetBuf(pBuilder, row);
    }

    SSchema* pColSchema = &pSchema[spd->boundColumns[colIdx] - 1];  // colId starts with 1

    param.schema = pColSchema;
    getSTSRowAppendInfo(pBuilder->rowType, spd, colIdx, &param.toffset, &param.colIdx);

    if (bind->is_null && bind->is_null[r]) {
      if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
        return buildInvalidOperationMsg(&pBuf, "primary timestamp should not be NULL");
      }

      CHECK_CODE(MemRowAppend(&pBuf, NULL, 0, &param));
    } else {
      if (bind->buffer_type != pColSchema->type) {
        return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
      }

      int32_t colLen = pColSchema->bytes;
      if (IS_VAR_DATA_TYPE(pColSchema->type)) {
        // var-length values carry a per-row length supplied by the caller; never dereference a NULL array
        if (NULL == bind->length) {
          return buildInvalidOperationMsg(&pBuf, "length of var-length column should not be NULL");
        }
        colLen = bind->length[r];
      }

      CHECK_CODE(MemRowAppend(&pBuf, (char*)bind->buffer + bind->buffer_length * r, colLen, &param));
    }

    if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
      TSKEY tsKey = TD_ROW_KEY(row);
      checkTimestamp(pDataBlock, (const char*)&tsKey);
    }

    // set the null value for the columns that do not assign values
    if (rowEnd && (spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
      for (int32_t i = 0; i < spd->numOfCols; ++i) {
        if (spd->cols[i].valStat == VAL_STAT_NONE) {  // the primary TS key is not VAL_STAT_NONE
          tdAppendColValToTpRow(pBuilder, TD_VTYPE_NONE, getNullValue(pSchema[i].type), true, pSchema[i].type, i,
                                spd->cols[i].toffset);
        }
      }
    }

#ifdef TD_DEBUG_PRINT_ROW
    if (rowEnd) {
      STSchema* pSTSchema = tdGetSTSChemaFromSSChema(&pSchema, spd->numOfCols);
      tdSRowPrint(row, pSTSchema, __func__);
      taosMemoryFree(pSTSchema);
    }
#endif
  }

  if (rowEnd) {
    pDataBlock->size += extendedRowSize * bind->num;

    SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
    if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, bind->num)) {
      return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767");
    }
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1534
/*
 * Describes the bound entries (columns or tags) of boundInfo as TAOS_FIELD records.
 *
 * *fieldNum always receives boundInfo->numOfBound. When fields is non-NULL, *fields receives an array
 * allocated with taosMemoryCalloc (owned by the caller) holding name, type and bytes of every bound entry
 * in bound order, or NULL when nothing is bound. Passing fields == NULL only queries the count.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the array cannot be allocated.
 */
int32_t buildBoundFields(SParsedDataColInfo* boundInfo, SSchema* pSchema, int32_t* fieldNum, TAOS_FIELD** fields) {
  if (fields) {
    if (boundInfo->numOfBound <= 0) {
      // calloc with a zero count may legitimately return NULL; that is not an allocation failure.
      *fields = NULL;
    } else {
      *fields = taosMemoryCalloc(boundInfo->numOfBound, sizeof(TAOS_FIELD));
      if (NULL == *fields) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      for (int32_t i = 0; i < boundInfo->numOfBound; ++i) {
        const SSchema* pFieldSchema = &pSchema[boundInfo->boundColumns[i] - 1];  // colId starts with 1
        TAOS_FIELD*    pField = &(*fields)[i];
        // bounded copy: always NUL-terminated even if the schema name ever outgrows the field name buffer
        snprintf(pField->name, sizeof(pField->name), "%s", pFieldSchema->name);
        pField->type = pFieldSchema->type;
        pField->bytes = pFieldSchema->bytes;
      }
    }
  }

  *fieldNum = boundInfo->numOfBound;

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1554 1555
/*
 * Reports the bound tags of a statement as TAOS_FIELD records.
 *
 * pBlock     STableDataBlocks* whose table meta supplies the tag schema
 * boundTags  SParsedDataColInfo* describing the bound tags; NULL yields TSDB_CODE_QRY_APP_ERROR
 * fieldNum   receives the number of bound tags
 * fields     optional; when non-NULL receives the field array (NULL when no tag is bound)
 *
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD** fields) {
  STableDataBlocks*   pDataBlock = (STableDataBlocks*)pBlock;
  SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
  if (NULL == tags) {
    return TSDB_CODE_QRY_APP_ERROR;
  }

  SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta);
  if (tags->numOfBound <= 0) {
    *fieldNum = 0;
    // fields may be NULL for a count-only query (buildBoundFields and qBuildStmtColFields accept that too)
    if (fields) {
      *fields = NULL;
    }

    return TSDB_CODE_SUCCESS;
  }

  CHECK_CODE(buildBoundFields(tags, pSchema, fieldNum, fields));

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1574 1575 1576
/*
 * Reports the bound data columns of pBlock (an STableDataBlocks*) as TAOS_FIELD records.
 *
 * *fieldNum receives the number of bound columns. When fields is non-NULL, *fields receives the array built
 * by buildBoundFields, or NULL when no column is bound. Returns TSDB_CODE_SUCCESS or the error reported by
 * buildBoundFields.
 */
int32_t qBuildStmtColFields(void* pBlock, int32_t* fieldNum, TAOS_FIELD** fields) {
  STableDataBlocks*   pDataBlock = (STableDataBlocks*)pBlock;
  SSchema*            pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
  SParsedDataColInfo* pBoundCols = &pDataBlock->boundColumnInfo;

  if (pBoundCols->numOfBound > 0) {
    return buildBoundFields(pBoundCols, pSchema, fieldNum, fields);
  }

  // nothing bound: report an empty field list
  *fieldNum = 0;
  if (NULL != fields) {
    *fields = NULL;
  }
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
1591
// schemaless logic start
D
stmt  
dapan1121 已提交
1592

wmmhello's avatar
wmmhello 已提交
1593
typedef struct SmlExecTableHandle {
X
Xiaoyu Wang 已提交
1594 1595 1596
  SParsedDataColInfo tags;          // each table
  SKVRowBuilder      tagsBuilder;   // each table
  SVCreateTbReq      createTblReq;  // each table
wmmhello's avatar
wmmhello 已提交
1597
} SmlExecTableHandle;
wmmhello's avatar
wmmhello 已提交
1598

wmmhello's avatar
wmmhello 已提交
1599
typedef struct SmlExecHandle {
1600 1601 1602
  SHashObj*          pBlockHash;
  SmlExecTableHandle tableExecHandle;
  SQuery*            pQuery;
wmmhello's avatar
wmmhello 已提交
1603
} SSmlExecHandle;
wmmhello's avatar
wmmhello 已提交
1604

wmmhello's avatar
wmmhello 已提交
1605 1606 1607 1608 1609 1610 1611
/*
 * Releases the per-table state held by a SmlExecTableHandle (passed as void*): the tag row builder, the bound
 * tag info and the create-table request. The struct itself is not freed.
 */
static void smlDestroyTableHandle(void* pHandle) {
  SmlExecTableHandle* pTable = pHandle;

  tdDestroyKVRowBuilder(&pTable->tagsBuilder);
  destroyBoundColumnInfo(&pTable->tags);
  destroyCreateSubTbReq(&pTable->createTblReq);
}

X
Xiaoyu Wang 已提交
1612
/*
 * Matches the schemaless key/value list `cols` (SSmlKv* elements, matched by key name) against pSchema and
 * records the result in pColList:
 *  - cols[i].valStat marks which schema columns are present;
 *  - boundColumns[0 .. numOfBound) holds their 1-based column ids in input order;
 *  - boundNullLen accumulates the bytes reserved for NULL values of the bound columns;
 *  - orderStatus / colIdxInfo describe whether the input follows the schema order.
 *
 * Returns TSDB_CODE_SML_INVALID_DATA when a key is not in the schema or appears twice, and
 * TSDB_CODE_TSC_OUT_OF_MEMORY if the reordering index cannot be allocated.
 */
static int32_t smlBoundColumnData(SArray* cols, SParsedDataColInfo* pColList, SSchema* pSchema) {
  col_id_t nCols = pColList->numOfCols;

  pColList->numOfBound = 0;
  pColList->boundNullLen = 0;
  memset(pColList->boundColumns, 0, sizeof(col_id_t) * nCols);
  for (col_id_t i = 0; i < nCols; ++i) {
    pColList->cols[i].valStat = VAL_STAT_NONE;
  }

  bool     isOrdered = true;
  col_id_t lastColIdx = -1;  // last column found
  size_t   numOfKvs = taosArrayGetSize(cols);
  for (size_t i = 0; i < numOfKvs; ++i) {
    SSmlKv* kv = taosArrayGetP(cols, i);
    SToken  sToken = {.n = kv->keyLen, .z = (char*)kv->key};
    // Look after the previously matched column first; only when that fails search the columns before it,
    // which means the input does not follow the schema order.
    col_id_t t = lastColIdx + 1;
    col_id_t index = findCol(&sToken, t, nCols, pSchema);
    if (index < 0 && t > 0) {
      index = findCol(&sToken, 0, t, pSchema);
      isOrdered = false;
    }
    if (index < 0) {
      return TSDB_CODE_SML_INVALID_DATA;
    }
    if (pColList->cols[index].valStat == VAL_STAT_HAS) {  // the same key given twice
      return TSDB_CODE_SML_INVALID_DATA;
    }
    lastColIdx = index;
    pColList->cols[index].valStat = VAL_STAT_HAS;
    pColList->boundColumns[pColList->numOfBound] = index + PRIMARYKEY_TIMESTAMP_COL_ID;
    ++pColList->numOfBound;
    // Use the type of the column actually matched. The search start `t` differs from it whenever the input
    // skips or reorders columns, and equals nCols (one past the schema) after the last column was matched.
    switch (pSchema[index].type) {
      case TSDB_DATA_TYPE_BINARY:
        pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + CHAR_BYTES);
        break;
      case TSDB_DATA_TYPE_NCHAR:
        pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
        break;
      default:
        pColList->boundNullLen += TYPE_BYTES[pSchema[index].type];
        break;
    }
  }

  pColList->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED;

  if (!isOrdered) {
    // NOTE(review): an index left by an earlier call on the same SParsedDataColInfo would be overwritten
    // here; confirm every caller resets colIdxInfo (setBoundColumnInfo / destroyBoundColumnInfo) beforehand.
    pColList->colIdxInfo = taosMemoryCalloc(pColList->numOfBound, sizeof(SBoundIdxInfo));
    if (NULL == pColList->colIdxInfo) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    SBoundIdxInfo* pColIdx = pColList->colIdxInfo;
    for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
      pColIdx[i].schemaColIdx = pColList->boundColumns[i];
      pColIdx[i].boundIdx = i;
    }
    // sort by schema position to assign each bound column its final index, then restore input order
    qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), schemaIdxCompar);
    for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
      pColIdx[i].finalIdx = i;
    }
    qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), boundIdxCompar);
  }

  // boundColumns[numOfBound .. numOfCols) is still zero from the memset above.
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1683 1684
/*
 * Builds the tag row of one table from the schemaless tag values in `cols`.
 *
 * cols         SSmlKv* tag values; element i belongs to tags->boundColumns[i] (as recorded by
 *              smlBoundColumnData for the same array)
 * tagsBuilder  builder (re)initialized here and used to assemble the row
 * tags         bound-tag info for cols
 * pSchema      tag schema of the table
 * row          receives the row taken from the builder, sorted by column index
 * msg          receives an error message on failure
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_OUT_OF_MEMORY / TSDB_CODE_OUT_OF_MEMORY on allocation failure, or the
 * first error reported while appending a tag value.
 */
static int32_t smlBuildTagRow(SArray* cols, SKVRowBuilder* tagsBuilder, SParsedDataColInfo* tags, SSchema* pSchema,
                              SKVRow* row, SMsgBuf* msg) {
  if (tdInitKVRowBuilder(tagsBuilder) < 0) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SKvParam param = {.builder = tagsBuilder};
  for (int i = 0; i < tags->numOfBound; ++i) {
    SSchema* pTagSchema = &pSchema[tags->boundColumns[i] - 1];  // colId starts with 1
    param.schema = pTagSchema;
    SSmlKv* kv = taosArrayGetP(cols, i);
    // Var-length values are passed by pointer; fixed-length values are passed by the address of the value
    // storage in the kv (which appears to be shared with the numeric members such as kv->i).
    // Append errors (e.g. a value that does not fit the tag) are propagated instead of being dropped.
    if (IS_VAR_DATA_TYPE(kv->type)) {
      CHECK_CODE(KvRowAppend(msg, kv->value, kv->length, &param));
    } else {
      CHECK_CODE(KvRowAppend(msg, &(kv->value), kv->length, &param));
    }
  }

  *row = tdGetKVRowFromBuilder(tagsBuilder);
  if (*row == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  tdSortKVRowByColIdx(*row);
  return TSDB_CODE_SUCCESS;
}

1709 1710
/*
 * Binds one table's schemaless data into the data blocks of an SSmlExecHandle.
 *
 * handle      SSmlExecHandle from smlInitHandle; its per-table state is released and rebuilt here
 * tags        SSmlKv* tag values, matched by key against the table's tag schema
 * colsSchema  SSmlKv* entries naming the bound columns (only their keys are used)
 * cols        one element per row: with format == true an SArray of SSmlKv* expected in bound-column order
 *             (an entry whose key does not match is treated as absent when the row is shorter than the bound
 *             list); otherwise a hash from column name to SSmlKv*
 * pTableMeta  meta of the target table
 * tableName   name used for the create-table request
 * msgBuf      receives an error message (msgBufLen bytes)
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta,
                    char* tableName, char* msgBuf, int16_t msgBufLen) {
  SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};

  SSmlExecHandle* smlHandle = (SSmlExecHandle*)handle;
  smlDestroyTableHandle(&smlHandle->tableExecHandle);  // free for each table
  SSchema* pTagsSchema = getTableTagSchema(pTableMeta);
  setBoundColumnInfo(&smlHandle->tableExecHandle.tags, pTagsSchema, getNumOfTags(pTableMeta));
  int ret = smlBoundColumnData(tags, &smlHandle->tableExecHandle.tags, pTagsSchema);
  if (ret != TSDB_CODE_SUCCESS) {
    buildInvalidOperationMsg(&pBuf, "bound tags error");
    return ret;
  }
  SKVRow tagRow = NULL;
  ret = smlBuildTagRow(tags, &smlHandle->tableExecHandle.tagsBuilder, &smlHandle->tableExecHandle.tags, pTagsSchema,
                       &tagRow, &pBuf);
  if (ret != TSDB_CODE_SUCCESS) {
    return ret;
  }

  buildCreateTbReq(&smlHandle->tableExecHandle.createTblReq, tableName, tagRow, pTableMeta->suid);

  STableDataBlocks* pDataBlock = NULL;
  ret = getDataBlockFromList(smlHandle->pBlockHash, &pTableMeta->uid, sizeof(pTableMeta->uid),
                             TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk), getTableInfo(pTableMeta).rowSize,
                             pTableMeta, &pDataBlock, NULL, &smlHandle->tableExecHandle.createTblReq);
  if (ret != TSDB_CODE_SUCCESS) {
    buildInvalidOperationMsg(&pBuf, "create data block error");
    return ret;
  }

  SSchema* pSchema = getTableColumnSchema(pTableMeta);

  ret = smlBoundColumnData(colsSchema, &pDataBlock->boundColumnInfo, pSchema);
  if (ret != TSDB_CODE_SUCCESS) {
    buildInvalidOperationMsg(&pBuf, "bound cols error");
    return ret;
  }
  int32_t             extendedRowSize = getExtendedRowSize(pDataBlock);
  SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
  SRowBuilder*        pBuilder = &pDataBlock->rowBuilder;
  SMemParam           param = {.rb = pBuilder};

  CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));

  int32_t rowNum = taosArrayGetSize(cols);
  if (rowNum <= 0) {
    return buildInvalidOperationMsg(&pBuf, "cols size <= 0");
  }
  ret = allocateMemForSize(pDataBlock, extendedRowSize * rowNum);
  if (ret != TSDB_CODE_SUCCESS) {
    buildInvalidOperationMsg(&pBuf, "allocate memory error");
    return ret;
  }
  for (int32_t r = 0; r < rowNum; ++r) {
    STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size);  // skip the SSubmitBlk header
    tdSRowResetBuf(pBuilder, row);
    void*  rowData = taosArrayGetP(cols, r);
    size_t rowDataSize = 0;
    if (format) {
      rowDataSize = taosArrayGetSize(rowData);
    }

    // 1. set the values supplied for the bound columns
    for (int c = 0, j = 0; c < spd->numOfBound; ++c) {
      SSchema* pColSchema = &pSchema[spd->boundColumns[c] - 1];

      param.schema = pColSchema;
      getSTSRowAppendInfo(pBuilder->rowType, spd, c, &param.toffset, &param.colIdx);

      SSmlKv* kv = NULL;
      if (format) {
        if (j < rowDataSize) {
          kv = taosArrayGetP(rowData, j);
          // a shorter row may omit columns: only consume the entry when it belongs to this column
          if (rowDataSize != spd->numOfBound &&
              (kv->keyLen != strlen(pColSchema->name) || strncmp(kv->key, pColSchema->name, kv->keyLen) != 0)) {
            kv = NULL;
          } else {
            j++;
          }
        }
      } else {
        void** p = taosHashGet(rowData, pColSchema->name, strlen(pColSchema->name));
        if (p) kv = *p;
      }

      // Append errors (e.g. an over-long value) are propagated, as in the qBindStmt* functions.
      if (!kv || kv->length == 0) {
        CHECK_CODE(MemRowAppend(&pBuf, NULL, 0, &param));
      } else {
        int32_t colLen = kv->length;
        if (pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP) {
          // NOTE(review): converts the caller's kv in place; binding the same data twice would convert twice.
          kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision);
        }

        if (IS_VAR_DATA_TYPE(kv->type)) {
          CHECK_CODE(MemRowAppend(&pBuf, kv->value, colLen, &param));
        } else {
          CHECK_CODE(MemRowAppend(&pBuf, &(kv->value), colLen, &param));
        }
      }

      if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
        TSKEY tsKey = TD_ROW_KEY(row);
        checkTimestamp(pDataBlock, (const char*)&tsKey);
      }
    }

    // set the null value for the columns that do not assign values
    if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
      for (int32_t i = 0; i < spd->numOfCols; ++i) {
        if (spd->cols[i].valStat == VAL_STAT_NONE) {  // the primary TS key is not VAL_STAT_NONE
          tdAppendColValToTpRow(pBuilder, TD_VTYPE_NONE, getNullValue(pSchema[i].type), true, pSchema[i].type, i,
                                spd->cols[i].toffset);
        }
      }
    }

    pDataBlock->size += extendedRowSize;
  }

  SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
  if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, rowNum)) {
    return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767");
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1837 1838 1839
/*
 * Creates a schemaless execution handle for pQuery.
 *
 * The handle owns a hash of data blocks (keyed by table uid when filled by smlBindData) and must be released
 * with smlDestroyHandle. Returns NULL if the handle or its block hash cannot be allocated.
 */
void* smlInitHandle(SQuery* pQuery) {
  SSmlExecHandle* handle = taosMemoryCalloc(1, sizeof(SSmlExecHandle));
  if (NULL == handle) {
    return NULL;
  }

  handle->pBlockHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
  if (NULL == handle->pBlockHash) {
    // a handle without a block hash cannot bind any table; fail early instead of handing it out
    taosMemoryFree(handle);
    return NULL;
  }
  handle->pQuery = pQuery;

  return handle;
}

X
Xiaoyu Wang 已提交
1846 1847 1848
void smlDestroyHandle(void* pHandle) {
  if (!pHandle) return;
  SSmlExecHandle* handle = (SSmlExecHandle*)pHandle;
wmmhello's avatar
wmmhello 已提交
1849
  destroyBlockHashmap(handle->pBlockHash);
wmmhello's avatar
wmmhello 已提交
1850
  smlDestroyTableHandle(&handle->tableExecHandle);
wmmhello's avatar
wmmhello 已提交
1851 1852 1853 1854
  taosMemoryFree(handle);
}

/* Builds the output of the handle's query from the data blocks collected so far (delegates to qBuildStmtOutput). */
int32_t smlBuildOutput(void* handle, SHashObj* pVgHash) {
  SSmlExecHandle* pExec = handle;
  SQuery*         pQuery = pExec->pQuery;
  SHashObj*       pBlocks = pExec->pBlockHash;

  return qBuildStmtOutput(pQuery, pVgHash, pBlocks);
}
wmmhello's avatar
wmmhello 已提交
1858
// schemaless logic end