tscParseInsert.c 47.5 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE /* See feature_test_macros(7) */
#define _GNU_SOURCE

#define _XOPEN_SOURCE

21
#include "os.h"
22

23
#include "ttype.h"
24
#include "hash.h"
H
hzcheng 已提交
25 26 27
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
28
#include "ttokendef.h"
H
hzcheng 已提交
29
#include "taosdef.h"
H
hzcheng 已提交
30

S
slguan 已提交
31
#include "tscLog.h"
H
hjxilinx 已提交
32
#include "tscSubquery.h"
H
hzcheng 已提交
33 34
#include "tstoken.h"

S
slguan 已提交
35
#include "tdataformat.h"
36

S
slguan 已提交
37
/*
 * Source of the primary-key timestamps of the rows collected in one data
 * block. tsCheckTimestamp() stores one of these values in the block's
 * tsSource field (-1 there means "not decided yet") and rejects rows that
 * would mix both sources.
 */
enum {
  // the primary key was written as 0; presumably the server assigns the real time -- TODO confirm
  TSDB_USE_SERVER_TS = 0,

  // the primary key carries a non-zero timestamp supplied by the client
  TSDB_USE_CLI_TS = 1,
};

L
lihui 已提交
42
// Forward declaration: grows the buffer of pDataBlock when it is running out of room and reports the
// resulting row capacity through numOfRows. The definition appears further down in this file.
static int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows);
H
hzcheng 已提交
43

H
Haojun Liao 已提交
44
/*
 * Convert the text of a token into a double.
 *
 * pToken  token whose text (pToken->z, pToken->n bytes) is converted
 * value   receives the converted number
 * endPtr  receives the position at which the conversion stopped
 *
 * Returns TK_ILLEGAL when the conversion did not consume exactly pToken->n
 * bytes, otherwise the type of the token itself.
 *
 * errno is cleared before the conversion so that the caller can test it for
 * ERANGE afterwards.
 */
static int32_t tscToDouble(SStrToken *pToken, double *value, char **endPtr) {
  errno = 0;

  // strtod() rather than strtold(): the result is stored in a double, so converting directly avoids a
  // second rounding step and the narrowing of a long double that does not fit into a double; on overflow
  // strtod() reports HUGE_VAL/ERANGE, which is exactly what the callers check for.
  *value = strtod(pToken->z, endPtr);

  // the whole token must have been consumed, otherwise it is not a valid floating point number
  if ((*endPtr - pToken->z) != pToken->n) {
    return TK_ILLEGAL;
  }

  return pToken->type;
}
H
hzcheng 已提交
55

H
Haojun Liao 已提交
56
/*
 * Parse a timestamp value that starts with pToken.
 *
 * Accepted forms are: the keyword now, the literal 0, an integer, a formatted
 * date/time string, and -- for the first three -- an optional "+ duration" or
 * "- duration" suffix, e.g. now+12a, now-5h.
 *
 * pToken    first token of the timestamp expression
 * time      receives the resulting timestamp
 * next      in: position in the sql string right after pToken;
 *           out: advanced past the duration suffix when one was consumed
 * error     buffer that receives an error message
 * timePrec  time precision of the target database
 *
 * Returns TSDB_CODE_SUCCESS (0) on success, an error code otherwise.
 */
int tsParseTime(SStrToken *pToken, int64_t *time, char **next, char *error, int16_t timePrec) {
  int64_t ts = 0;
  char *  cursor = *next;

  if (pToken->type == TK_NOW) {
    ts = taosGetTimestamp(timePrec);
  } else if (strncmp(pToken->z, "0", 1) == 0 && pToken->n == 1) {
    // the literal "0": ts stays 0
  } else if (pToken->type == TK_INTEGER) {
    ts = tsosStr2int64(pToken->z);
  } else {
    // formatted date/time string, e.g. "2001-11-12 18:31:01"; no duration suffix is handled in this form
    if (taosParseTime(pToken->z, time, pToken->n, timePrec, tsDaylight) != TSDB_CODE_SUCCESS) {
      return tscInvalidSQLErrMsg(error, "invalid timestamp format", pToken->z);
    }

    return TSDB_CODE_SUCCESS;
  }

  // fast path: when only blanks separate the token from a comma, there is no time expression to evaluate
  int pos = pToken->n;
  while (pToken->z[pos] == ' ' || pToken->z[pos] == '\t') {
    ++pos;
  }

  if (pToken->z[pos] == ',') {
    *next = cursor;
    *time = ts;
    return 0;
  }

  // time expression: <base> (+|-) <duration>
  int32_t   index = 0;
  SStrToken opToken = tStrGetToken(cursor, &index, false, 0, NULL);
  cursor += index;

  if (opToken.type == TK_MINUS || opToken.type == TK_PLUS) {
    index = 0;
    SStrToken durationToken = tStrGetToken(cursor, &index, false, 0, NULL);
    cursor += index;

    // a duration needs at least one digit and one unit character
    if (durationToken.n < 2) {
      return tscInvalidSQLErrMsg(error, "value expected in timestamp", opToken.z);
    }

    int64_t interval;
    if (parseAbsoluteDuration(durationToken.z, durationToken.n, &interval) != TSDB_CODE_SUCCESS) {
      return TSDB_CODE_TSC_INVALID_SQL;
    }

    if (timePrec == TSDB_TIME_PRECISION_MILLI) {
      interval /= 1000;
    }

    if (opToken.type == TK_PLUS) {
      ts += interval;
    } else if (ts >= interval) {
      ts -= interval;
    } else {
      ts = 0;  // the result is clamped at 0 instead of becoming negative
    }

    *next = cursor;
  }

  *time = ts;
  return TSDB_CODE_SUCCESS;
}

130
// todo extract the null value check
131 132 133 134
static bool isNullStr(SStrToken* pToken) {
  return (pToken->type == TK_NULL) || ((pToken->type == TK_STRING) && (pToken->n != 0) &&
                                       (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0));
}
H
Haojun Liao 已提交
135
/*
 * Convert one value token into the binary representation of a column and
 * store it at payload.
 *
 * pSchema     schema (type and byte width) of the target column
 * pToken      token holding the value text
 * payload     destination of the converted value
 * msg         buffer that receives the error message on failure
 * str         current position in the sql string; handed to tsParseTime(),
 *             which may advance it for timestamp expressions
 * primaryKey  true if the column is the primary timestamp column; a NULL
 *             token is then stored as 0 instead of the NULL pattern
 * timePrec    time precision of the target database
 *
 * Returns TSDB_CODE_SUCCESS, or the code produced by the error message
 * helpers when the token cannot be converted or is out of range.
 * A column type without a case below is left untouched and reported as success.
 */
