insertParser.c 22.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "insertParser.h"

#include "dataBlockMgt.h"
#include "parserInt.h"
#include "parserUtil.h"
#include "queryInfoUtil.h"
#include "tglobal.h"
#include "ttime.h"
#include "ttoken.h"
#include "ttypes.h"

/*
 * Read the next token at the SQL cursor `pSql` into `sToken`, then advance
 * the cursor by the offset tStrGetToken() stores in its index out-parameter.
 * `pSql` must be a modifiable pointer lvalue; it is evaluated twice.
 */
#define NEXT_TOKEN(pSql, sToken)                          \
  do {                                                    \
    int32_t tokenStep = 0;                                \
    (sToken) = tStrGetToken((pSql), &tokenStep, false);   \
    (pSql) += tokenStep;                                  \
  } while (0)

34
/*
 * Peek at the next token at `pSql` without moving the cursor: the token is
 * stored in `sToken` and tStrGetToken() updates the caller-owned `index`,
 * which the caller may later add to the cursor to consume the token.
 */
#define NEXT_TOKEN_KEEP_SQL(pSql, sToken, index)          \
  do {                                                    \
    (sToken) = tStrGetToken((pSql), &(index), false);     \
  } while (0)

39
/*
 * Evaluate `expr` exactly once; if the result is not TSDB_CODE_SUCCESS,
 * return it from the ENCLOSING function. Only usable inside functions whose
 * return type can carry an int32_t status code.
 */
#define CHECK_CODE(expr)                   \
  do {                                     \
    int32_t code = (expr);                 \
    if (TSDB_CODE_SUCCESS != code) {       \
      return code;                         \
    }                                      \
  } while (0)

/*
 * Origin of the row timestamps seen so far in a data block (stored in
 * STableDataBlocks.tsSource). checkTimestamp() rejects a block that mixes both.
 */
enum {
  TSDB_USE_SERVER_TS = 0,  // timestamp value was the INT64_MIN placeholder
  TSDB_USE_CLI_TS    = 1,  // timestamp value was any other (concrete) value
};

typedef struct SInsertParseContext {
X
Xiaoyu Wang 已提交
53
  SParseContext* pComCxt;       // input
54 55
  char          *pSql;          // input
  SMsgBuf        msg;           // input
X
Xiaoyu Wang 已提交
56 57 58 59 60 61 62
  STableMeta* pTableMeta;       // each table
  SParsedDataColInfo tags;      // each table
  SKVRowBuilder tagsBuilder;    // each table
  SHashObj* pVgroupsHashObj;    // global
  SHashObj* pTableBlockHashObj; // global
  SArray* pTableDataBlocks;     // global
  SArray* pVgDataBlocks;        // global
63
  int32_t totalNum;
H
Haojun Liao 已提交
64
  SVnodeModifOpStmtInfo* pOutput;
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
} SInsertParseContext;

/*
 * Consume the leading "INSERT INTO" keywords from the SQL cursor.
 *
 * Returns TSDB_CODE_SUCCESS when both keywords are present in order; otherwise
 * the code produced by buildSyntaxErrMsg() for the first missing keyword.
 */
static int32_t skipInsertInto(SInsertParseContext* pCxt) {
  SToken keyword;

  NEXT_TOKEN(pCxt->pSql, keyword);
  if (keyword.type != TK_INSERT) {
    return buildSyntaxErrMsg(&pCxt->msg, "keyword INSERT is expected", keyword.z);
  }

  NEXT_TOKEN(pCxt->pSql, keyword);
  if (keyword.type == TK_INTO) {
    return TSDB_CODE_SUCCESS;
  }

  return buildSyntaxErrMsg(&pCxt->msg, "keyword INTO is expected", keyword.z);
}

80
/*
 * Split a (possibly db-qualified) table-name token into a full database name
 * ("<acctId>.<db>") and a bare table name.
 *
 * pStname     token holding "table" or "db.table"
 * fullDbName  out: buffer of at least TSDB_DB_FNAME_LEN bytes
 * tableName   out: NOTE(review): assumed to have room for the table part plus
 *             a terminating NUL - confirm against the callers' buffer size
 *
 * Returns TSDB_CODE_SUCCESS, or a syntax-error code when the token is not a
 * valid identifier or the database part does not fit into fullDbName.
 */