int32_t tsParseOneColumnData(SSchema *pSchema, SStrToken *pToken, char *payload, char *msg, char **str, bool primaryKey,
                             int16_t timePrec) {
  int64_t iv;
  char *  endptr = NULL;

  if (IS_NUMERIC_TYPE(pSchema->type) && pToken->n == 0) {
    return tscInvalidSQLErrMsg(msg, "invalid numeric data", pToken->z);
  }

  switch (pSchema->type) {
    case TSDB_DATA_TYPE_BOOL: {
      if (isNullStr(pToken)) {
        *((uint8_t *)payload) = TSDB_DATA_BOOL_NULL;
        break;
      }

      if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) {
        // NOTE(review): only the first pToken->n bytes are compared, so prefixes such as "t" or "fa" are
        // accepted as well -- confirm whether that is intended
        if (strncmp(pToken->z, "true", pToken->n) == 0) {
          *(uint8_t *)payload = TSDB_TRUE;
        } else if (strncmp(pToken->z, "false", pToken->n) == 0) {
          *(uint8_t *)payload = TSDB_FALSE;
        } else {
          return tscSQLSyntaxErrMsg(msg, "invalid bool data", pToken->z);
        }
      } else if (pToken->type == TK_INTEGER) {
        // any non-zero integer counts as true
        iv = strtoll(pToken->z, NULL, 10);
        *(uint8_t *)payload = (int8_t)((iv != 0) ? TSDB_TRUE : TSDB_FALSE);
      } else if (pToken->type == TK_FLOAT) {
        // any non-zero floating point number counts as true
        double dv = strtod(pToken->z, NULL);
        *(uint8_t *)payload = (int8_t)((dv != 0) ? TSDB_TRUE : TSDB_FALSE);
      } else {
        return tscInvalidSQLErrMsg(msg, "invalid bool data", pToken->z);
      }

      break;
    }

    case TSDB_DATA_TYPE_TINYINT:
      if (isNullStr(pToken)) {
        *((uint8_t *)payload) = TSDB_DATA_TINYINT_NULL;
        break;
      }

      if (tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true) != TSDB_CODE_SUCCESS) {
        return tscInvalidSQLErrMsg(msg, "invalid tinyint data", pToken->z);
      }

      if (!IS_VALID_TINYINT(iv)) {
        return tscInvalidSQLErrMsg(msg, "data overflow", pToken->z);
      }

      *((uint8_t *)payload) = (uint8_t)iv;
      break;

    case TSDB_DATA_TYPE_UTINYINT:
      if (isNullStr(pToken)) {
        *((uint8_t *)payload) = TSDB_DATA_UTINYINT_NULL;
        break;
      }

      if (tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false) != TSDB_CODE_SUCCESS) {
        return tscInvalidSQLErrMsg(msg, "invalid unsigned tinyint data", pToken->z);
      }

      if (!IS_VALID_UTINYINT(iv)) {
        return tscInvalidSQLErrMsg(msg, "unsigned tinyint data overflow", pToken->z);
      }

      *((uint8_t *)payload) = (uint8_t)iv;
      break;

    case TSDB_DATA_TYPE_SMALLINT:
      if (isNullStr(pToken)) {
        *((int16_t *)payload) = TSDB_DATA_SMALLINT_NULL;
        break;
      }

      if (tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true) != TSDB_CODE_SUCCESS) {
        return tscInvalidSQLErrMsg(msg, "invalid smallint data", pToken->z);
      }

      if (!IS_VALID_SMALLINT(iv)) {
        return tscInvalidSQLErrMsg(msg, "smallint data overflow", pToken->z);
      }

      *((int16_t *)payload) = (int16_t)iv;
      break;

    case TSDB_DATA_TYPE_USMALLINT:
      if (isNullStr(pToken)) {
        *((uint16_t *)payload) = TSDB_DATA_USMALLINT_NULL;
        break;
      }

      if (tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false) != TSDB_CODE_SUCCESS) {
        return tscInvalidSQLErrMsg(msg, "invalid unsigned smallint data", pToken->z);
      }

      if (!IS_VALID_USMALLINT(iv)) {
        return tscInvalidSQLErrMsg(msg, "unsigned smallint data overflow", pToken->z);
      }

      *((uint16_t *)payload) = (uint16_t)iv;
      break;

    case TSDB_DATA_TYPE_INT:
      if (isNullStr(pToken)) {
        *((int32_t *)payload) = TSDB_DATA_INT_NULL;
        break;
      }

      if (tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true) != TSDB_CODE_SUCCESS) {
        return tscInvalidSQLErrMsg(msg, "invalid int data", pToken->z);
      }

      if (!IS_VALID_INT(iv)) {
        return tscInvalidSQLErrMsg(msg, "int data overflow", pToken->z);
      }

      *((int32_t *)payload) = (int32_t)iv;
      break;

    case TSDB_DATA_TYPE_UINT:
      if (isNullStr(pToken)) {
        *((uint32_t *)payload) = TSDB_DATA_UINT_NULL;
        break;
      }

      if (tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false) != TSDB_CODE_SUCCESS) {
        return tscInvalidSQLErrMsg(msg, "invalid unsigned int data", pToken->z);
      }

      if (!IS_VALID_UINT(iv)) {
        return tscInvalidSQLErrMsg(msg, "unsigned int data overflow", pToken->z);
      }

      *((uint32_t *)payload) = (uint32_t)iv;
      break;

    case TSDB_DATA_TYPE_BIGINT:
      if (isNullStr(pToken)) {
        *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL;
        break;
      }

      if (tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true) != TSDB_CODE_SUCCESS) {
        return tscInvalidSQLErrMsg(msg, "invalid bigint data", pToken->z);
      }

      if (!IS_VALID_BIGINT(iv)) {
        return tscInvalidSQLErrMsg(msg, "bigint data overflow", pToken->z);
      }

      *((int64_t *)payload) = iv;
      break;

    case TSDB_DATA_TYPE_UBIGINT:
      if (isNullStr(pToken)) {
        *((uint64_t *)payload) = TSDB_DATA_UBIGINT_NULL;
        break;
      }

      if (tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false) != TSDB_CODE_SUCCESS) {
        return tscInvalidSQLErrMsg(msg, "invalid unsigned bigint data", pToken->z);
      }

      if (!IS_VALID_UBIGINT((uint64_t)iv)) {
        return tscInvalidSQLErrMsg(msg, "unsigned bigint data overflow", pToken->z);
      }

      *((uint64_t *)payload) = iv;
      break;

    case TSDB_DATA_TYPE_FLOAT: {
      if (isNullStr(pToken)) {
        *((int32_t *)payload) = TSDB_DATA_FLOAT_NULL;
        break;
      }

      double dv;
      if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
        return tscInvalidSQLErrMsg(msg, "illegal float data", pToken->z);
      }

      // reject conversion overflow, values outside of the float range, infinities and NaN
      if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) ||
          isnan(dv)) {
        return tscInvalidSQLErrMsg(msg, "illegal float data", pToken->z);
      }

      SET_FLOAT_VAL(payload, dv);
      break;
    }

    case TSDB_DATA_TYPE_DOUBLE: {
      if (isNullStr(pToken)) {
        *((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL;
        break;
      }

      double dv;
      if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
        return tscInvalidSQLErrMsg(msg, "illegal double data", pToken->z);
      }

      // reject conversion overflow, infinities and NaN
      if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) {
        return tscInvalidSQLErrMsg(msg, "illegal double data", pToken->z);
      }

      *((double *)payload) = dv;
      break;
    }

    case TSDB_DATA_TYPE_BINARY:
      // binary data cannot be a null-terminated char string, otherwise the last char of the string is lost
      if (pToken->type == TK_NULL) {
        setVardataNull(payload, TSDB_DATA_TYPE_BINARY);
        break;
      }

      // values that are too long are rejected as invalid sql, they are not truncated automatically
      if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) {  // todo refactor
        return tscInvalidSQLErrMsg(msg, "string data overflow", pToken->z);
      }

      STR_WITH_SIZE_TO_VARSTR(payload, pToken->z, pToken->n);
      break;

    case TSDB_DATA_TYPE_NCHAR: {
      if (pToken->type == TK_NULL) {
        setVardataNull(payload, TSDB_DATA_TYPE_NCHAR);
        break;
      }

      // the conversion fails if the converted output does not fit into the column; the text belonging to
      // errno is then reported as the error message
      int32_t output = 0;
      if (!taosMbsToUcs4(pToken->z, pToken->n, varDataVal(payload), pSchema->bytes - VARSTR_HEADER_SIZE, &output)) {
        char buf[512] = {0};
        snprintf(buf, tListLen(buf), "%s", strerror(errno));
        return tscInvalidSQLErrMsg(msg, buf, pToken->z);
      }

      varDataSetLen(payload, output);
      break;
    }

    case TSDB_DATA_TYPE_TIMESTAMP: {
      if (pToken->type == TK_NULL) {
        // a NULL primary key is stored as 0, any other timestamp column gets the regular NULL pattern
        if (primaryKey) {
          *((int64_t *)payload) = 0;
        } else {
          *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL;
        }
        break;
      }

      int64_t temp;
      if (tsParseTime(pToken, &temp, str, msg, timePrec) != TSDB_CODE_SUCCESS) {
        return tscInvalidSQLErrMsg(msg, "invalid timestamp", pToken->z);
      }

      *((int64_t *)payload) = temp;
      break;
    }
  }

  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404
/*
 * Server time and client time must not be mixed up in one sql string.
 *
 * start points at the primary-key timestamp of the row that was just parsed.
 * A key of 0 marks the row as using server time, any other key as using
 * client time. The first row fixes the source of the block; a later row that
 * uses the other source is rejected with -1.
 *
 * While the block is still known to be ordered, the key is also compared
 * with the previous one: a client timestamp that does not increase marks the
 * block as disordered. Once a block is disordered, nothing is checked or
 * recorded any more.
 */