static int32_t buildName(SInsertParseContext* pCxt, SToken* pStname, char* fullDbName, char* tableName) {
  if (parserValidateIdToken(pStname) != TSDB_CODE_SUCCESS) {
    return buildSyntaxErrMsg(&pCxt->msg, "invalid table name", pStname->z);
  }

  char* p = strnchr(pStname->z, TS_PATH_DELIMITER[0], pStname->n, false);
  if (NULL != p) {  // db.table
    // Bounded formatting: the previous sprintf/strncpy pair could overrun
    // fullDbName and never wrote a terminating NUL.
    int32_t prefixLen = snprintf(fullDbName, TSDB_DB_FNAME_LEN, "%d.", pCxt->pComCxt->acctId);
    size_t  dbLen = (size_t)(p - pStname->z);
    if (prefixLen < 0 || (size_t)prefixLen + dbLen >= (size_t)TSDB_DB_FNAME_LEN) {
      return buildSyntaxErrMsg(&pCxt->msg, "invalid table name", pStname->z);
    }
    memcpy(fullDbName + prefixLen, pStname->z, dbLen);
    fullDbName[prefixLen + dbLen] = '\0';

    size_t tbLen = (size_t)pStname->n - dbLen - 1;
    memcpy(tableName, p + 1, tbLen);
    tableName[tbLen] = '\0';
  } else {
    snprintf(fullDbName, TSDB_DB_FNAME_LEN, "%d.%s", pCxt->pComCxt->acctId, pCxt->pComCxt->db);
    memcpy(tableName, pStname->z, pStname->n);
    tableName[pStname->n] = '\0';
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Resolve the table named by `pTname` through the catalog.
 *
 * On success pCxt->pTableMeta holds the table meta, the vgroup the table
 * hashes to has been stored in pCxt->pVgroupsHashObj (keyed by vgId), and the
 * meta's vgId has been set from that vgroup.
 *
 * Returns TSDB_CODE_SUCCESS, or the first non-success code reported by
 * createSName(), the catalog lookups, or taosHashPut().
 */
static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) {
  SParseContext* pBasicCtx = pCxt->pComCxt;

  // A failed name resolution used to be ignored and the catalog was then
  // queried with a zeroed name; propagate the failure instead.
  SName name = {0};
  CHECK_CODE(createSName(&name, pTname, pBasicCtx, &pCxt->msg));

  // The full table name used to be extracted into a local buffer here, but
  // its output was never read, so that step has been dropped.

  CHECK_CODE(catalogGetTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &pCxt->pTableMeta));

  SVgroupInfo vg;
  CHECK_CODE(catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &vg));
  CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));

  pCxt->pTableMeta->vgId = vg.vgId;  // todo remove
  return TSDB_CODE_SUCCESS;
}

/*
 * Search pSchema[start, end) for the column whose name equals the token text
 * (exact length and byte-wise match).
 *
 * Returns the index of the first match, or -1 when no column in the range matches.
 */
static int32_t findCol(SToken* pColname, int32_t start, int32_t end, SSchema* pSchema) {
  for (int32_t i = start; i < end; ++i) {
    const char* candidate = pSchema[i].name;
    if (strlen(candidate) != pColname->n) {
      continue;
    }
    if (0 == strncmp(pColname->z, candidate, pColname->n)) {
      return i;
    }
  }
  return -1;
}

X
Xiaoyu Wang 已提交
123
/*
 * Finalise a per-vgroup submit message in place: fill the SSubmitReq header
 * at the start of blocks->pData and convert the header fields of every
 * SSubmitBlk that follows it to network byte order.
 */
static void buildMsgHeader(SVgDataBlocks* blocks) {
  SSubmitReq* submit = (SSubmitReq*)blocks->pData;
  submit->header.vgId = htonl(blocks->vg.vgId);
  submit->header.contLen = htonl(blocks->size);
  submit->length = submit->header.contLen;
  submit->numOfBlocks = htonl(blocks->numOfTables);

  SSubmitBlk* blk = (SSubmitBlk*)(submit + 1);
  for (int32_t remaining = blocks->numOfTables; remaining > 0; --remaining) {
    // Remember the host-order length first: it is needed to locate the next
    // block after dataLen itself has been byte-swapped.
    int32_t dataLen = blk->dataLen;

    blk->uid = htobe64(blk->uid);
    blk->tid = htonl(blk->tid);
    blk->padding = htonl(blk->padding);
    blk->sversion = htonl(blk->sversion);
    blk->dataLen = htonl(blk->dataLen);
    blk->schemaLen = htonl(blk->schemaLen);
    blk->numOfRows = htons(blk->numOfRows);

    blk = (SSubmitBlk*)(blk->data + dataLen);
  }
}

/*
 * Convert the merged per-vgroup blocks (pCxt->pVgDataBlocks) into the
 * statement output: one heap-allocated SVgDataBlocks per vgroup, pushed onto
 * pCxt->pOutput->pDataBlocks. The payload buffer is moved (swapped) from the
 * source block into the output entry, not copied.
 *
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_TSC_OUT_OF_MEMORY.
 */
static int32_t buildOutput(SInsertParseContext* pCxt) {
  size_t numOfVg = taosArrayGetSize(pCxt->pVgDataBlocks);

  pCxt->pOutput->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
  if (NULL == pCxt->pOutput->pDataBlocks) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  for (size_t i = 0; i < numOfVg; ++i) {
    STableDataBlocks* src = taosArrayGetP(pCxt->pVgDataBlocks, i);
    SVgDataBlocks*    dst = calloc(1, sizeof(SVgDataBlocks));
    if (NULL == dst) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    // NOTE(review): the lookup result is not checked; dst->vg stays zeroed if
    // the vgroup is missing from the hash.
    taosHashGetClone(pCxt->pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
    dst->numOfTables = src->numOfTables;
    dst->size = src->size;
    TSWAP(dst->pData, src->pData, char*);  // take ownership of the payload

    buildMsgHeader(dst);

    // NOTE(review): the push result is not checked; dst would leak on failure.
    taosArrayPush(pCxt->pOutput->pDataBlocks, &dst);
  }

  return TSDB_CODE_SUCCESS;
}

166 167 168 169 170 171 172 173 174 175
/*
 * Track timestamp source and ordering for a data block.
 *
 * start  points at the TSKEY of the row that was just parsed
 *
 * A key equal to INT64_MIN marks the row as using the server timestamp; any
 * other key marks it as using a client timestamp. Mixing both sources inside
 * one block is rejected. For client timestamps the block is flagged as
 * disordered as soon as a key is not strictly greater than the previous one.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED when sources are mixed.
 */
static int32_t checkTimestamp(STableDataBlocks *pDataBlocks, const char *start) {
  // once the data block is disordered, we do NOT keep previous timestamp any more
  if (!pDataBlocks->ordered) {
    return TSDB_CODE_SUCCESS;
  }

  // Copy instead of casting: `start` is a char pointer with no alignment guarantee.
  TSKEY k;
  memcpy(&k, start, sizeof(k));

  int32_t source = (k == INT64_MIN) ? TSDB_USE_SERVER_TS : TSDB_USE_CLI_TS;
  int32_t conflicting = (k == INT64_MIN) ? TSDB_USE_CLI_TS : TSDB_USE_SERVER_TS;
  if (pDataBlocks->tsSource == conflicting) {
    return TSDB_CODE_FAILED;  // client time/server time can not be mixed
  }
  pDataBlocks->tsSource = source;

  if (TSDB_USE_CLI_TS == source && k <= pDataBlocks->prevTS) {
    pDataBlocks->ordered = false;
  }

  pDataBlocks->prevTS = k;
  return TSDB_CODE_SUCCESS;
}

194
/*
 * Parse a timestamp value.
 *
 * end       in/out: SQL position right after pToken; advanced past a
 *           trailing "+<duration>" / "-<duration>" when one is consumed
 * pToken    the value token: NOW, an integer, or a timestamp string
 * timePrec  precision handed to the time helpers
 * time      out: the resulting timestamp
 * pMsgBuf   receives the message for syntax errors
 *
 * NOW and integer tokens may be followed by a time expression such as
 * "now+12a" or "now-5h"; timestamp strings are passed to taosParseTime() as is.
 *
 * Returns TSDB_CODE_SUCCESS, a syntax-error code, or
 * TSDB_CODE_TSC_INVALID_OPERATION when the duration cannot be parsed.
 */
static int parseTime(char **end, SToken *pToken, int16_t timePrec, int64_t *time, SMsgBuf* pMsgBuf) {
  int64_t ts = 0;
  char*   pTokenEnd = *end;

  if (pToken->type == TK_NOW) {
    ts = taosGetTimestamp(timePrec);
  } else if (pToken->type == TK_INTEGER) {
    bool isSigned = false;
    // NOTE(review): the conversion result is not checked here.
    toInteger(pToken->z, pToken->n, 10, &ts, &isSigned);
  } else {  // parse the RFC-3339/ISO-8601 timestamp format string
    if (taosParseTime(pToken->z, time, pToken->n, timePrec, tsDaylight) != TSDB_CODE_SUCCESS) {
      return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
    }
    return TSDB_CODE_SUCCESS;
  }

  // Fast path: if the first non-blank character after the token is a comma,
  // there is no time expression to evaluate.
  for (int k = pToken->n; pToken->z[k] != '\0'; k++) {
    if (pToken->z[k] == ' ' || pToken->z[k] == '\t') {
      continue;
    }
    if (pToken->z[k] == ',') {
      *end = pTokenEnd;
      *time = ts;
      return TSDB_CODE_SUCCESS;
    }
    break;
  }

  /*
   * time expression:
   * e.g., now+12a, now-5h
   */
  int32_t index = 0;
  SToken  sToken = tStrGetToken(pTokenEnd, &index, false);
  pTokenEnd += index;

  if (sToken.type == TK_MINUS || sToken.type == TK_PLUS) {
    index = 0;
    SToken valueToken = tStrGetToken(pTokenEnd, &index, false);
    pTokenEnd += index;

    if (valueToken.n < 2) {
      return buildSyntaxErrMsg(pMsgBuf, "value expected in timestamp", sToken.z);
    }

    char    unit = 0;
    int64_t interval = 0;
    if (parseAbsoluteDuration(valueToken.z, valueToken.n, &interval, &unit, timePrec) != TSDB_CODE_SUCCESS) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }

    ts = (sToken.type == TK_PLUS) ? (ts + interval) : (ts - interval);

    // Only a consumed expression moves the caller's cursor.
    *end = pTokenEnd;
  }

  *time = ts;
  return TSDB_CODE_SUCCESS;
}
260