static int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start) {
  if (!pDataBlocks->ordered) {
    return TSDB_CODE_SUCCESS;
  }

  const TSKEY key = *(const TSKEY *)start;

  const int32_t ownSource = (key == 0) ? TSDB_USE_SERVER_TS : TSDB_USE_CLI_TS;
  const int32_t otherSource = (key == 0) ? TSDB_USE_CLI_TS : TSDB_USE_SERVER_TS;

  if (pDataBlocks->tsSource == otherSource) {
    return -1;  // client time/server time can not be mixed
  }

  if (pDataBlocks->tsSource == -1) {  // source not decided yet, this row decides it
    pDataBlocks->tsSource = ownSource;
  }

  if (pDataBlocks->tsSource == TSDB_USE_CLI_TS && key <= pDataBlocks->prevTS) {
    pDataBlocks->ordered = false;
  }

  pDataBlocks->prevTS = key;
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
420
/*
 * Parse the values of one row, i.e. the text between '(' and ')', and write
 * them into the data block at pDataBlocks->pData + pDataBlocks->size.
 *
 * str          in/out: current position in the sql string
 * pDataBlocks  data block that receives the row
 * schema       schema of all columns of the table
 * spd          describes which columns are assigned and where each value
 *              is placed inside the row
 * pCmd         sql command; pCmd->payload receives error messages
 * timePrec     time precision of the target database
 * code         receives the error code on failure
 * tmpTokenBuf  scratch buffer used to strip quotes and escape characters
 *              from string values; it must provide TMP_TOKEN_BUF_SIZE bytes
 *
 * Returns the size of the row in bytes, or -1 on failure with *code set.
 */
int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[], SParsedDataColInfo *spd, SSqlCmd* pCmd,
                      int16_t timePrec, int32_t *code, char *tmpTokenBuf) {
  // Capacity of tmpTokenBuf. It mirrors the 16*1024 bytes allocated in doParseInsertStatement(), which is
  // the only allocation visible here. NOTE(review): any other caller has to supply a buffer of at least
  // this size -- verify.
  enum { TMP_TOKEN_BUF_SIZE = 16 * 1024 };

  int32_t   index = 0;
  SStrToken sToken = {0};
  char *    payload = pDataBlocks->pData + pDataBlocks->size;

  // 1. set the parsed value from sql string
  int32_t rowSize = 0;
  for (int i = 0; i < spd->numOfAssignedCols; ++i) {
    // the start position in data block buffer of current value in sql
    char *   start = payload + spd->elems[i].offset;
    int16_t  colIndex = spd->elems[i].colIndex;
    SSchema *pSchema = schema + colIndex;
    rowSize += pSchema->bytes;

    index = 0;
    sToken = tStrGetToken(*str, &index, true, 0, NULL);
    *str += index;

    if (sToken.type == TK_QUESTION) {
      if (pCmd->insertType != TSDB_QUERY_TYPE_STMT_INSERT) {
        *code = tscSQLSyntaxErrMsg(pCmd->payload, "? only allowed in binding insertion", *str);
        return -1;
      }

      // remember where the bound value has to be stored later on
      uint32_t offset = (uint32_t)(start - pDataBlocks->pData);
      if (tscAddParamToDataBlock(pDataBlocks, pSchema->type, (uint8_t)timePrec, pSchema->bytes, offset) != NULL) {
        continue;
      }

      strcpy(pCmd->payload, "client out of memory");
      *code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      return -1;
    }

    int16_t type = sToken.type;
    if ((type != TK_NOW && type != TK_INTEGER && type != TK_STRING && type != TK_FLOAT && type != TK_BOOL &&
         type != TK_NULL && type != TK_HEX && type != TK_OCT && type != TK_BIN) || (sToken.n == 0) || (type == TK_RP)) {
      *code = tscSQLSyntaxErrMsg(pCmd->payload, "invalid data or symbol", sToken.z);
      return -1;
    }

    // Remove quotation marks
    if (TK_STRING == sToken.type) {
      // a string token consists of at least its two quotation marks; anything shorter would make the
      // unsigned length computation below wrap around
      if (sToken.n < 2) {
        *code = tscSQLSyntaxErrMsg(pCmd->payload, "invalid data or symbol", sToken.z);
        return -1;
      }

      // The unquoted text and its terminating 0 are staged in tmpTokenBuf before the column width is
      // checked, so an over-long literal has to be rejected here or it overruns the scratch buffer.
      if (sToken.n >= TMP_TOKEN_BUF_SIZE) {
        *code = tscSQLSyntaxErrMsg(pCmd->payload, "too long string", sToken.z);
        return -1;
      }

      // delete escape character: a quote that is preceded by a backslash or by another quote is copied
      // once, the preceding character is dropped
      char    delim = sToken.z[0];
      int32_t cnt = 0;
      int32_t j = 0;
      for (uint32_t k = 1; k < sToken.n - 1; ++k) {
        if (sToken.z[k] == delim || sToken.z[k] == '\\') {
          if (sToken.z[k + 1] == delim) {
            cnt++;
            tmpTokenBuf[j] = sToken.z[k + 1];
            j++;
            k++;
            continue;
          }
        }

        tmpTokenBuf[j] = sToken.z[k];
        j++;
      }

      tmpTokenBuf[j] = 0;
      sToken.z = tmpTokenBuf;
      sToken.n -= 2 + cnt;  // two quotation marks plus the dropped escape characters
    }

    bool    isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX);
    int32_t ret = tsParseOneColumnData(pSchema, &sToken, start, pCmd->payload, str, isPrimaryKey, timePrec);
    if (ret != TSDB_CODE_SUCCESS) {
      *code = TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
      return -1;  // NOTE: errors are reported as -1, the message is already in pCmd->payload
    }

    if (isPrimaryKey && tsCheckTimestamp(pDataBlocks, start) != TSDB_CODE_SUCCESS) {
      tscInvalidSQLErrMsg(pCmd->payload, "client time/server time can not be mixed up", sToken.z);
      *code = TSDB_CODE_TSC_INVALID_TIME_STAMP;
      return -1;
    }
  }

  // 2. set the null value for the columns that do not assign values
  if (spd->numOfAssignedCols < spd->numOfCols) {
    char *ptr = payload;

    for (int32_t i = 0; i < spd->numOfCols; ++i) {
      if (!spd->hasVal[i]) {  // current column do not have any value to insert, set it to null
        if (schema[i].type == TSDB_DATA_TYPE_BINARY) {
          varDataSetLen(ptr, sizeof(int8_t));
          *(uint8_t*) varDataVal(ptr) = TSDB_DATA_BINARY_NULL;
        } else if (schema[i].type == TSDB_DATA_TYPE_NCHAR) {
          varDataSetLen(ptr, sizeof(int32_t));
          *(uint32_t*) varDataVal(ptr) = TSDB_DATA_NCHAR_NULL;
        } else {
          setNull(ptr, schema[i].type, schema[i].bytes);
        }
      }

      ptr += schema[i].bytes;
    }

    // the row now spans all columns of the table, not only the assigned ones
    rowSize = (int32_t)(ptr - payload);
  }

  return rowSize;
}

S
slguan 已提交
528 529 530 531 532 533 534 535 536
/*
 * qsort() comparator for rows of a data block: orders two rows by the TSKEY
 * stored at the very beginning of each row, in ascending order.
 */
static int32_t rowDataCompar(const void *lhs, const void *rhs) {
  const TSKEY a = *(const TSKEY *)lhs;
  const TSKEY b = *(const TSKEY *)rhs;

  return (a == b) ? 0 : ((a > b) ? 1 : -1);
}

H
hjxilinx 已提交
539
int tsParseValues(char **str, STableDataBlocks *pDataBlock, STableMeta *pTableMeta, int maxRows,
H
Haojun Liao 已提交
540
                  SParsedDataColInfo *spd, SSqlCmd* pCmd, int32_t *code, char *tmpTokenBuf) {
S
slguan 已提交
541
  int32_t   index = 0;
H
Haojun Liao 已提交
542
  SStrToken sToken;
H
hzcheng 已提交
543

S
TD-1225  
Shengliang Guan 已提交
544
  int32_t numOfRows = 0;
H
hzcheng 已提交
545

H
hjxilinx 已提交
546
  SSchema *pSchema = tscGetTableSchema(pTableMeta);
H
hjxilinx 已提交
547
  STableComInfo tinfo = tscGetTableInfo(pTableMeta);
H
hjxilinx 已提交
548 549
  
  int32_t  precision = tinfo.precision;
S
slguan 已提交
550 551

  if (spd->hasVal[0] == false) {
H
Haojun Liao 已提交
552
    *code = tscInvalidSQLErrMsg(pCmd->payload, "primary timestamp column can not be null", *str);
H
hzcheng 已提交
553 554 555 556
    return -1;
  }

  while (1) {
S
slguan 已提交
557 558 559
    index = 0;
    sToken = tStrGetToken(*str, &index, false, 0, NULL);
    if (sToken.n == 0 || sToken.type != TK_LP) break;
H
hzcheng 已提交
560

S
slguan 已提交
561
    *str += index;
H
hjxilinx 已提交
562
    if (numOfRows >= maxRows || pDataBlock->size + tinfo.rowSize >= pDataBlock->nAllocSize) {
L
lihui 已提交
563
      int32_t tSize;
H
Haojun Liao 已提交
564 565 566
      *code = tscAllocateMemIfNeed(pDataBlock, tinfo.rowSize, &tSize);
      if (*code != TSDB_CODE_SUCCESS) {  //TODO pass the correct error code to client
        strcpy(pCmd->payload, "client out of memory");
S
slguan 已提交
567 568
        return -1;
      }
H
Haojun Liao 已提交
569

L
lihui 已提交
570 571
      ASSERT(tSize > maxRows);
      maxRows = tSize;
H
hzcheng 已提交
572 573
    }

H
Haojun Liao 已提交
574
    int32_t len = tsParseOneRowData(str, pDataBlock, pSchema, spd, pCmd, precision, code, tmpTokenBuf);
575
    if (len <= 0) {  // error message has been set in tsParseOneRowData
H
hzcheng 已提交
576 577 578 579 580
      return -1;
    }

    pDataBlock->size += len;

S
slguan 已提交
581 582 583 584
    index = 0;
    sToken = tStrGetToken(*str, &index, false, 0, NULL);
    *str += index;
    if (sToken.n == 0 || sToken.type != TK_RP) {
H
Haojun Liao 已提交
585
      tscSQLSyntaxErrMsg(pCmd->payload, ") expected", *str);
Y
TD-934  
yihaoDeng 已提交
586
      *code = TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
H
hzcheng 已提交
587 588 589 590 591 592 593
      return -1;
    }

    numOfRows++;
  }

  if (numOfRows <= 0) {
H
Haojun Liao 已提交
594
    strcpy(pCmd->payload, "no any data points");
Y
TD-934  
yihaoDeng 已提交
595
    *code = TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
S
slguan 已提交
596 597 598
    return -1;
  } else {
    return numOfRows;
H
hzcheng 已提交
599 600 601
  }
}