261
typedef struct SMemParam {
C
Cary Xu 已提交
262
  SRowBuilder* rb;
263 264
  SSchema* schema;
  int32_t toffset;
C
Cary Xu 已提交
265
  int32_t      colIdx;
266 267
} SMemParam;

C
Cary Xu 已提交
268 269 270
/*
 * Append one parsed column value to the row under construction.
 *
 * value  raw value bytes
 * len    length of `value` in bytes (used for BINARY and NCHAR only)
 * param  SMemParam* describing the target row builder and column
 *
 * BINARY values are first written as a var-string at the current row end;
 * NCHAR values are converted to UCS-4 there; every other type is appended
 * straight from `value`.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_SQL_SYNTAX_ERROR when the NCHAR
 * conversion fails.
 */
static FORCE_INLINE int32_t MemRowAppend(const void* value, int32_t len, void* param) {
  SMemParam*   pa = (SMemParam*)param;
  SRowBuilder* rb = pa->rb;

  switch (pa->schema->type) {
    case TSDB_DATA_TYPE_BINARY: {
      const char* rowEnd = tdRowEnd(rb->pBuf);
      STR_WITH_SIZE_TO_VARSTR(rowEnd, value, len);
      tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, rowEnd, false, pa->toffset, pa->colIdx);
      break;
    }
    case TSDB_DATA_TYPE_NCHAR: {
      // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
      int32_t     output = 0;
      const char* rowEnd = tdRowEnd(rb->pBuf);
      if (!taosMbsToUcs4(value, len, (char*)varDataVal(rowEnd), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) {
        return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
      }
      varDataSetLen(rowEnd, output);
      tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, rowEnd, false, pa->toffset, pa->colIdx);
      break;
    }
    default:
      tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, value, true, pa->toffset, pa->colIdx);
      break;
  }

  return TSDB_CODE_SUCCESS;
}
289 290 291 292 293 294

// pSql -> tag1_name, ...)
/*
 * Parse an explicit column/tag name list up to and including the closing ')'.
 *
 * pColList  in/out: binding info; its bound list, per-column value status,
 *           boundNullLen and order status are rebuilt from scratch
 * pSchema   schema array the names are resolved against (numOfCols entries)
 *
 * Names may appear in any order. When they are not in schema order, an index
 * table (colIdxInfo) mapping bound position to schema position is built.
 *
 * Returns TSDB_CODE_SUCCESS, a syntax-error code for unknown or duplicated
 * names, or TSDB_CODE_TSC_OUT_OF_MEMORY.
 */