S
slguan 已提交
602
/*
 * Set up spd for a statement that assigns all numOfCols columns of the table
 * in schema order: every column is marked as having a value, and the offset
 * of each column inside a row is the sum of the byte widths of the columns
 * in front of it.
 *
 * The offset of the first column is not written here.
 */
static void tscSetAssignedColumnInfo(SParsedDataColInfo *spd, SSchema *pSchema, int32_t numOfCols) {
  spd->numOfCols = numOfCols;
  spd->numOfAssignedCols = numOfCols;

  for (int32_t col = 0; col < numOfCols; ++col) {
    spd->hasVal[col] = true;
    spd->elems[col].colIndex = col;

    if (col == 0) {
      continue;  // no preceding column to derive an offset from
    }

    spd->elems[col].offset = spd->elems[col - 1].offset + pSchema[col - 1].bytes;
  }
}

L
lihui 已提交
616
/*
 * Make sure that the data block has room for at least five more rows of
 * rowSize bytes. If it has not, the allocation size is multiplied by 1.5
 * until it has, the buffer is reallocated and the bytes behind the used part
 * are set to zero.
 *
 * numOfRows receives (nAllocSize - headerSize) / rowSize, in both the
 * success and the failure case.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the
 * reallocation failed; the block then keeps its previous buffer and size.
 */
int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows) {
  const int      factor = 5;
  const uint32_t prevAllocSize = pDataBlock->nAllocSize;
  int32_t        status = TSDB_CODE_SUCCESS;

  size_t remain = pDataBlock->nAllocSize - pDataBlock->size;

  if (remain < rowSize * factor) {
    // expand the allocated size
    do {
      pDataBlock->nAllocSize = (uint32_t)(pDataBlock->nAllocSize * 1.5);
      remain = pDataBlock->nAllocSize - pDataBlock->size;
    } while (remain < rowSize * factor);

    char *grown = realloc(pDataBlock->pData, (size_t)pDataBlock->nAllocSize);
    if (grown == NULL) {
      // allocating more memory failed: keep the old buffer and restore the old size
      pDataBlock->nAllocSize = prevAllocSize;
      status = TSDB_CODE_TSC_OUT_OF_MEMORY;
    } else {
      pDataBlock->pData = grown;
      memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size);
    }
  }

  *numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize;
  return status;
}

644
/*
 * Copy the identity of the table (tid, uid, schema version) into the submit
 * block header and add numOfRows to the row counter of the block.
 *
 * Returns TSDB_CODE_TSC_INVALID_SQL, leaving the counter unchanged, if the
 * new total would reach INT16_MAX; TSDB_CODE_SUCCESS otherwise. The identity
 * fields are written in either case.
 */
static int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) {
  pBlocks->tid = pTableMeta->id.tid;
  pBlocks->uid = pTableMeta->id.uid;
  pBlocks->sversion = pTableMeta->sversion;

  if (pBlocks->numOfRows + numOfRows >= INT16_MAX) {
    return TSDB_CODE_TSC_INVALID_SQL;
  }

  pBlocks->numOfRows += numOfRows;
  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
657
// data block is disordered, sort it in ascending order
H
hjxilinx 已提交
658
void tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf) {
659
  SSubmitBlk *pBlocks = (SSubmitBlk *)dataBuf->pData;
S
slguan 已提交
660 661

  // size is less than the total size, since duplicated rows may be removed yet.
662
  assert(pBlocks->numOfRows * dataBuf->rowSize + sizeof(SSubmitBlk) == dataBuf->size);
S
slguan 已提交
663

S
slguan 已提交
664 665 666 667 668
  // if use server time, this block must be ordered
  if (dataBuf->tsSource == TSDB_USE_SERVER_TS) {
    assert(dataBuf->ordered);
  }

S
slguan 已提交
669
  if (!dataBuf->ordered) {
670
    char *pBlockData = pBlocks->data;
S
slguan 已提交
671
    qsort(pBlockData, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar);
H
hzcheng 已提交
672

S
slguan 已提交
673 674
    int32_t i = 0;
    int32_t j = 1;
H
hzcheng 已提交
675

S
slguan 已提交
676
    while (j < pBlocks->numOfRows) {
S
slguan 已提交
677 678
      TSKEY ti = *(TSKEY *)(pBlockData + dataBuf->rowSize * i);
      TSKEY tj = *(TSKEY *)(pBlockData + dataBuf->rowSize * j);
H
hzcheng 已提交
679

S
slguan 已提交
680 681 682 683
      if (ti == tj) {
        ++j;
        continue;
      }
H
hzcheng 已提交
684

S
slguan 已提交
685 686 687 688 689 690 691 692 693
      int32_t nextPos = (++i);
      if (nextPos != j) {
        memmove(pBlockData + dataBuf->rowSize * nextPos, pBlockData + dataBuf->rowSize * j, dataBuf->rowSize);
      }

      ++j;
    }

    dataBuf->ordered = true;
H
hzcheng 已提交
694

S
slguan 已提交
695
    pBlocks->numOfRows = i + 1;
696
    dataBuf->size = sizeof(SSubmitBlk) + dataBuf->rowSize * pBlocks->numOfRows;
S
slguan 已提交
697
  }
S
slguan 已提交
698 699
}

700
/*
 * Parse the value list of one insert clause and append the rows to the data
 * block of the target table.
 *
 * pCmd      sql command that is being parsed; pCmd->payload receives error messages
 * str       in/out: current position in the sql string, at the first '('
 * spd       describes the columns that are assigned by the statement
 * totalNum  the number of parsed rows is added to it on success
 *
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
static int32_t doParseInsertStatement(SSqlCmd* pCmd, char **str, SParsedDataColInfo *spd, int32_t *totalNum) {
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  STableMeta *    pTableMeta = pTableMetaInfo->pTableMeta;
  STableComInfo   tinfo = tscGetTableInfo(pTableMeta);

  // look up the data block of this table in the hash list of the command
  STableDataBlocks *dataBuf = NULL;
  int32_t ret = tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_DEFAULT_PAYLOAD_SIZE,
                                        sizeof(SSubmitBlk), tinfo.rowSize, &pTableMetaInfo->name, pTableMeta, &dataBuf, NULL);
  if (ret != TSDB_CODE_SUCCESS) {
    return ret;
  }

  int32_t maxNumOfRows;
  if (tscAllocateMemIfNeed(dataBuf, tinfo.rowSize, &maxNumOfRows) != TSDB_CODE_SUCCESS) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // scratch buffer used for deleting escape characters from string values: \\, \', \"
  char *tmpTokenBuf = calloc(1, 16*1024);
  if (tmpTokenBuf == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_TSC_INVALID_SQL;
  int32_t numOfRows = tsParseValues(str, dataBuf, pTableMeta, maxNumOfRows, spd, pCmd, &code, tmpTokenBuf);
  free(tmpTokenBuf);

  if (numOfRows <= 0) {
    return code;
  }

  // number the bind parameters that were added while parsing and make their offsets relative to the
  // end of the block header
  for (uint32_t i = 0; i < dataBuf->numOfParams; ++i) {
    SParamInfo *param = &dataBuf->params[i];
    if (param->idx != -1) {
      continue;
    }

    param->idx = pCmd->numOfParams++;
    param->offset -= sizeof(SSubmitBlk);
  }

  SSubmitBlk *pBlocks = (SSubmitBlk *)(dataBuf->pData);
  code = tsSetBlockInfo(pBlocks, pTableMeta, numOfRows);
  if (code != TSDB_CODE_SUCCESS) {
    tscInvalidSQLErrMsg(pCmd->payload, "too many rows in sql, total number of rows should be less than 32767", *str);
    return code;
  }

  dataBuf->vgId = pTableMeta->vgId;
  dataBuf->numOfTables = 1;

  *totalNum += numOfRows;
  return TSDB_CODE_SUCCESS;
}

752
/**
 * Inspect the text that follows "INSERT INTO" for the optional
 * "(col, ...) USING stable [(tag, ...)] TAGS (val, ...)" clause and, when it is
 * present, build the tag row in pCmd->tagData so that the target table can be
 * created automatically from the super table.
 *
 * @param sqlstr  in/out: on entry points at the table name token; on return it
 *                points at the position where the caller resumes parsing
 *                (the column list or the VALUES/FILE keyword).
 * @param pSql    sql object whose command/query info is updated.
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_TSC_ACTION_IN_PROGRESS when the table
 *         meta is being fetched, or an error code.
 */