static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo* pColList, SSchema* pSchema) {
  int32_t nCols = pColList->numOfCols;

  pColList->numOfBound = 0;
  pColList->boundNullLen = 0;
  memset(pColList->boundedColumns, 0, sizeof(int32_t) * nCols);
  for (int32_t i = 0; i < nCols; ++i) {
    pColList->cols[i].valStat = VAL_STAT_NONE;
  }

  SToken  sToken;
  bool    isOrdered = true;
  int32_t lastColIdx = -1;  // last column found
  while (1) {
    NEXT_TOKEN(pCxt->pSql, sToken);

    if (TK_RP == sToken.type) {
      break;
    }

    // Search forward from the previous match first; a hit only in the part
    // before it means the list is not in schema order.
    int32_t t = lastColIdx + 1;
    int32_t index = findCol(&sToken, t, nCols, pSchema);
    if (index < 0 && t > 0) {
      index = findCol(&sToken, 0, t, pSchema);
      isOrdered = false;
    }
    if (index < 0) {
      return buildSyntaxErrMsg(&pCxt->msg, "invalid column/tag name", sToken.z);
    }
    if (pColList->cols[index].valStat == VAL_STAT_HAS) {
      return buildSyntaxErrMsg(&pCxt->msg, "duplicated column name", sToken.z);
    }
    lastColIdx = index;
    pColList->cols[index].valStat = VAL_STAT_HAS;
    pColList->boundedColumns[pColList->numOfBound] = index + PRIMARYKEY_TIMESTAMP_COL_ID;
    ++pColList->numOfBound;

    // Size the null placeholder from the column that was actually matched.
    // (Indexing with the search start `t` picked the wrong column whenever
    // columns were skipped or reordered, and read past the schema when
    // t == nCols.)
    switch (pSchema[index].type) {
      case TSDB_DATA_TYPE_BINARY:
        pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + CHAR_BYTES);
        break;
      case TSDB_DATA_TYPE_NCHAR:
        pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
        break;
      default:
        pColList->boundNullLen += TYPE_BYTES[pSchema[index].type];
        break;
    }
  }

  pColList->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED;

  if (!isOrdered) {
    pColList->colIdxInfo = calloc(pColList->numOfBound, sizeof(SBoundIdxInfo));
    if (NULL == pColList->colIdxInfo) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    SBoundIdxInfo* pColIdx = pColList->colIdxInfo;
    for (uint16_t i = 0; i < pColList->numOfBound; ++i) {
      pColIdx[i].schemaColIdx = (uint16_t)pColList->boundedColumns[i];
      pColIdx[i].boundIdx = i;
    }
    // Sort by schema position to assign finalIdx, then restore bound order.
    qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), schemaIdxCompar);
    for (uint16_t i = 0; i < pColList->numOfBound; ++i) {
      pColIdx[i].finalIdx = i;
    }
    qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), boundIdxCompar);
  }

  // Clear the unused tail of the bound list.
  memset(&pColList->boundedColumns[pColList->numOfBound], 0, sizeof(int32_t) * (pColList->numOfCols - pColList->numOfBound));

  return TSDB_CODE_SUCCESS;
}

// pSql -> tag1_value, ...)
365 366
static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pTagsSchema, uint8_t precision) {
  if (tdInitKVRowBuilder(&pCxt->tagsBuilder) < 0) {
367 368 369
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

370
  SKvParam param = {.builder = &pCxt->tagsBuilder};
371
  SToken sToken;
372
  char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0};  // used for deleting Escape character: \\, \', \"
373
  for (int i = 0; i < pCxt->tags.numOfBound; ++i) {
374
    NEXT_TOKEN(pCxt->pSql, sToken);
375
    SSchema* pSchema = &pTagsSchema[pCxt->tags.boundedColumns[i]];
376
    param.schema = pSchema;
377
    CHECK_CODE(parseValueToken(&pCxt->pSql, &sToken, pSchema, precision, tmpTokenBuf, KvRowAppend, &param, &pCxt->msg));
378 379
  }

380
  SKVRow row = tdGetKVRowFromBuilder(&pCxt->tagsBuilder);
381 382 383 384 385
  if (NULL == row) {
    return buildInvalidOperationMsg(&pCxt->msg, "tag value expected");
  }
  tdSortKVRowByColIdx(row);

386
  // todo construct payload
387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402

  tfree(row);
}

// pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
/*
 * Parse the USING clause that follows a table name: resolve the super table,
 * optionally parse an explicit tag name list, then parse the TAGS values.
 *
 * pTbnameToken  token of the table being inserted into (not read here yet)
 *
 * Returns TSDB_CODE_SUCCESS, or the first syntax / invalid-operation /
 * lookup error encountered.
 */
static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken) {
  SToken sToken;

  // pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
  NEXT_TOKEN(pCxt->pSql, sToken);
  CHECK_CODE(getTableMeta(pCxt, &sToken));
  if (TSDB_SUPER_TABLE != pCxt->pTableMeta->tableType) {
    return buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed");
  }

  SSchema* pTagsSchema = getTableTagSchema(pCxt->pTableMeta);
  setBoundColumnInfo(&pCxt->tags, pTagsSchema, getNumOfTags(pCxt->pTableMeta));

  // pSql -> [(tag1_name, ...)] TAGS (tag1_value, ...)
  NEXT_TOKEN(pCxt->pSql, sToken);
  if (TK_LP == sToken.type) {
    CHECK_CODE(parseBoundColumns(pCxt, &pCxt->tags, pTagsSchema));
    NEXT_TOKEN(pCxt->pSql, sToken);
  }

  if (TK_TAGS != sToken.type) {
    return buildSyntaxErrMsg(&pCxt->msg, "TAGS is expected", sToken.z);
  }

  // pSql -> (tag1_value, ...)
  NEXT_TOKEN(pCxt->pSql, sToken);
  if (TK_LP != sToken.type) {
    return buildSyntaxErrMsg(&pCxt->msg, "( is expected", sToken.z);
  }
  CHECK_CODE(parseTagsClause(pCxt, pTagsSchema, getTableInfo(pCxt->pTableMeta).precision));

  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
425
/*
 * Parse the values of one "(v1, v2, ...)" tuple into the row located at the
 * current end of the data block. The closing ')' is left for the caller.
 *
 * pDataBlocks  block receiving the row (written at pData + size)
 * timePrec     timestamp precision of the table
 * len          currently not written by this function (the row-length
 *              assignment is commented out below)
 * tmpTokenBuf  scratch buffer handed to parseValueToken()
 *
 * Returns TSDB_CODE_SUCCESS, the first error from parseValueToken(), or
 * TSDB_CODE_TSC_INVALID_TIME_STAMP when client and server timestamps are mixed.
 */
static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, int16_t timePrec, int32_t* len, char* tmpTokenBuf) {
  SParsedDataColInfo* spd = &pDataBlocks->boundColumnInfo;
  SRowBuilder*        pBuilder = &pDataBlocks->rowBuilder;
  STSRow*             row = (STSRow*)(pDataBlocks->pData + pDataBlocks->size);  // skip the SSubmitBlk header

  tdSRowResetBuf(pBuilder, row);

  bool      isParseBindParam = false;
  SSchema*  schema = getTableColumnSchema(pDataBlocks->pTableMeta);
  SMemParam param = {.rb = pBuilder};
  SToken    sToken = {0};

  // 1. set the parsed value from sql string
  for (int i = 0; i < spd->numOfBound; ++i) {
    NEXT_TOKEN(pCxt->pSql, sToken);

    // boundedColumns[] is indexed into the schema with a -1 adjustment
    SSchema* pSchema = &schema[spd->boundedColumns[i] - 1];
    param.schema = pSchema;
    getMemRowAppendInfo(schema, pBuilder->rowType, spd, i, &param.toffset, &param.colIdx);
    CHECK_CODE(parseValueToken(&pCxt->pSql, &sToken, pSchema, timePrec, tmpTokenBuf, MemRowAppend, &param, &pCxt->msg));

    if (PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
      TSKEY tsKey = TD_ROW_KEY(row);
      if (checkTimestamp(pDataBlocks, (const char*)&tsKey) != TSDB_CODE_SUCCESS) {
        buildSyntaxErrMsg(&pCxt->msg, "client time/server time can not be mixed up", sToken.z);
        return TSDB_CODE_TSC_INVALID_TIME_STAMP;
      }
    }
  }

  if (!isParseBindParam) {
    // set the null value for the columns that do not assign values
    if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
      for (int32_t i = 0; i < spd->numOfCols; ++i) {
        if (spd->cols[i].valStat == VAL_STAT_NONE) {  // the primary TS key is not VAL_STAT_NONE
          tdAppendColValToTpRow(pBuilder, TD_VTYPE_NONE, getNullValue(schema[i].type), true, schema[i].type, i,
                                spd->cols[i].toffset);
        }
      }
    }
  }

  // *len = pBuilder->extendedRowSize;
  return TSDB_CODE_SUCCESS;
}