static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
  int32_t   index = 0;
  SStrToken sToken = {0};
  SStrToken tableToken = {0};
  int32_t   code = TSDB_CODE_SUCCESS;

  const int32_t TABLE_INDEX = 0;
  const int32_t STABLE_INDEX = 1;

  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  char *sql = *sqlstr;

  pSql->cmd.autoCreated = false;

  // get the token of specified table
  index = 0;
  tableToken = tStrGetToken(sql, &index, false, 0, NULL);
  sql += index;

  char *cstart = NULL;
  char *cend = NULL;

  // skip the column list if there is one
  index = 0;
  sToken = tStrGetToken(sql, &index, false, 0, NULL);
  sql += index;

  int32_t numOfColList = 0;
  bool    createTable = false;

  if (sToken.type == TK_LP) {
    cstart = &sToken.z[0];
    index = 0;
    while (1) {
      sToken = tStrGetToken(sql, &index, false, 0, NULL);
      if (sToken.type == TK_RP) {
        cend = &sToken.z[0];
        break;
      }

      // An empty token means no further progress can be made (end of the sql
      // string); without this check a missing ')' would spin forever.
      if (sToken.n == 0) {
        return TSDB_CODE_TSC_INVALID_SQL;
      }

      ++numOfColList;
    }

    sToken = tStrGetToken(sql, &index, false, 0, NULL);
    sql += index;
  }

  if (numOfColList == 0 && cstart != NULL) {
    return TSDB_CODE_TSC_INVALID_SQL;
  }

  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, TABLE_INDEX);

  if (sToken.type == TK_USING) {  // create table if not exists according to the super table
    index = 0;
    sToken = tStrGetToken(sql, &index, false, 0, NULL);
    sql += index;

    // the source super table is kept in the second slot of the pTableMetaInfo list
    if (pQueryInfo->numOfTables < 2) {
      tscAddEmptyMetaInfo(pQueryInfo);
    }

    STableMetaInfo *pSTableMetaInfo = tscGetMetaInfo(pQueryInfo, STABLE_INDEX);
    code = tscSetTableFullName(pSTableMetaInfo, &sToken, pSql);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    tNameExtractFullName(&pSTableMetaInfo->name, pCmd->tagData.name);
    pCmd->tagData.dataLen = 0;

    code = tscGetTableMeta(pSql, pSTableMetaInfo);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (!UTIL_TABLE_IS_SUPER_TABLE(pSTableMetaInfo)) {
      return tscInvalidSQLErrMsg(pCmd->payload, "create table only from super table is allowed", sToken.z);
    }

    SSchema *pTagSchema = tscGetTableTagSchema(pSTableMetaInfo->pTableMeta);
    STableComInfo tinfo = tscGetTableInfo(pSTableMetaInfo->pTableMeta);

    index = 0;
    sToken = tStrGetToken(sql, &index, false, 0, NULL);
    sql += index;

    SParsedDataColInfo spd = {0};

    uint8_t numOfTags = tscGetNumOfTags(pSTableMetaInfo->pTableMeta);
    spd.numOfCols = numOfTags;

    if (sToken.type != TK_LP) {
      // no explicit tag name list: every tag is assigned, in schema order
      tscSetAssignedColumnInfo(&spd, pTagSchema, numOfTags);
    } else {
      /* insert into tablename (col1, col2,..., coln) using superTableName (tagName1, tagName2, ..., tagNamen)
       * tags(tagVal1, tagVal2, ..., tagValn) values(v1, v2,... vn); */
      int16_t offset[TSDB_MAX_COLUMNS] = {0};
      for (int32_t t = 1; t < numOfTags; ++t) {
        offset[t] = offset[t - 1] + pTagSchema[t - 1].bytes;
      }

      while (1) {
        index = 0;
        sToken = tStrGetToken(sql, &index, false, 0, NULL);
        sql += index;

        if (TK_STRING == sToken.type) {
          // note: this edits the sql text in place
          strdequote(sToken.z);
          sToken.n = (uint32_t)strtrim(sToken.z);
        }

        if (sToken.type == TK_RP) {
          break;
        }

        bool findColumnIndex = false;

        // todo speedup by using hash list
        for (int32_t t = 0; t < numOfTags; ++t) {
          if (strncmp(sToken.z, pTagSchema[t].name, sToken.n) == 0 && strlen(pTagSchema[t].name) == sToken.n) {
            SParsedColElem *pElem = &spd.elems[spd.numOfAssignedCols++];
            pElem->offset = offset[t];
            pElem->colIndex = t;

            if (spd.hasVal[t] == true) {
              return tscInvalidSQLErrMsg(pCmd->payload, "duplicated tag name", sToken.z);
            }

            spd.hasVal[t] = true;
            findColumnIndex = true;
            break;
          }
        }

        if (!findColumnIndex) {
          return tscInvalidSQLErrMsg(pCmd->payload, "invalid tag name", sToken.z);
        }
      }

      if (spd.numOfAssignedCols == 0 || spd.numOfAssignedCols > numOfTags) {
        return tscInvalidSQLErrMsg(pCmd->payload, "tag name expected", sToken.z);
      }

      index = 0;
      sToken = tStrGetToken(sql, &index, false, 0, NULL);
      sql += index;
    }

    if (sToken.type != TK_TAGS) {
      return tscInvalidSQLErrMsg(pCmd->payload, "keyword TAGS expected", sToken.z);
    }

    index = 0;
    sToken = tStrGetToken(sql, &index, false, 0, NULL);
    sql += index;
    if (sToken.type != TK_LP) {
      return tscInvalidSQLErrMsg(pCmd->payload, NULL, sToken.z);
    }

    SKVRowBuilder kvRowBuilder = {0};
    if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    // every return between here and tdDestroyKVRowBuilder() must release the builder
    uint32_t ignoreTokenTypes = TK_LP;
    uint32_t numOfIgnoreToken = 1;
    for (int i = 0; i < spd.numOfAssignedCols; ++i) {
      SSchema* pSchema = pTagSchema + spd.elems[i].colIndex;

      index = 0;
      sToken = tStrGetToken(sql, &index, true, numOfIgnoreToken, &ignoreTokenTypes);
      sql += index;

      if (TK_ILLEGAL == sToken.type) {
        tdDestroyKVRowBuilder(&kvRowBuilder);
        return TSDB_CODE_TSC_INVALID_SQL;
      }

      if (sToken.n == 0 || sToken.type == TK_RP) {
        break;
      }

      // Remove quotation marks
      if (TK_STRING == sToken.type) {
        sToken.z++;
        sToken.n -= 2;
      }

      char tagVal[TSDB_MAX_TAGS_LEN];
      code = tsParseOneColumnData(pSchema, &sToken, tagVal, pCmd->payload, &sql, false, tinfo.precision);
      if (code != TSDB_CODE_SUCCESS) {
        tdDestroyKVRowBuilder(&kvRowBuilder);
        return code;
      }

      tdAddColToKVRow(&kvRowBuilder, pSchema->colId, pSchema->type, tagVal);
    }

    SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder);
    tdDestroyKVRowBuilder(&kvRowBuilder);
    if (row == NULL) {
      return tscInvalidSQLErrMsg(pCmd->payload, "tag value expected", NULL);
    }
    tdSortKVRowByColIdx(row);

    // 'row' is owned by this function from here on; free it on every exit path
    pCmd->tagData.dataLen = kvRowLen(row);
    if (pCmd->tagData.dataLen <= 0){
      free(row);
      return tscInvalidSQLErrMsg(pCmd->payload, "tag value expected", NULL);
    }

    char* pTag = realloc(pCmd->tagData.data, pCmd->tagData.dataLen);
    if (pTag == NULL) {
      // pCmd->tagData.data is still valid and still owned by pCmd
      free(row);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    kvRowCpy(pTag, row);
    free(row);
    pCmd->tagData.data = pTag;

    index = 0;
    sToken = tStrGetToken(sql, &index, false, 0, NULL);
    sql += index;
    if (sToken.n == 0 || sToken.type != TK_RP) {
      return tscSQLSyntaxErrMsg(pCmd->payload, ") expected", sToken.z);
    }

    if (tscValidateName(&tableToken) != TSDB_CODE_SUCCESS) {
      return tscInvalidSQLErrMsg(pCmd->payload, "invalid table name", *sqlstr);
    }

    int32_t ret = tscSetTableFullName(pTableMetaInfo, &tableToken, pSql);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }

    createTable = true;
    code = tscGetTableMetaEx(pSql, pTableMetaInfo, true);
    if (TSDB_CODE_TSC_ACTION_IN_PROGRESS == code) {
      return code;
    }

  } else {
    // no USING clause: rewind to the column list (if any) or to the current token
    if (cstart != NULL) {
      sql = cstart;
    } else {
      sql = sToken.z;
    }
    code = tscGetTableMetaEx(pSql, pTableMetaInfo, false);

    if (pCmd->curSql == NULL) {
      assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS);
    }
  }

  if (cstart != NULL && createTable == true) {
    /* move the column list to start position of the next accessed points */
    int32_t len = (int32_t)(cend - cstart + 1);
    memmove(sql - len, cstart, len);
    *sqlstr = sql - len;
  } else {
    *sqlstr = sql;
  }

  if (*sqlstr == NULL) {
    code = TSDB_CODE_TSC_INVALID_SQL;
  }

  return code;
}