// pSql -> (field1_value, ...) [(field1_value2, ...) ...]
470
static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlock, int maxRows, int32_t* numOfRows) {
471 472
  STableComInfo tinfo = getTableInfo(pDataBlock->pTableMeta);
  int32_t extendedRowSize = getExtendedRowSize(pDataBlock);
C
Cary Xu 已提交
473
  CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));
474 475

  (*numOfRows) = 0;
476
  char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0};  // used for deleting Escape character: \\, \', \"
477 478
  SToken sToken;
  while (1) {
479 480
    int32_t index = 0;
    NEXT_TOKEN_KEEP_SQL(pCxt->pSql, sToken, index);
481 482 483
    if (TK_LP != sToken.type) {
      break;
    }
484
    pCxt->pSql += index;
485 486 487 488 489 490 491 492 493 494

    if ((*numOfRows) >= maxRows || pDataBlock->size + extendedRowSize >= pDataBlock->nAllocSize) {
      int32_t tSize;
      CHECK_CODE(allocateMemIfNeed(pDataBlock, extendedRowSize, &tSize));
      ASSERT(tSize >= maxRows);
      maxRows = tSize;
    }

    int32_t len = 0;
    CHECK_CODE(parseOneRow(pCxt, pDataBlock, tinfo.precision, &len, tmpTokenBuf));
C
Cary Xu 已提交
495
    pDataBlock->size += extendedRowSize; //len;
496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515

    NEXT_TOKEN(pCxt->pSql, sToken);
    if (TK_RP != sToken.type) {
      return buildSyntaxErrMsg(&pCxt->msg, ") expected", sToken.z);
    }

    (*numOfRows)++;
  }

  if (0 == (*numOfRows)) {
    return  buildSyntaxErrMsg(&pCxt->msg, "no any data points", NULL);
  }
  return TSDB_CODE_SUCCESS;
}

/*
 * Parse a complete VALUES clause for one table into `dataBuf`, fill in the
 * block header, and add the parsed rows to the statement's running total.
 *
 * Returns TSDB_CODE_SUCCESS, an allocation / parsing error, or an
 * invalid-operation code when setBlockInfo() rejects the row count.
 */
static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* dataBuf) {
  int32_t maxNumOfRows;
  CHECK_CODE(allocateMemIfNeed(dataBuf, getExtendedRowSize(dataBuf), &maxNumOfRows));

  int32_t numOfRows = 0;
  CHECK_CODE(parseValues(pCxt, dataBuf, maxNumOfRows, &numOfRows));

  // The SSubmitBlk header sits at the start of the block's buffer.
  SSubmitBlk* pBlocks = (SSubmitBlk*)(dataBuf->pData);
  if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, dataBuf->pTableMeta, numOfRows)) {
    return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than 32767");
  }

  dataBuf->numOfTables = 1;
  pCxt->totalNum += numOfRows;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
528 529 530 531 532 533
/*
 * Release the per-table part of the parse context (tag row builder, bound
 * tag info, table meta). Called before each table clause is parsed and again
 * on final teardown.
 */
static void destroyInsertParseContextForTable(SInsertParseContext* pCxt) {
  // The three members are independent of each other; tear down in reverse
  // declaration order.
  tdDestroyKVRowBuilder(&pCxt->tagsBuilder);
  destroyBoundColumnInfo(&pCxt->tags);
  tfree(pCxt->pTableMeta);
}

534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550
/*
 * Free a table data block and its payload. The table meta and the bound
 * column info are released only when the block is not marked as cloned.
 * A NULL block is ignored.
 */
static void destroyDataBlock(STableDataBlocks* pDataBlock) {
  if (NULL == pDataBlock) {
    return;
  }

  tfree(pDataBlock->pData);

  bool ownsMeta = !pDataBlock->cloned;
  if (ownsMeta) {
    // free the refcount for metermeta
    if (NULL != pDataBlock->pTableMeta) {
      tfree(pDataBlock->pTableMeta);
    }

    destroyBoundColumnInfo(&pDataBlock->boundColumnInfo);
  }

  tfree(pDataBlock);
}

X
Xiaoyu Wang 已提交
551 552 553
/*
 * Release everything owned by the parse context: the per-table state, the
 * vgroup hash, the per-table block hash and both block lists.
 * pCxt->pOutput is not released here.
 */
static void destroyInsertParseContext(SInsertParseContext* pCxt) {
  destroyInsertParseContextForTable(pCxt);
  taosHashCleanup(pCxt->pVgroupsHashObj);

  destroyBlockHashmap(pCxt->pTableBlockHashObj);
  destroyBlockArrayList(pCxt->pTableDataBlocks);
  destroyBlockArrayList(pCxt->pVgDataBlocks);
}