H
Haojun Liao 已提交
1027
/**
 * Copy a table name out of the sql text into the caller supplied token buffer
 * and validate it.
 *
 * @param tblName     start of the table name inside the sql string (not
 *                    necessarily NUL-terminated at 'len').
 * @param len         length of the table name in bytes.
 * @param psTblToken  token whose 'z' already points at a writable buffer of at
 *                    least TSDB_TABLE_FNAME_LEN bytes; n/type are filled here.
 * @return TSDB_CODE_SUCCESS if the name is valid, an error code otherwise.
 */
int validateTableName(char *tblName, int len, SStrToken* psTblToken) {
  // tstrncpy stores at most TSDB_TABLE_FNAME_LEN - 1 characters, so a longer
  // name would leave psTblToken->n larger than the data actually in the buffer.
  if (tblName == NULL || psTblToken == NULL || psTblToken->z == NULL || len < 0 || len >= TSDB_TABLE_FNAME_LEN) {
    return TSDB_CODE_TSC_INVALID_SQL;
  }

  tstrncpy(psTblToken->z, tblName, TSDB_TABLE_FNAME_LEN);

  psTblToken->n    = len;
  psTblToken->type = TK_ID;
  // re-derive the token type from the copied text
  tSQLGetToken(psTblToken->z, &psTblToken->type);

  return tscValidateName(psTblToken);
}

1037 1038 1039 1040 1041 1042 1043 1044 1045
/**
 * Record where the data of the current insert statement comes from and reject
 * statements that combine different sources.
 *
 * @param pCmd  command whose dataSourceType is checked and updated.
 * @param type  data source of the clause being parsed.
 * @param sql   position in the sql text used for the error message.
 * @return TSDB_CODE_SUCCESS, or the code produced by tscInvalidSQLErrMsg when
 *         a different source was already recorded.
 */
static int32_t validateDataSource(SSqlCmd *pCmd, int8_t type, const char *sql) {
  bool notSetYet = (pCmd->dataSourceType == 0);

  if (notSetYet || pCmd->dataSourceType == type) {
    pCmd->dataSourceType = type;
    return TSDB_CODE_SUCCESS;
  }

  return tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES and FILE are not allowed to mix up", sql);
}

H
hzcheng 已提交
1046
/**
1047
 * parse insert sql
H
hzcheng 已提交
1048 1049 1050
 * @param pSql
 * @return
 */
H
Haojun Liao 已提交
1051
int tsParseInsertSql(SSqlObj *pSql) {
S
slguan 已提交
1052
  SSqlCmd *pCmd = &pSql->cmd;
H
Haojun Liao 已提交
1053
  char* str = pCmd->curSql;
1054

S
slguan 已提交
1055
  int32_t totalNum = 0;
1056 1057 1058 1059 1060
  int32_t code = TSDB_CODE_SUCCESS;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  assert(pQueryInfo != NULL);

1061 1062 1063 1064 1065
  STableMetaInfo *pTableMetaInfo = (pQueryInfo->numOfTables == 0)? tscAddEmptyMetaInfo(pQueryInfo):tscGetMetaInfo(pQueryInfo, 0);
  if (pTableMetaInfo == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    code = terrno;
    return code;
1066
  }
H
hzcheng 已提交
1067

H
Haojun Liao 已提交
1068 1069 1070
  if ((code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
    return code;
  }
H
hzcheng 已提交
1071

H
Haojun Liao 已提交
1072 1073 1074
  if (NULL == pCmd->pTableBlockHashList) {
    pCmd->pTableBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
    if (NULL == pCmd->pTableBlockHashList) {
1075
      code = TSDB_CODE_TSC_OUT_OF_MEMORY;
1076
      goto _clean;
L
lihui 已提交
1077 1078
    }
  } else {
1079
    str = pCmd->curSql;
L
lihui 已提交
1080 1081
  }
  
H
Haojun Liao 已提交
1082
  tscDebug("%p create data block list hashList:%p", pSql, pCmd->pTableBlockHashList);
H
hzcheng 已提交
1083 1084

  while (1) {
1085
    int32_t   index = 0;
H
Haojun Liao 已提交
1086
    SStrToken sToken = tStrGetToken(str, &index, false, 0, NULL);
1087 1088 1089 1090 1091 1092 1093 1094

    // no data in the sql string anymore.
    if (sToken.n == 0) {
      /*
       * if the data is from the data file, no data has been generated yet. So, there no data to
       * merge or submit, save the file path and parse the file in other routines.
       */
      if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) {
S
slguan 已提交
1095 1096 1097
        goto _clean;
      }

1098 1099 1100 1101 1102
      /*
       * if no data has been generated during parsing the sql string, error msg will return
       * Otherwise, create the first submit block and submit to virtual node.
       */
      if (totalNum == 0) {
1103
        code = TSDB_CODE_TSC_INVALID_SQL;
1104
        goto _clean;
1105 1106
      } else {
        break;
H
hzcheng 已提交
1107 1108 1109
      }
    }

1110
    pCmd->curSql = sToken.z;
H
Haojun Liao 已提交
1111
    char buf[TSDB_TABLE_FNAME_LEN];
H
Haojun Liao 已提交
1112
    SStrToken sTblToken;
B
Bomin Zhang 已提交
1113
    sTblToken.z = buf;
S
slguan 已提交
1114
    // Check if the table name available or not
H
Hui Li 已提交
1115
    if (validateTableName(sToken.z, sToken.n, &sTblToken) != TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
1116
      code = tscInvalidSQLErrMsg(pCmd->payload, "table name invalid", sToken.z);
1117
      goto _clean;
H
huili 已提交
1118 1119
    }

H
Hui Li 已提交
1120
    if ((code = tscSetTableFullName(pTableMetaInfo, &sTblToken, pSql)) != TSDB_CODE_SUCCESS) {
1121
      goto _clean;
H
hzcheng 已提交
1122 1123
    }

1124 1125
    if ((code = tscCheckIfCreateTable(&str, pSql)) != TSDB_CODE_SUCCESS) {
      /*
H
Haojun Liao 已提交
1126 1127
       * After retrieving the table meta from server, the sql string will be parsed from the paused position.
       * And during the getTableMetaCallback function, the sql string will be parsed from the paused position.
1128
       */
1129
      if (TSDB_CODE_TSC_ACTION_IN_PROGRESS == code) {
H
hjxilinx 已提交
1130
        return code;
H
hzcheng 已提交
1131
      }
H
hjxilinx 已提交
1132
      
H
Haojun Liao 已提交
1133
      tscError("%p async insert parse error, code:%s", pSql, tstrerror(code));
1134
      pCmd->curSql = NULL;
1135
      goto _clean;
H
hzcheng 已提交
1136 1137
    }

weixin_48148422's avatar
weixin_48148422 已提交
1138
    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
H
hjxilinx 已提交
1139
      code = tscInvalidSQLErrMsg(pCmd->payload, "insert data into super table is not supported", NULL);
1140
      goto _clean;
H
hzcheng 已提交
1141 1142
    }

S
slguan 已提交
1143 1144 1145
    index = 0;
    sToken = tStrGetToken(str, &index, false, 0, NULL);
    str += index;
1146

S
slguan 已提交
1147
    if (sToken.n == 0) {
1148
      code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE required", sToken.z);
1149
      goto _clean;
H
hzcheng 已提交
1150
    }
H
hjxilinx 已提交
1151
    
H
hjxilinx 已提交
1152
    STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
H
hjxilinx 已提交
1153
    
S
slguan 已提交
1154
    if (sToken.type == TK_VALUES) {
H
hjxilinx 已提交
1155
      SParsedDataColInfo spd = {.numOfCols = tinfo.numOfColumns};
H
hjxilinx 已提交
1156 1157
      
      SSchema *pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
H
hjxilinx 已提交
1158
      tscSetAssignedColumnInfo(&spd, pSchema, tinfo.numOfColumns);
H
hzcheng 已提交
1159

1160
      if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) {
1161
        goto _clean;
H
hzcheng 已提交
1162 1163 1164 1165 1166 1167
      }

      /*
       * app here insert data in different vnodes, so we need to set the following
       * data in another submit procedure using async insert routines
       */
1168
      code = doParseInsertStatement(pCmd, &str, &spd, &totalNum);
H
hzcheng 已提交
1169
      if (code != TSDB_CODE_SUCCESS) {
1170
        goto _clean;
H
hzcheng 已提交
1171
      }
S
slguan 已提交
1172
    } else if (sToken.type == TK_FILE) {
1173
      if (validateDataSource(pCmd, DATA_FROM_DATA_FILE, sToken.z) != TSDB_CODE_SUCCESS) {
1174
        goto _clean;
H
hzcheng 已提交
1175 1176
      }

S
slguan 已提交
1177 1178
      index = 0;
      sToken = tStrGetToken(str, &index, false, 0, NULL);
1179 1180
      if (sToken.type != TK_STRING && sToken.type != TK_ID) {
        code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z);
1181
        goto _clean;
1182
      }
S
slguan 已提交
1183 1184
      str += index;
      if (sToken.n == 0) {
H
hjxilinx 已提交
1185
        code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z);
1186
        goto _clean;
H
hzcheng 已提交
1187 1188
      }

H
Haojun Liao 已提交
1189 1190
      strncpy(pCmd->payload, sToken.z, sToken.n);
      strdequote(pCmd->payload);
1191

H
Haojun Liao 已提交
1192
      // todo refactor extract method
H
hzcheng 已提交
1193
      wordexp_t full_path;
H
Haojun Liao 已提交
1194
      if (wordexp(pCmd->payload, &full_path, 0) != 0) {
H
hjxilinx 已提交
1195
        code = tscInvalidSQLErrMsg(pCmd->payload, "invalid filename", sToken.z);
1196
        goto _clean;
H
hzcheng 已提交
1197 1198
      }

H
Haojun Liao 已提交
1199 1200
      tstrncpy(pCmd->payload, full_path.we_wordv[0], pCmd->allocSize);
      wordfree(&full_path);
1201

S
slguan 已提交
1202
    } else if (sToken.type == TK_LP) {
H
hzcheng 已提交
1203
      /* insert into tablename(col1, col2,..., coln) values(v1, v2,... vn); */
1204
      STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta;
H
hjxilinx 已提交
1205
      SSchema *   pSchema = tscGetTableSchema(pTableMeta);
H
hzcheng 已提交
1206

1207
      if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) {
1208
        goto _clean;
H
hzcheng 已提交
1209 1210
      }

1211
      SParsedDataColInfo spd = {0};
H
hjxilinx 已提交
1212
      spd.numOfCols = tinfo.numOfColumns;
H
hzcheng 已提交
1213 1214

      int16_t offset[TSDB_MAX_COLUMNS] = {0};
H
hjxilinx 已提交
1215
      for (int32_t t = 1; t < tinfo.numOfColumns; ++t) {
H
hzcheng 已提交
1216 1217 1218 1219
        offset[t] = offset[t - 1] + pSchema[t - 1].bytes;
      }

      while (1) {
S
slguan 已提交
1220 1221 1222 1223 1224
        index = 0;
        sToken = tStrGetToken(str, &index, false, 0, NULL);
        str += index;

        if (TK_STRING == sToken.type) {
B
Bomin Zhang 已提交
1225
          tscDequoteAndTrimToken(&sToken);
S
slguan 已提交
1226 1227 1228
        }

        if (sToken.type == TK_RP) {
H
hzcheng 已提交
1229 1230 1231 1232 1233 1234
          break;
        }

        bool findColumnIndex = false;

        // todo speedup by using hash list
H
hjxilinx 已提交
1235
        for (int32_t t = 0; t < tinfo.numOfColumns; ++t) {
S
slguan 已提交
1236
          if (strncmp(sToken.z, pSchema[t].name, sToken.n) == 0 && strlen(pSchema[t].name) == sToken.n) {
S
slguan 已提交
1237
            SParsedColElem *pElem = &spd.elems[spd.numOfAssignedCols++];
H
hzcheng 已提交
1238 1239 1240 1241
            pElem->offset = offset[t];
            pElem->colIndex = t;

            if (spd.hasVal[t] == true) {
H
hjxilinx 已提交
1242
              code = tscInvalidSQLErrMsg(pCmd->payload, "duplicated column name", sToken.z);
1243
              goto _clean;
H
hzcheng 已提交
1244 1245 1246 1247 1248 1249 1250 1251
            }

            spd.hasVal[t] = true;
            findColumnIndex = true;
            break;
          }
        }

S
slguan 已提交
1252
        if (!findColumnIndex) {
H
hjxilinx 已提交
1253
          code = tscInvalidSQLErrMsg(pCmd->payload, "invalid column name", sToken.z);
1254
          goto _clean;
H
hzcheng 已提交
1255 1256 1257
        }
      }

H
hjxilinx 已提交
1258
      if (spd.numOfAssignedCols == 0 || spd.numOfAssignedCols > tinfo.numOfColumns) {
H
hjxilinx 已提交
1259
        code = tscInvalidSQLErrMsg(pCmd->payload, "column name expected", sToken.z);
1260
        goto _clean;
H
hzcheng 已提交
1261 1262
      }

S
slguan 已提交
1263 1264 1265 1266 1267
      index = 0;
      sToken = tStrGetToken(str, &index, false, 0, NULL);
      str += index;

      if (sToken.type != TK_VALUES) {
H
hjxilinx 已提交
1268
        code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES is expected", sToken.z);
1269
        goto _clean;
H
hzcheng 已提交
1270 1271
      }

1272
      code = doParseInsertStatement(pCmd, &str, &spd, &totalNum);
H
hzcheng 已提交
1273
      if (code != TSDB_CODE_SUCCESS) {
1274
        goto _clean;
H
hzcheng 已提交
1275 1276
      }
    } else {
H
hjxilinx 已提交
1277
      code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE are required", sToken.z);
1278
      goto _clean;
H
hzcheng 已提交
1279 1280 1281
    }
  }

S
slguan 已提交
1282 1283 1284 1285
  // we need to keep the data blocks if there are parameters in the sql
  if (pCmd->numOfParams > 0) {
    goto _clean;
  }
1286

H
Haojun Liao 已提交
1287
  if (taosHashGetSize(pCmd->pTableBlockHashList) > 0) { // merge according to vgId
H
Haojun Liao 已提交
1288
    if ((code = tscMergeTableDataBlocks(pSql, true)) != TSDB_CODE_SUCCESS) {
1289
      goto _clean;
S
slguan 已提交
1290
    }
H
hzcheng 已提交
1291 1292 1293 1294 1295 1296
  }

  code = TSDB_CODE_SUCCESS;
  goto _clean;

_clean:
1297
  pCmd->curSql     = NULL;
1298
  pCmd->parseFinished  = 1;
H
hzcheng 已提交
1299 1300 1301
  return code;
}

H
Haojun Liao 已提交
1302
/**
 * First pass over an insert/import statement: check the write permission,
 * mark the command as an insert and position pCmd->curSql right behind the
 * INTO keyword.
 *
 * @param pSql  sql object holding the statement text in pSql->sqlstr.
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_TSC_NO_WRITE_AUTH, or the code produced
 *         by tscInvalidSQLErrMsg when INTO is missing.
 */
int tsInsertInitialCheck(SSqlObj *pSql) {
  if (!pSql->pTscObj->writeAuth) {
    return TSDB_CODE_TSC_NO_WRITE_AUTH;
  }

  SSqlCmd *pCmd = &pSql->cmd;
  int32_t  pos = 0;

  // the first keyword has to be INSERT or IMPORT
  SStrToken token = tStrGetToken(pSql->sqlstr, &pos, false, 0, NULL);
  assert(token.type == TK_INSERT || token.type == TK_IMPORT);

  pCmd->count = 0;
  pCmd->command = TSDB_SQL_INSERT;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex);
  TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT | pCmd->insertType);

  // the second keyword has to be INTO; parsing continues right after it
  token = tStrGetToken(pSql->sqlstr, &pos, false, 0, NULL);
  if (token.type == TK_INTO) {
    pCmd->curSql = token.z + token.n;
    return TSDB_CODE_SUCCESS;
  }

  return tscInvalidSQLErrMsg(pCmd->payload, "keyword INTO is expected", token.z);
}

H
Haojun Liao 已提交
1329
/**
 * Parse the statement held in pSql->sqlstr. Insert statements go through the
 * dedicated insert parser, everything else through the generic sql parser.
 * In both cases one retry is attempted after resetting the command.
 *
 * The result is returned instead of being stored in pRes->code: pRes->code may
 * be modified or released by another thread in the tscTableMetaCallBack
 * function, so it must neither be read nor written here to decide whether the
 * table meta came from the cache or is being fetched from the mgmt node.
 *
 * @param pSql     sql object to parse.
 * @param initial  true for the first call, false when parsing is resumed.
 * @return TSDB_CODE_SUCCESS or an error / in-progress code.
 */
int tsParseSql(SSqlObj *pSql, bool initial) {
  SSqlCmd* pCmd = &pSql->cmd;

  if (!initial && !pCmd->parseFinished) {
    tscDebug("%p resume to parse sql: %s", pSql, pCmd->curSql);
  }

  int32_t ret = tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE);
  if (ret != TSDB_CODE_SUCCESS) {
    return ret;
  }

  if (!tscIsInsertData(pSql->sqlstr)) {
    SSqlInfo SQLInfo = qSQLParse(pSql->sqlstr);

    ret = tscToSQLCmd(pSql, &SQLInfo);
    if (ret == TSDB_CODE_TSC_INVALID_SQL && pSql->parseRetry == 0 && SQLInfo.type == TSDB_SQL_NULL) {
      tscResetSqlCmd(pCmd, true);
      pSql->parseRetry++;
      ret = tscToSQLCmd(pSql, &SQLInfo);
    }

    SqlInfoDestroy(&SQLInfo);
    return ret;
  }

  if (initial) {
    ret = tsInsertInitialCheck(pSql);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }
  }

  // make a backup as tsParseInsertSql may modify the string
  char* backup = strdup(pSql->sqlstr);
  ret = tsParseInsertSql(pSql);

  // retry once, on the untouched copy, for syntax / invalid-sql failures only
  bool parseFailure = (ret == TSDB_CODE_TSC_SQL_SYNTAX_ERROR || ret == TSDB_CODE_TSC_INVALID_SQL);
  bool retry = (backup != NULL) && (pSql->parseRetry < 1) && parseFailure;
  if (!retry) {
    free(backup);
    return ret;
  }

  tscResetSqlCmd(pCmd, true);
  free(pSql->sqlstr);
  pSql->sqlstr = backup;
  pSql->parseRetry++;

  ret = tsInsertInitialCheck(pSql);
  if (ret == TSDB_CODE_SUCCESS) {
    ret = tsParseInsertSql(pSql);
  }

  return ret;
}