560 561 562 563 564 565
//   tb_name
//       [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
//       [(field1_name, ...)]
//       VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
//   [...];
/*
 * Parse everything after "INSERT INTO": one clause per table until the SQL
 * text is exhausted, then merge the per-table blocks by vgroup and build the
 * statement output.
 *
 * Returns TSDB_CODE_SUCCESS, or the first syntax / invalid-operation /
 * lookup / allocation error encountered.
 */
static int32_t parseInsertBody(SInsertParseContext* pCxt) {
  // for each table
  while (1) {
    // drop whatever the previous table clause left behind
    destroyInsertParseContextForTable(pCxt);

    SToken sToken;
    // pSql -> tb_name ...
    NEXT_TOKEN(pCxt->pSql, sToken);

    // no data in the sql string anymore.
    if (sToken.n == 0) {
      if (0 == pCxt->totalNum) {
        return buildInvalidOperationMsg(&pCxt->msg, "no data in sql");
      }
      break;
    }

    SToken tbnameToken = sToken;
    NEXT_TOKEN(pCxt->pSql, sToken);

    // USING clause
    if (TK_USING == sToken.type) {
      CHECK_CODE(parseUsingClause(pCxt, &tbnameToken));
      NEXT_TOKEN(pCxt->pSql, sToken);
    } else {
      CHECK_CODE(getTableMeta(pCxt, &tbnameToken));
    }

    STableDataBlocks* dataBuf = NULL;
    CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, pCxt->pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE,
        sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta, &dataBuf, NULL));

    if (TK_LP == sToken.type) {
      // pSql -> field1_name, ...)
      CHECK_CODE(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(pCxt->pTableMeta)));
      NEXT_TOKEN(pCxt->pSql, sToken);
    }

    if (TK_VALUES == sToken.type) {
      // pSql -> (field1_value, ...) [(field1_value2, ...) ...]
      CHECK_CODE(parseValuesClause(pCxt, dataBuf));
      pCxt->pOutput->insertType = TSDB_QUERY_TYPE_INSERT;
      continue;
    }

    // FILE csv_file_path
    if (TK_FILE == sToken.type) {
      // pSql -> csv_file_path
      NEXT_TOKEN(pCxt->pSql, sToken);
      if (0 == sToken.n || (TK_STRING != sToken.type && TK_ID != sToken.type)) {
        return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", sToken.z);
      }
      // todo
      pCxt->pOutput->insertType = TSDB_QUERY_TYPE_FILE_INSERT;
      continue;
    }

    return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", sToken.z);
  }

  // merge according to vgId
  if (!TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT) && taosHashGetSize(pCxt->pTableBlockHashObj) > 0) {
    CHECK_CODE(mergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->schemaAttache, pCxt->pOutput->payloadType, &pCxt->pVgDataBlocks));
  }
  return buildOutput(pCxt);
}

// INSERT INTO
//   tb_name
//       [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
//       [(field1_name, ...)]
//       VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
//   [...];
H
Haojun Liao 已提交
637
int32_t parseInsertSql(SParseContext* pContext, SVnodeModifOpStmtInfo** pInfo) {
638 639
  SInsertParseContext context = {
    .pComCxt = pContext,
640
    .pSql = (char*) pContext->pSql,
641 642
    .msg = {.buf = pContext->pMsg, .len = pContext->msgLen},
    .pTableMeta = NULL,
X
Xiaoyu Wang 已提交
643
    .pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false),
644 645
    .pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false),
    .totalNum = 0,
H
Haojun Liao 已提交
646
    .pOutput = calloc(1, sizeof(SVnodeModifOpStmtInfo))
647 648
  };

X
Xiaoyu Wang 已提交
649
  if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pOutput) {
650
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
X
Xiaoyu Wang 已提交
651
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
652 653
  }

654
  *pInfo = context.pOutput;
655
  context.pOutput->nodeType = TSDB_SQL_INSERT;
656
  context.pOutput->payloadType = PAYLOAD_TYPE_KV;
657

658 659 660 661 662 663
  int32_t code = skipInsertInto(&context);
  if (TSDB_CODE_SUCCESS == code) {
    code = parseInsertBody(&context);
  }
  destroyInsertParseContext(&context);
  terrno = code;
X
Xiaoyu Wang 已提交
664
  return code;
665
}