S
slguan 已提交
1383 1384 1385
/**
 * Finalize one batch of rows parsed from a data file and submit it: fill in
 * the submit block header, merge the table data blocks, copy the first merged
 * block into the payload and hand the request to tscProcessSql.
 *
 * @param pSql              sub sql object used for the submission.
 * @param numOfRows         number of rows stored in pTableDataBlocks.
 * @param pTableDataBlocks  block holding the parsed rows.
 * @return result of tscProcessSql, or the code of the first failing step.
 */
static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlocks *pTableDataBlocks) {
  SSqlCmd *pCmd = &pSql->cmd;
  pSql->res.numOfRows = 0;

  assert(pCmd->numOfClause == 1);
  STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta;

  SSubmitBlk *pSubmitBlk = (SSubmitBlk *)(pTableDataBlocks->pData);
  if (tsSetBlockInfo(pSubmitBlk, pTableMeta, numOfRows) != TSDB_CODE_SUCCESS) {
    return tscInvalidSQLErrMsg(pCmd->payload, "too many rows in sql, total number of rows should be less than 32767", NULL);
  }

  int32_t code = tscMergeTableDataBlocks(pSql, true);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  STableDataBlocks *pMerged = taosArrayGetP(pCmd->pDataBlocks, 0);
  code = tscCopyDataBlockToPayload(pSql, pMerged);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  return tscProcessSql(pSql);
}

// Context passed as 'param' to parseFileSendDataBlock while importing a data file.
typedef struct SImportFileSupport {
  SSqlObj *pSql;  // sql object the import was started for (set by tscImportDataFromFile)
  FILE    *fp;    // data file opened by tscImportDataFromFile; closed by parseFileSendDataBlock
} SImportFileSupport;

H
Haojun Liao 已提交
1414
/**
 * Callback that drives the import of a data file: called once to start and
 * then again after every submitted batch. Each call reads up to one batch of
 * lines from the file, parses them as rows and submits them. When the file is
 * exhausted, or on error, the resources are released and the user callback of
 * the parent sql object is invoked.
 *
 * @param param      SImportFileSupport with the parent sql object and the file.
 * @param tres       sub sql object used to submit the batches.
 * @param numOfRows  unused.
 */
static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRows) {
  assert(param != NULL && tres != NULL);
  (void)numOfRows;

  char *  tokenBuf = NULL;
  size_t  n = 0;
  ssize_t readLen = 0;
  char *  line = NULL;
  int32_t count = 0;
  int32_t maxRows = 0;
  FILE *  fp   = NULL;

  SSqlObj *pSql = tres;
  SSqlCmd *pCmd = &pSql->cmd;

  SImportFileSupport *pSupporter = (SImportFileSupport *)param;

  SSqlObj *pParentSql = pSupporter->pSql;
  fp = pSupporter->fp;

  int32_t code = pSql->res.code;

  // retry parse data from file and import data from the beginning again
  if (code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
    assert(pSql->res.numOfRows == 0);
    int32_t ret = fseek(fp, 0, SEEK_SET);
    if (ret < 0) {
      int32_t sysErr = errno;  // capture before the logging call can change it
      tscError("%p failed to seek SEEK_SET since:%s", pSql, tstrerror(sysErr));
      code = TAOS_SYSTEM_ERROR(sysErr);
      goto _error;
    }

    // the rewind succeeded; start the retry from a clean state instead of
    // relying on the row parser to overwrite the stale code
    code = TSDB_CODE_SUCCESS;
  } else if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  // accumulate the total submit records
  pParentSql->res.numOfRows += pSql->res.numOfRows;

  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  STableMeta *    pTableMeta = pTableMetaInfo->pTableMeta;
  SSchema *       pSchema = tscGetTableSchema(pTableMeta);
  STableComInfo   tinfo = tscGetTableInfo(pTableMeta);

  SParsedDataColInfo spd = {.numOfCols = tinfo.numOfColumns};
  tscSetAssignedColumnInfo(&spd, pSchema, tinfo.numOfColumns);

  // drop what is left from the previous batch
  tfree(pCmd->pTableNameList);
  pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);

  if (pCmd->pTableBlockHashList == NULL) {
    pCmd->pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
    if (pCmd->pTableBlockHashList == NULL) {
      code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto _error;
    }
  }

  STableDataBlocks *pTableDataBlock = NULL;
  int32_t           ret =
      tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_PAYLOAD_SIZE, sizeof(SSubmitBlk),
                              tinfo.rowSize, &pTableMetaInfo->name, pTableMeta, &pTableDataBlock, NULL);
  if (ret != TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto _error;
  }

  // without a buffer there is nowhere to put the parsed rows
  if (tscAllocateMemIfNeed(pTableDataBlock, tinfo.rowSize, &maxRows) != TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto _error;
  }

  tokenBuf = calloc(1, TSDB_MAX_BYTES_PER_ROW);
  if (tokenBuf == NULL) {
    code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto _error;
  }

  while ((readLen = tgetline(&line, &n, fp)) != -1) {
    // strip one trailing line terminator character
    if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) {
      line[--readLen] = 0;
    }

    if (readLen == 0) {
      continue;
    }

    char *lineptr = line;
    strtolower(line, line);

    int32_t len = tsParseOneRowData(&lineptr, pTableDataBlock, pSchema, &spd, pCmd, tinfo.precision, &code, tokenBuf);
    if (len <= 0 || pTableDataBlock->numOfParams > 0) {
      pSql->res.code = code;
      break;
    }

    pTableDataBlock->size += len;

    // batch is full; the remaining lines are handled by the next invocation
    if (++count >= maxRows) {
      break;
    }
  }

  tfree(tokenBuf);
  tfree(line);

  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pParentSql->res.code = code;

  if (count > 0) {
    code = doPackSendDataBlock(pSql, count, pTableDataBlock);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }

    return;
  }

  // nothing left to send: release everything and report the total to the user
  taos_free_result(pSql);
  tfree(pSupporter);
  fclose(fp);

  pParentSql->fp = pParentSql->fetchFp;

  // all data has been sent to vnode, call user function
  (*pParentSql->fp)(pParentSql->param, pParentSql, (int32_t)pParentSql->res.numOfRows);
  return;

_error:
  // the error handler is given the parent object, so the failure has to be
  // recorded there on every error path
  pParentSql->res.code = code;

  tfree(tokenBuf);
  tfree(line);
  taos_free_result(pSql);
  tfree(pSupporter);
  fclose(fp);

  tscAsyncResultOnError(pParentSql);
}

H
Haojun Liao 已提交
1547
/**
 * Start importing the data file whose path tsParseInsertSql stored in
 * pCmd->payload. A sub sql object is created for the submissions and
 * parseFileSendDataBlock is invoked to send the first batch. Failures are
 * reported through tscAsyncResultOnError on pSql.
 *
 * @param pSql  sql object of the "insert ... file" statement; nothing is done
 *              unless its command is TSDB_SQL_INSERT.
 */
void tscImportDataFromFile(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  if (pCmd->command != TSDB_SQL_INSERT) {
    return;
  }

  assert(pCmd->dataSourceType == DATA_FROM_DATA_FILE  && strlen(pCmd->payload) != 0);

  SImportFileSupport *pSupporter = calloc(1, sizeof(SImportFileSupport));
  if (pSupporter == NULL) {
    pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    tscAsyncResultOnError(pSql);
    return;
  }

  SSqlObj *pNew = createSubqueryObj(pSql, 0, parseFileSendDataBlock, pSupporter, TSDB_SQL_INSERT, NULL);
  if (pNew == NULL) {
    // parseFileSendDataBlock requires a valid sub object
    tfree(pSupporter);
    pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    tscAsyncResultOnError(pSql);
    return;
  }

  pCmd->count = 1;

  FILE *fp = fopen(pCmd->payload, "rb");
  if (fp == NULL) {
    pSql->res.code = TAOS_SYSTEM_ERROR(errno);
    tscError("%p failed to open file %s to load data from file, code:%s", pSql, pCmd->payload, tstrerror(pSql->res.code));

    tfree(pSupporter);
    taos_free_result(pNew);
    tscAsyncResultOnError(pSql);
    return;
  }

  // from here on the supporter and the file are owned by parseFileSendDataBlock
  pSupporter->pSql = pSql;
  pSupporter->fp   = fp;

  parseFileSendDataBlock(pSupporter, pNew, TSDB_CODE_SUCCESS);
}