tscParseInsert.c 46.6 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE /* See feature_test_macros(7) */
#define _GNU_SOURCE

#define _XOPEN_SOURCE

21
#include "os.h"
22

23
#include "ttype.h"
24
#include "hash.h"
H
hzcheng 已提交
25 26 27
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
28
#include "ttokendef.h"
H
hzcheng 已提交
29
#include "taosdef.h"
H
hzcheng 已提交
30

S
slguan 已提交
31
#include "tscLog.h"
H
hjxilinx 已提交
32
#include "tscSubquery.h"
H
hzcheng 已提交
33 34
#include "tstoken.h"

S
slguan 已提交
35
#include "tdataformat.h"
36

S
slguan 已提交
37
/*
 * Source of the primary-key timestamps of a data block, as tracked in
 * STableDataBlocks.tsSource by tsCheckTimestamp().
 */
enum {
  TSDB_USE_SERVER_TS = 0,  // rows carry the INT64_MIN placeholder key
  TSDB_USE_CLI_TS = 1,     // rows carry an explicit key value
};

L
lihui 已提交
42
static int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows);
H
Haojun Liao 已提交
43
static int32_t parseBoundColumns(SSqlCmd* pCmd, SParsedDataColInfo* pColInfo, SSchema* pSchema, char* str, char** end);
H
hzcheng 已提交
44

H
Haojun Liao 已提交
45
/*
 * Convert the text of a token into a double.
 *
 * pToken  token whose text starts at pToken->z and is pToken->n bytes long
 * value   receives the converted number
 * endPtr  receives the position at which the conversion stopped
 *
 * Returns pToken->type when exactly pToken->n bytes were consumed by the
 * conversion, TK_ILLEGAL otherwise. errno is reset before the conversion so
 * the caller can inspect it for ERANGE afterwards.
 */
static int32_t tscToDouble(SStrToken *pToken, double *value, char **endPtr) {
  errno = 0;

  // strtod produces a double directly, so no narrowing from long double takes place and
  // an out-of-range input is reported as +/-HUGE_VAL together with ERANGE
  *value = strtod(pToken->z, endPtr);

  // the conversion must stop exactly at the end of the token, otherwise the
  // token is not a valid floating-point number
  if ((*endPtr - pToken->z) != pToken->n) {
    return TK_ILLEGAL;
  }

  return pToken->type;
}
H
hzcheng 已提交
56

H
Haojun Liao 已提交
57
/*
 * Parse a timestamp value, optionally followed by a "+<duration>" or
 * "-<duration>" suffix (e.g. now+12a, now-5h).
 *
 * pToken    token holding the base value: TK_NOW, the single character "0",
 *           a TK_INTEGER, or anything else, which is handed to taosParseTime()
 * time      receives the resulting timestamp
 * next      in: position in the SQL text right after pToken;
 *           out: moved past the duration suffix when one was consumed
 * error     buffer handed to tscInvalidSQLErrMsg() on failure
 * timePrec  time precision of the target table
 *
 * Returns TSDB_CODE_SUCCESS (or 0) on success, otherwise the value produced by
 * tscInvalidSQLErrMsg() or TSDB_CODE_TSC_INVALID_SQL.
 */
int tsParseTime(SStrToken *pToken, int64_t *time, char **next, char *error, int16_t timePrec) {
  int64_t ts = 0;
  char   *cursor = *next;

  if (pToken->type == TK_NOW) {
    ts = taosGetTimestamp(timePrec);
  } else if (strncmp(pToken->z, "0", 1) == 0 && pToken->n == 1) {
    // the literal "0": ts keeps its initial value
  } else if (pToken->type == TK_INTEGER) {
    ts = taosStr2int64(pToken->z);
  } else {
    // every other token is treated as a formatted date/time string, which cannot carry a suffix
    if (taosParseTime(pToken->z, time, pToken->n, timePrec, tsDaylight) == TSDB_CODE_SUCCESS) {
      return TSDB_CODE_SUCCESS;
    }

    return tscInvalidSQLErrMsg(error, "invalid timestamp format", pToken->z);
  }

  // fast path: when only blanks separate the token from the next ',', there is no suffix to parse
  for (const char *p = pToken->z + pToken->n; *p != '\0'; ++p) {
    if (*p == ' ' || *p == '\t') {
      continue;
    }

    if (*p == ',') {
      *next = cursor;
      *time = ts;
      return 0;
    }

    break;
  }

  // look for the sign that introduces a duration suffix
  int32_t   consumed = 0;
  SStrToken signToken = tStrGetToken(cursor, &consumed, false);
  cursor += consumed;

  if (signToken.type != TK_MINUS && signToken.type != TK_PLUS) {
    // no suffix: *next is intentionally left where the caller put it
    *time = ts;
    return TSDB_CODE_SUCCESS;
  }

  consumed = 0;
  SStrToken durationToken = tStrGetToken(cursor, &consumed, false);
  cursor += consumed;

  // a duration needs at least two characters
  if (durationToken.n < 2) {
    return tscInvalidSQLErrMsg(error, "value expected in timestamp", signToken.z);
  }

  int64_t delta;
  if (parseAbsoluteDuration(durationToken.z, durationToken.n, &delta) != TSDB_CODE_SUCCESS) {
    return TSDB_CODE_TSC_INVALID_SQL;
  }

  // NOTE(review): presumably parseAbsoluteDuration() yields microseconds, hence the scaling - confirm
  if (timePrec == TSDB_TIME_PRECISION_MILLI) {
    delta /= 1000;
  }

  ts = (signToken.type == TK_PLUS) ? (ts + delta) : (ts - delta);

  *next = cursor;
  *time = ts;
  return TSDB_CODE_SUCCESS;
}

131 132 133 134
/*
 * Tell whether a value token denotes SQL NULL.
 *
 * A token is NULL when it is of type TK_NULL, or when it is a TK_STRING whose
 * text equals TSDB_DATA_NULL_STR_L, ignoring case.
 */
static bool isNullStr(SStrToken* pToken) {
  if (pToken->type == TK_NULL) {
    return true;
  }

  if (pToken->type != TK_STRING) {
    return false;
  }

  // the lengths must match: comparing only pToken->n characters would also accept every
  // proper prefix of TSDB_DATA_NULL_STR_L (and the empty string) as NULL
  return (pToken->n == strlen(TSDB_DATA_NULL_STR_L)) &&
         (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0);
}
H
Haojun Liao 已提交
135 136
/*
 * Convert one value token into the binary representation of a column and
 * store it at payload.
 *
 * pSchema     schema of the target column; its type selects the conversion and
 *             its bytes field bounds variable-length data
 * pToken      the value token (tsParseOneRow() strips the quotes of string
 *             tokens before calling)
 * payload     destination of the converted value
 * msg         buffer that receives the error text on failure
 * str         current position in the SQL text; forwarded to tsParseTime(),
 *             which may move it past a duration suffix
 * primaryKey  when true, a NULL timestamp is stored as 0 instead of
 *             TSDB_DATA_BIGINT_NULL
 * timePrec    time precision forwarded to tsParseTime()
 *
 * Returns TSDB_CODE_SUCCESS, or the value produced by tscInvalidSQLErrMsg() /
 * tscSQLSyntaxErrMsg() when the token cannot be converted.
 */
int32_t tsParseOneColumn(SSchema *pSchema, SStrToken *pToken, char *payload, char *msg, char **str, bool primaryKey,
                         int16_t timePrec) {
  int64_t iv;
  int32_t ret;
  char   *endptr = NULL;

  if (IS_NUMERIC_TYPE(pSchema->type) && pToken->n == 0) {
    return tscInvalidSQLErrMsg(msg, "invalid numeric data", pToken->z);
  }

  switch (pSchema->type) {
    case TSDB_DATA_TYPE_BOOL: {  // bool
      if (isNullStr(pToken)) {
        *((uint8_t *)payload) = TSDB_DATA_BOOL_NULL;
      } else {
        if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) {
          if (strncmp(pToken->z, "true", pToken->n) == 0) {
            *(uint8_t *)payload = TSDB_TRUE;
          } else if (strncmp(pToken->z, "false", pToken->n) == 0) {
            *(uint8_t *)payload = TSDB_FALSE;
          } else {
            return tscSQLSyntaxErrMsg(msg, "invalid bool data", pToken->z);
          }
        } else if (pToken->type == TK_INTEGER) {
          // any non-zero integer counts as true
          iv = strtoll(pToken->z, NULL, 10);
          *(uint8_t *)payload = (int8_t)((iv == 0) ? TSDB_FALSE : TSDB_TRUE);
        } else if (pToken->type == TK_FLOAT) {
          // any non-zero floating-point number counts as true
          double dv = strtod(pToken->z, NULL);
          *(uint8_t *)payload = (int8_t)((dv == 0) ? TSDB_FALSE : TSDB_TRUE);
        } else {
          return tscInvalidSQLErrMsg(msg, "invalid bool data", pToken->z);
        }
      }
      break;
    }

    case TSDB_DATA_TYPE_TINYINT:
      if (isNullStr(pToken)) {
        *((uint8_t *)payload) = TSDB_DATA_TINYINT_NULL;
      } else {
        ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
        if (ret != TSDB_CODE_SUCCESS) {
          return tscInvalidSQLErrMsg(msg, "invalid tinyint data", pToken->z);
        } else if (!IS_VALID_TINYINT(iv)) {
          // message names the type, like the overflow messages of all other integer types
          return tscInvalidSQLErrMsg(msg, "tinyint data overflow", pToken->z);
        }

        *((uint8_t *)payload) = (uint8_t)iv;
      }

      break;

    case TSDB_DATA_TYPE_UTINYINT:
      if (isNullStr(pToken)) {
        *((uint8_t *)payload) = TSDB_DATA_UTINYINT_NULL;
      } else {
        ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
        if (ret != TSDB_CODE_SUCCESS) {
          return tscInvalidSQLErrMsg(msg, "invalid unsigned tinyint data", pToken->z);
        } else if (!IS_VALID_UTINYINT(iv)) {
          return tscInvalidSQLErrMsg(msg, "unsigned tinyint data overflow", pToken->z);
        }

        *((uint8_t *)payload) = (uint8_t)iv;
      }

      break;

    case TSDB_DATA_TYPE_SMALLINT:
      if (isNullStr(pToken)) {
        *((int16_t *)payload) = TSDB_DATA_SMALLINT_NULL;
      } else {
        ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
        if (ret != TSDB_CODE_SUCCESS) {
          return tscInvalidSQLErrMsg(msg, "invalid smallint data", pToken->z);
        } else if (!IS_VALID_SMALLINT(iv)) {
          return tscInvalidSQLErrMsg(msg, "smallint data overflow", pToken->z);
        }

        *((int16_t *)payload) = (int16_t)iv;
      }

      break;

    case TSDB_DATA_TYPE_USMALLINT:
      if (isNullStr(pToken)) {
        *((uint16_t *)payload) = TSDB_DATA_USMALLINT_NULL;
      } else {
        ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
        if (ret != TSDB_CODE_SUCCESS) {
          return tscInvalidSQLErrMsg(msg, "invalid unsigned smallint data", pToken->z);
        } else if (!IS_VALID_USMALLINT(iv)) {
          return tscInvalidSQLErrMsg(msg, "unsigned smallint data overflow", pToken->z);
        }

        *((uint16_t *)payload) = (uint16_t)iv;
      }

      break;

    case TSDB_DATA_TYPE_INT:
      if (isNullStr(pToken)) {
        *((int32_t *)payload) = TSDB_DATA_INT_NULL;
      } else {
        ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
        if (ret != TSDB_CODE_SUCCESS) {
          return tscInvalidSQLErrMsg(msg, "invalid int data", pToken->z);
        } else if (!IS_VALID_INT(iv)) {
          return tscInvalidSQLErrMsg(msg, "int data overflow", pToken->z);
        }

        *((int32_t *)payload) = (int32_t)iv;
      }

      break;

    case TSDB_DATA_TYPE_UINT:
      if (isNullStr(pToken)) {
        *((uint32_t *)payload) = TSDB_DATA_UINT_NULL;
      } else {
        ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
        if (ret != TSDB_CODE_SUCCESS) {
          return tscInvalidSQLErrMsg(msg, "invalid unsigned int data", pToken->z);
        } else if (!IS_VALID_UINT(iv)) {
          return tscInvalidSQLErrMsg(msg, "unsigned int data overflow", pToken->z);
        }

        *((uint32_t *)payload) = (uint32_t)iv;
      }

      break;

    case TSDB_DATA_TYPE_BIGINT:
      if (isNullStr(pToken)) {
        *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL;
      } else {
        ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
        if (ret != TSDB_CODE_SUCCESS) {
          return tscInvalidSQLErrMsg(msg, "invalid bigint data", pToken->z);
        } else if (!IS_VALID_BIGINT(iv)) {
          return tscInvalidSQLErrMsg(msg, "bigint data overflow", pToken->z);
        }

        *((int64_t *)payload) = iv;
      }
      break;

    case TSDB_DATA_TYPE_UBIGINT:
      if (isNullStr(pToken)) {
        *((uint64_t *)payload) = TSDB_DATA_UBIGINT_NULL;
      } else {
        ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
        if (ret != TSDB_CODE_SUCCESS) {
          return tscInvalidSQLErrMsg(msg, "invalid unsigned bigint data", pToken->z);
        } else if (!IS_VALID_UBIGINT((uint64_t)iv)) {
          return tscInvalidSQLErrMsg(msg, "unsigned bigint data overflow", pToken->z);
        }

        *((uint64_t *)payload) = iv;
      }
      break;

    case TSDB_DATA_TYPE_FLOAT:
      if (isNullStr(pToken)) {
        *((int32_t *)payload) = TSDB_DATA_FLOAT_NULL;
      } else {
        double dv;
        if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
          return tscInvalidSQLErrMsg(msg, "illegal float data", pToken->z);
        }

        // errno was reset by tscToDouble(); reject overflow, values outside the float range, inf and nan
        if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) || isnan(dv)) {
          return tscInvalidSQLErrMsg(msg, "illegal float data", pToken->z);
        }

        SET_FLOAT_VAL(payload, dv);
      }
      break;

    case TSDB_DATA_TYPE_DOUBLE:
      if (isNullStr(pToken)) {
        *((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL;
      } else {
        double dv;
        if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
          return tscInvalidSQLErrMsg(msg, "illegal double data", pToken->z);
        }

        // errno was reset by tscToDouble(); reject overflow, inf and nan
        if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) {
          return tscInvalidSQLErrMsg(msg, "illegal double data", pToken->z);
        }

        *((double *)payload) = dv;
      }
      break;

    case TSDB_DATA_TYPE_BINARY:
      // binary data cannot be null-terminated char string, otherwise the last char of the string is lost
      if (pToken->type == TK_NULL) {
        setVardataNull(payload, TSDB_DATA_TYPE_BINARY);
      } else {  // too long values will return invalid sql, not be truncated automatically
        if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) {  // todo refactor
          return tscInvalidSQLErrMsg(msg, "string data overflow", pToken->z);
        }

        STR_WITH_SIZE_TO_VARSTR(payload, pToken->z, pToken->n);
      }

      break;

    case TSDB_DATA_TYPE_NCHAR:
      if (pToken->type == TK_NULL) {
        setVardataNull(payload, TSDB_DATA_TYPE_NCHAR);
      } else {
        // the conversion fails when the converted text does not fit into the column; the
        // error text reported to the client is then taken from errno
        int32_t output = 0;
        if (!taosMbsToUcs4(pToken->z, pToken->n, varDataVal(payload), pSchema->bytes - VARSTR_HEADER_SIZE, &output)) {
          char buf[512] = {0};
          snprintf(buf, tListLen(buf), "%s", strerror(errno));
          return tscInvalidSQLErrMsg(msg, buf, pToken->z);
        }

        varDataSetLen(payload, output);
      }
      break;

    case TSDB_DATA_TYPE_TIMESTAMP: {
      if (pToken->type == TK_NULL) {
        if (primaryKey) {
          *((int64_t *)payload) = 0;
        } else {
          *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL;
        }
      } else {
        int64_t temp;
        if (tsParseTime(pToken, &temp, str, msg, timePrec) != TSDB_CODE_SUCCESS) {
          return tscInvalidSQLErrMsg(msg, "invalid timestamp", pToken->z);
        }

        *((int64_t *)payload) = temp;
      }

      break;
    }

    default:
      // column types not listed above leave the payload untouched
      break;
  }

  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
385 386 387 388 389 390 391 392 393 394 395 396
/*
 * Track the primary-key timestamp of the row that was just parsed.
 *
 * Keys equal to INT64_MIN mark the block as TSDB_USE_SERVER_TS, every other key
 * marks it as TSDB_USE_CLI_TS; the two kinds must not be mixed within one
 * block. Only blocks using client timestamps can become unordered, so no sort
 * is involved when server time is used.
 *
 * pDataBlocks  block whose ordered/tsSource/prevTS fields are maintained
 * start        address of the key of the current row
 *
 * Returns TSDB_CODE_SUCCESS, or -1 when both kinds of keys are mixed.
 */
static int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start) {
  // once the data block is disordered, the previous timestamp is not tracked any more
  if (!pDataBlocks->ordered) {
    return TSDB_CODE_SUCCESS;
  }

  const TSKEY key = *(TSKEY *)start;

  const bool serverKey = (key == INT64_MIN);
  const int  thisSource = serverKey ? TSDB_USE_SERVER_TS : TSDB_USE_CLI_TS;
  const int  otherSource = serverKey ? TSDB_USE_CLI_TS : TSDB_USE_SERVER_TS;

  if (pDataBlocks->tsSource == otherSource) {
    return -1;  // client time/server time can not be mixed
  }

  if (pDataBlocks->tsSource == -1) {  // first key of the block decides the source
    pDataBlocks->tsSource = thisSource;
  }

  // a client key that does not exceed its predecessor makes the block unordered
  if ((pDataBlocks->tsSource == TSDB_USE_CLI_TS) && (key <= pDataBlocks->prevTS)) {
    pDataBlocks->ordered = false;
  }

  pDataBlocks->prevTS = key;
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
420 421 422 423 424 425 426 427
/*
 * Parse the values of one row from the SQL text into the data block.
 *
 * str          in/out: position in the SQL text, advanced past the consumed tokens
 * pDataBlocks  block that receives the row at pData + size
 * pCmd         command object; pCmd->payload receives error messages
 * timePrec     time precision of the table
 * len          receives the number of bytes the row occupies
 * tmpTokenBuf  scratch buffer that receives string tokens with quotes and
 *              escapes removed
 *              NOTE(review): no length check against this buffer is done here -
 *              confirm the caller guarantees it is large enough for any token
 *
 * Returns TSDB_CODE_SUCCESS or an error code; the error text is left in
 * pCmd->payload.
 */
int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, SSqlCmd *pCmd, int16_t timePrec, int32_t *len,
                  char *tmpTokenBuf) {
  char               *rowStart = pDataBlocks->pData + pDataBlocks->size;
  SParsedDataColInfo *pBound = &pDataBlocks->boundColumnInfo;
  SSchema            *pColumns = tscGetTableSchema(pDataBlocks->pTableMeta);

  // 1. store the values given in the sql string
  int32_t rowSize = 0;
  for (int i = 0; i < pBound->numOfBound; ++i) {
    const int32_t colIndex = pBound->boundedColumns[i];
    SSchema      *pSchema = &pColumns[colIndex];

    // position of the current value inside the data block buffer
    char *start = rowStart + pBound->cols[colIndex].offset;
    rowSize += pSchema->bytes;

    int32_t   consumed = 0;
    SStrToken sToken = tStrGetToken(*str, &consumed, true);
    *str += consumed;

    if (sToken.type == TK_QUESTION) {
      // placeholder of a prepared statement: remember where the value has to go
      if (pCmd->insertType != TSDB_QUERY_TYPE_STMT_INSERT) {
        return tscSQLSyntaxErrMsg(pCmd->payload, "? only allowed in binding insertion", *str);
      }

      uint32_t offset = (uint32_t)(start - pDataBlocks->pData);
      if (tscAddParamToDataBlock(pDataBlocks, pSchema->type, (uint8_t)timePrec, pSchema->bytes, offset) == NULL) {
        strcpy(pCmd->payload, "client out of memory");
        return TSDB_CODE_TSC_OUT_OF_MEMORY;
      }

      continue;
    }

    const int16_t type = sToken.type;
    const bool    isValue = (type == TK_NOW || type == TK_INTEGER || type == TK_STRING || type == TK_FLOAT ||
                          type == TK_BOOL || type == TK_NULL || type == TK_HEX || type == TK_OCT || type == TK_BIN);
    if (!isValue || (sToken.n == 0) || (type == TK_RP)) {
      return tscSQLSyntaxErrMsg(pCmd->payload, "invalid data or symbol", sToken.z);
    }

    if (TK_STRING == sToken.type) {
      // copy the text between the quotes; a quote or backslash that is directly followed by
      // the quote character is dropped and only the quote character is kept
      const char quote = sToken.z[0];
      int32_t    numOfEscapes = 0;
      int32_t    outLen = 0;

      uint32_t pos = 1;
      while (pos < sToken.n - 1) {
        char ch = sToken.z[pos];
        if ((ch == quote || ch == '\\') && (sToken.z[pos + 1] == quote)) {
          ++numOfEscapes;
          ++pos;
          ch = sToken.z[pos];
        }

        tmpTokenBuf[outLen++] = ch;
        ++pos;
      }

      tmpTokenBuf[outLen] = 0;
      sToken.z = tmpTokenBuf;
      sToken.n -= 2 + numOfEscapes;  // two quotes plus one character per escape sequence
    }

    const bool isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX);

    int32_t ret = tsParseOneColumn(pSchema, &sToken, start, pCmd->payload, str, isPrimaryKey, timePrec);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }

    if (isPrimaryKey && tsCheckTimestamp(pDataBlocks, start) != TSDB_CODE_SUCCESS) {
      tscInvalidSQLErrMsg(pCmd->payload, "client time/server time can not be mixed up", sToken.z);
      return TSDB_CODE_TSC_INVALID_TIME_STAMP;
    }
  }

  // 2. fill every column without a value with its null representation
  if (pBound->numOfBound < pBound->numOfCols) {
    char *ptr = rowStart;

    for (int32_t i = 0; i < pBound->numOfCols; ++i) {
      if (!pBound->cols[i].hasVal) {
        switch (pColumns[i].type) {
          case TSDB_DATA_TYPE_BINARY:
            varDataSetLen(ptr, sizeof(int8_t));
            *(uint8_t*) varDataVal(ptr) = TSDB_DATA_BINARY_NULL;
            break;

          case TSDB_DATA_TYPE_NCHAR:
            varDataSetLen(ptr, sizeof(int32_t));
            *(uint32_t*) varDataVal(ptr) = TSDB_DATA_NCHAR_NULL;
            break;

          default:
            setNull(ptr, pColumns[i].type, pColumns[i].bytes);
            break;
        }
      }

      ptr += pColumns[i].bytes;
    }

    // the row now spans all columns of the table
    rowSize = (int32_t)(ptr - rowStart);
  }

  *len = rowSize;
  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
527 528 529 530 531 532 533 534 535
static int32_t rowDataCompar(const void *lhs, const void *rhs) {
  TSKEY left = *(TSKEY *)lhs;
  TSKEY right = *(TSKEY *)rhs;

  if (left == right) {
    return 0;
  } else {
    return left > right ? 1 : -1;
  }
536 537
}

H
Haojun Liao 已提交
538 539 540
/*
 * Parse a sequence of "(v1, v2, ...)" row literals into a data block.
 *
 * str          in/out: position in the SQL text, advanced past the parsed rows
 * pDataBlock   block that receives the rows; it is enlarged when needed
 * maxRows      number of rows the block can hold on entry
 * pCmd         command object; pCmd->payload receives error messages
 * numOfRows    receives the number of rows that were parsed
 * tmpTokenBuf  scratch buffer forwarded to tsParseOneRow()
 *
 * Returns TSDB_CODE_SUCCESS when at least one row was parsed,
 * TSDB_CODE_TSC_OUT_OF_MEMORY when the block cannot be enlarged and
 * TSDB_CODE_TSC_SQL_SYNTAX_ERROR for every other failure.
 */
int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SSqlCmd* pCmd, int32_t* numOfRows, char *tmpTokenBuf) {
  int32_t index = 0;
  int32_t code = 0;

  (*numOfRows) = 0;

  SStrToken sToken;

  STableMeta* pTableMeta = pDataBlock->pTableMeta;
  STableComInfo tinfo = tscGetTableInfo(pTableMeta);

  int32_t  precision = tinfo.precision;

  while (1) {
    // every row starts with '('; anything else ends the value list
    index = 0;
    sToken = tStrGetToken(*str, &index, false);
    if (sToken.n == 0 || sToken.type != TK_LP) break;

    *str += index;

    // make room for the next row before parsing it
    if ((*numOfRows) >= maxRows || pDataBlock->size + tinfo.rowSize >= pDataBlock->nAllocSize) {
      int32_t tSize;
      code = tscAllocateMemIfNeed(pDataBlock, tinfo.rowSize, &tSize);
      if (code != TSDB_CODE_SUCCESS) {  //TODO pass the correct error code to client
        strcpy(pCmd->payload, "client out of memory");
        return TSDB_CODE_TSC_OUT_OF_MEMORY;
      }

      ASSERT(tSize > maxRows);
      maxRows = tSize;
    }

    int32_t len = 0;
    code = tsParseOneRow(str, pDataBlock, pCmd, precision, &len, tmpTokenBuf);
    if (code != TSDB_CODE_SUCCESS) {  // error message has been set in tsParseOneRow, return directly
      // NOTE(review): the specific code from tsParseOneRow() is replaced by a syntax error here
      return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
    }

    pDataBlock->size += len;

    // every row ends with ')'
    index = 0;
    sToken = tStrGetToken(*str, &index, false);
    *str += index;
    if (sToken.n == 0 || sToken.type != TK_RP) {
      tscSQLSyntaxErrMsg(pCmd->payload, ") expected", *str);
      code = TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
      return code;  // report the syntax error code instead of a bare -1
    }

    (*numOfRows)++;
  }

  if ((*numOfRows) <= 0) {
    strcpy(pCmd->payload, "no any data points");
    return  TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
  } else {
    return TSDB_CODE_SUCCESS;
  }
}

H
Haojun Liao 已提交
597 598 599
/*
 * Initialize the bound-column description so that every column of the table is
 * bound, in schema order.
 *
 * pColInfo   description to initialize; its boundedColumns and cols arrays are
 *            allocated here with calloc()
 * pSchema    column schemas; the bytes field of each column defines the offsets
 * numOfCols  number of columns
 *
 * NOTE(review): the calloc() results are used without a NULL check.
 */
void tscSetBoundColumnInfo(SParsedDataColInfo *pColInfo, SSchema *pSchema, int32_t numOfCols) {
  pColInfo->numOfCols = numOfCols;
  pColInfo->numOfBound = numOfCols;

  pColInfo->boundedColumns = calloc(pColInfo->numOfCols, sizeof(int32_t));
  pColInfo->cols = calloc(pColInfo->numOfCols, sizeof(SBoundColumn));

  // every column has a value and is bound at its own position
  for (int32_t col = 0; col < pColInfo->numOfCols; ++col) {
    pColInfo->boundedColumns[col] = col;
    pColInfo->cols[col].hasVal = true;
  }

  // a column starts where its predecessor ends; the first offset stays 0 from calloc()
  for (int32_t col = 1; col < pColInfo->numOfCols; ++col) {
    pColInfo->cols[col].offset = pSchema[col - 1].bytes + pColInfo->cols[col - 1].offset;
  }
}

L
lihui 已提交
614
/*
 * Make sure the data block has room for at least five more rows, enlarging the
 * buffer in steps of 1.5x when it has not.
 *
 * pDataBlock  block whose pData/nAllocSize are enlarged
 * rowSize     size of one row in bytes
 * numOfRows   receives the number of rows that fit into the allocated buffer
 *             behind the block header
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the buffer
 * cannot be enlarged; in that case the block keeps its previous buffer and
 * size.
 */
int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows) {
  size_t    remain = pDataBlock->nAllocSize - pDataBlock->size;
  const int factor = 5;
  uint32_t  nAllocSizeOld = pDataBlock->nAllocSize;

  // expand the allocated size
  if (remain < rowSize * factor) {
    while (remain < rowSize * factor) {
      // x + x / 2 equals the truncated value of x * 1.5, computed without floating point
      uint64_t grown = (uint64_t)pDataBlock->nAllocSize + pDataBlock->nAllocSize / 2;

      // a size of 0 or 1 never grows (the loop would not terminate) and a size beyond
      // UINT32_MAX cannot be stored in the 32-bit size used here: give up in both cases
      if (grown <= pDataBlock->nAllocSize || grown > UINT32_MAX) {
        pDataBlock->nAllocSize = nAllocSizeOld;
        *numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize;
        return TSDB_CODE_TSC_OUT_OF_MEMORY;
      }

      pDataBlock->nAllocSize = (uint32_t)grown;
      remain = pDataBlock->nAllocSize - pDataBlock->size;
    }

    char *tmp = realloc(pDataBlock->pData, (size_t)pDataBlock->nAllocSize);
    if (tmp != NULL) {
      pDataBlock->pData = tmp;
      // clear everything behind the data already stored
      memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size);
    } else {
      // the old buffer is still valid: restore its size and report the failure
      pDataBlock->nAllocSize = nAllocSizeOld;
      *numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize;
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
  }

  *numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize;
  return TSDB_CODE_SUCCESS;
}

642
/*
 * Fill the header of a submit block from the table meta and add numOfRows to
 * its row counter.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_INVALID_SQL when the resulting
 * row count would reach INT16_MAX; the row counter is left unchanged then.
 */
static int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) {
  pBlocks->tid = pTableMeta->id.tid;
  pBlocks->uid = pTableMeta->id.uid;
  pBlocks->sversion = pTableMeta->sversion;

  if (pBlocks->numOfRows + numOfRows >= INT16_MAX) {
    return TSDB_CODE_TSC_INVALID_SQL;
  }

  pBlocks->numOfRows += numOfRows;
  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
655
// data block is disordered, sort it in ascending order
H
hjxilinx 已提交
656
void tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf) {
657
  SSubmitBlk *pBlocks = (SSubmitBlk *)dataBuf->pData;
S
slguan 已提交
658 659

  // size is less than the total size, since duplicated rows may be removed yet.
660
  assert(pBlocks->numOfRows * dataBuf->rowSize + sizeof(SSubmitBlk) == dataBuf->size);
S
slguan 已提交
661

S
slguan 已提交
662 663 664 665 666
  // if use server time, this block must be ordered
  if (dataBuf->tsSource == TSDB_USE_SERVER_TS) {
    assert(dataBuf->ordered);
  }

S
slguan 已提交
667
  if (!dataBuf->ordered) {
668
    char *pBlockData = pBlocks->data;
S
slguan 已提交
669
    qsort(pBlockData, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar);
H
hzcheng 已提交
670

S
slguan 已提交
671 672
    int32_t i = 0;
    int32_t j = 1;
H
hzcheng 已提交
673

S
slguan 已提交
674
    while (j < pBlocks->numOfRows) {
S
slguan 已提交
675 676
      TSKEY ti = *(TSKEY *)(pBlockData + dataBuf->rowSize * i);
      TSKEY tj = *(TSKEY *)(pBlockData + dataBuf->rowSize * j);
H
hzcheng 已提交
677

S
slguan 已提交
678 679 680 681
      if (ti == tj) {
        ++j;
        continue;
      }
H
hzcheng 已提交
682

S
slguan 已提交
683 684 685 686 687 688 689 690 691
      int32_t nextPos = (++i);
      if (nextPos != j) {
        memmove(pBlockData + dataBuf->rowSize * nextPos, pBlockData + dataBuf->rowSize * j, dataBuf->rowSize);
      }

      ++j;
    }

    dataBuf->ordered = true;
H
hzcheng 已提交
692

S
slguan 已提交
693
    pBlocks->numOfRows = i + 1;
694
    dataBuf->size = sizeof(SSubmitBlk) + dataBuf->rowSize * pBlocks->numOfRows;
S
slguan 已提交
695
  }
S
slguan 已提交
696 697
}

H
Haojun Liao 已提交
698 699
/*
 * Parse the value list of one table into its data block and finish the block
 * header.
 *
 * pCmd      command object; receives error messages and the running count of
 *           bound parameters
 * str       in/out: position in the SQL text
 * dataBuf   data block of the target table
 * totalNum  incremented by the number of rows parsed
 *
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
static int32_t doParseInsertStatement(SSqlCmd* pCmd, char **str, STableDataBlocks* dataBuf, int32_t *totalNum) {
  STableComInfo tinfo = tscGetTableInfo(dataBuf->pTableMeta);

  int32_t rowCapacity;
  if (tscAllocateMemIfNeed(dataBuf, tinfo.rowSize, &rowCapacity) != TSDB_CODE_SUCCESS) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // scratch buffer used for deleting escape characters: \\, \', \"
  // NOTE(review): tsParseOneRow() copies string tokens into this buffer without a length
  // check - confirm that a token can never exceed 16 KB
  char *tmpTokenBuf = calloc(1, 16*1024);
  if (tmpTokenBuf == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  int32_t numOfRows = 0;
  int32_t code = tsParseValues(str, dataBuf, rowCapacity, pCmd, &numOfRows, tmpTokenBuf);
  free(tmpTokenBuf);

  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // number the parameters added by this statement and make their offsets relative to the
  // end of the submit header
  SParamInfo *param = dataBuf->params;
  for (uint32_t i = 0; i < dataBuf->numOfParams; ++i, ++param) {
    if (param->idx != -1) {
      continue;
    }

    param->idx = pCmd->numOfParams++;
    param->offset -= sizeof(SSubmitBlk);
  }

  code = tsSetBlockInfo((SSubmitBlk *)(dataBuf->pData), dataBuf->pTableMeta, numOfRows);
  if (code != TSDB_CODE_SUCCESS) {
    tscInvalidSQLErrMsg(pCmd->payload, "too many rows in sql, total number of rows should be less than 32767", *str);
    return code;
  }

  dataBuf->numOfTables = 1;
  *totalNum += numOfRows;
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
741
/*
 * Parse the part of an insert clause between the table name and the VALUES/FILE keyword:
 *
 *   tbname [(col, ...)] [USING stable [(tag, ...)] TAGS (val, ...)]
 *
 * @param sqlstr       in: points at the table name; out: on success points at the first token
 *                     that follows the part consumed here.
 * @param pSql         statement object; its cmd receives the tag data and table names.
 * @param boundColumn  out: set to the '(' of the optional column list when one is present;
 *                     left untouched otherwise.
 * @return the result of the table-meta lookup on the normal path (which may be
 *         TSDB_CODE_TSC_ACTION_IN_PROGRESS), or an error code for malformed SQL / allocation
 *         failure.
 */
static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundColumn) {
  int32_t   index = 0;
  SStrToken sToken = {0};
  SStrToken tableToken = {0};
  int32_t   code = TSDB_CODE_SUCCESS;

  const int32_t TABLE_INDEX = 0;
  const int32_t STABLE_INDEX = 1;

  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0);

  char *sql = *sqlstr;

  pSql->cmd.autoCreated = false;

  // get the token of specified table
  index = 0;
  tableToken = tStrGetToken(sql, &index, false);
  sql += index;

  // skip possibly exists column list
  index = 0;
  sToken = tStrGetToken(sql, &index, false);
  sql += index;

  int32_t numOfColList = 0;

  // Bind table columns list in string, skip it and continue
  if (sToken.type == TK_LP) {
    *boundColumn = &sToken.z[0];

    while (1) {
      index = 0;
      sToken = tStrGetToken(sql, &index, false);

      if (sToken.type == TK_RP) {
        break;
      }

      // An empty token means the statement ended before the closing ')'. Without this
      // check the loop would keep reading the same empty token forever.
      if (sToken.n == 0) {
        return TSDB_CODE_TSC_INVALID_SQL;
      }

      sql += index;
      ++numOfColList;
    }

    // index still holds the offset just past ')', so this reads the token that follows the list
    sToken = tStrGetToken(sql, &index, false);
    sql += index;
  }

  // "()" with no column names is rejected
  if (numOfColList == 0 && (*boundColumn) != NULL) {
    return TSDB_CODE_TSC_INVALID_SQL;
  }

  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, TABLE_INDEX);

  if (sToken.type == TK_USING) {  // create table if not exists according to the super table
    index = 0;
    sToken = tStrGetToken(sql, &index, false);
    sql += index;

    //the source super table is moved to the secondary position of the pTableMetaInfo list
    if (pQueryInfo->numOfTables < 2) {
      tscAddEmptyMetaInfo(pQueryInfo);
    }

    STableMetaInfo *pSTableMetaInfo = tscGetMetaInfo(pQueryInfo, STABLE_INDEX);
    code = tscSetTableFullName(pSTableMetaInfo, &sToken, pSql);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    tNameExtractFullName(&pSTableMetaInfo->name, pCmd->tagData.name);
    pCmd->tagData.dataLen = 0;

    code = tscGetTableMeta(pSql, pSTableMetaInfo);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (!UTIL_TABLE_IS_SUPER_TABLE(pSTableMetaInfo)) {
      return tscInvalidSQLErrMsg(pCmd->payload, "create table only from super table is allowed", sToken.z);
    }

    SSchema *pTagSchema = tscGetTableTagSchema(pSTableMetaInfo->pTableMeta);
    STableComInfo tinfo = tscGetTableInfo(pSTableMetaInfo->pTableMeta);

    SParsedDataColInfo spd = {0};
    tscSetBoundColumnInfo(&spd, pTagSchema, tscGetNumOfTags(pSTableMetaInfo->pTableMeta));

    // peek at the next token without consuming it: either TAGS or a bound tag-name list
    index = 0;
    sToken = tStrGetToken(sql, &index, false);
    if (sToken.type != TK_TAGS && sToken.type != TK_LP) {
      tscDestroyBoundColumnInfo(&spd);
      return tscInvalidSQLErrMsg(pCmd->payload, "keyword TAGS expected", sToken.z);
    }

    // parse the bound tags column
    if (sToken.type == TK_LP) {
      /*
       * insert into tablename (col1, col2,..., coln) using superTableName (tagName1, tagName2, ..., tagNamen)
       * tags(tagVal1, tagVal2, ..., tagValn) values(v1, v2,... vn);
       */
      char* end = NULL;
      code = parseBoundColumns(pCmd, &spd, pTagSchema, sql, &end);
      if (code != TSDB_CODE_SUCCESS) {
        tscDestroyBoundColumnInfo(&spd);
        return code;
      }

      sql = end;

      index = 0;  // keywords of "TAGS"
      sToken = tStrGetToken(sql, &index, false);
      sql += index;
    } else {
      sql += index;
    }

    index = 0;
    sToken = tStrGetToken(sql, &index, false);
    sql += index;

    if (sToken.type != TK_LP) {
      tscDestroyBoundColumnInfo(&spd);
      return tscInvalidSQLErrMsg(pCmd->payload, "( is expected", sToken.z);
    }

    SKVRowBuilder kvRowBuilder = {0};
    if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
      tscDestroyBoundColumnInfo(&spd);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    // one tag value per bound tag column, in the order the tags were bound
    for (int i = 0; i < spd.numOfBound; ++i) {
      SSchema* pSchema = &pTagSchema[spd.boundedColumns[i]];

      index = 0;
      sToken = tStrGetToken(sql, &index, true);
      sql += index;

      if (TK_ILLEGAL == sToken.type) {
        tdDestroyKVRowBuilder(&kvRowBuilder);
        tscDestroyBoundColumnInfo(&spd);
        return TSDB_CODE_TSC_INVALID_SQL;
      }

      if (sToken.n == 0 || sToken.type == TK_RP) {
        break;
      }

      // Remove quotation marks
      if (TK_STRING == sToken.type) {
        sToken.z++;
        sToken.n -= 2;
      }

      char tagVal[TSDB_MAX_TAGS_LEN];
      code = tsParseOneColumn(pSchema, &sToken, tagVal, pCmd->payload, &sql, false, tinfo.precision);
      if (code != TSDB_CODE_SUCCESS) {
        tdDestroyKVRowBuilder(&kvRowBuilder);
        tscDestroyBoundColumnInfo(&spd);
        return code;
      }

      tdAddColToKVRow(&kvRowBuilder, pSchema->colId, pSchema->type, tagVal);
    }

    tscDestroyBoundColumnInfo(&spd);

    SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder);
    tdDestroyKVRowBuilder(&kvRowBuilder);
    if (row == NULL) {
      return tscInvalidSQLErrMsg(pCmd->payload, "tag value expected", NULL);
    }
    tdSortKVRowByColIdx(row);

    // From here on 'row' is owned by this function and must be freed on every exit path.
    pCmd->tagData.dataLen = kvRowLen(row);
    if (pCmd->tagData.dataLen <= 0){
      free(row);
      return tscInvalidSQLErrMsg(pCmd->payload, "tag value expected", NULL);
    }

    // keep the old buffer in pCmd->tagData.data until realloc is known to have succeeded
    char* pTag = realloc(pCmd->tagData.data, pCmd->tagData.dataLen);
    if (pTag == NULL) {
      free(row);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    kvRowCpy(pTag, row);
    free(row);
    pCmd->tagData.data = pTag;

    index = 0;
    sToken = tStrGetToken(sql, &index, false);
    sql += index;
    if (sToken.n == 0 || sToken.type != TK_RP) {
      return tscSQLSyntaxErrMsg(pCmd->payload, ") expected", sToken.z);
    }

    if (tscValidateName(&tableToken) != TSDB_CODE_SUCCESS) {
      return tscInvalidSQLErrMsg(pCmd->payload, "invalid table name", *sqlstr);
    }

    int32_t ret = tscSetTableFullName(pTableMetaInfo, &tableToken, pSql);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }

    if (sql == NULL) {
      return TSDB_CODE_TSC_INVALID_SQL;
    }

    code = tscGetTableMetaEx(pSql, pTableMetaInfo, true);
    if (TSDB_CODE_TSC_ACTION_IN_PROGRESS == code) {
      return code;
    }

  } else {
    // no USING clause: rewind to the token just read so the caller sees it again
    sql = sToken.z;

    if (sql == NULL) {
      return TSDB_CODE_TSC_INVALID_SQL;
    }

    code = tscGetTableMetaEx(pSql, pTableMetaInfo, false);
    if (pCmd->curSql == NULL) {
      assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS);
    }
  }

  *sqlstr = sql;

  return code;
}

H
Haojun Liao 已提交
973
/*
 * Copy a table-name token into psTblToken->z and validate it.
 *
 * @param tblName     start of the name inside the SQL text (not necessarily NUL-terminated
 *                    at the end of the name).
 * @param len         length of the name in bytes.
 * @param psTblToken  token whose z buffer receives the copy; at most TSDB_TABLE_FNAME_LEN
 *                    bytes are written to it.
 * @return the result of tscValidateName(), or TSDB_CODE_TSC_INVALID_SQL when len does not
 *         fit in a TSDB_TABLE_FNAME_LEN-byte buffer.
 */
int validateTableName(char *tblName, int len, SStrToken* psTblToken) {
  // The copy below is capped at TSDB_TABLE_FNAME_LEN bytes; a longer len would make the
  // token claim more bytes than were copied.
  if (len < 0 || len >= TSDB_TABLE_FNAME_LEN) {
    return TSDB_CODE_TSC_INVALID_SQL;
  }

  tstrncpy(psTblToken->z, tblName, TSDB_TABLE_FNAME_LEN);

  psTblToken->n    = len;
  psTblToken->type = TK_ID;
  tSQLGetToken(psTblToken->z, &psTblToken->type);

  return tscValidateName(psTblToken);
}

983 984 985 986 987 988 989 990 991
/*
 * Record the data source kind of the current insert statement in pCmd->dataSourceType.
 * A statement may use only one kind: once a non-zero kind has been recorded, a different
 * one is rejected with an error message pointing at sql.
 */
static int32_t validateDataSource(SSqlCmd *pCmd, int8_t type, const char *sql) {
  bool alreadySet = (pCmd->dataSourceType != 0);

  if (alreadySet && pCmd->dataSourceType != type) {
    return tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES and FILE are not allowed to mix up", sql);
  }

  pCmd->dataSourceType = type;
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061
/*
 * Parse a parenthesized list of column (or tag) names and record which schema entries
 * were named, in the order given.
 *
 * @param pCmd      command whose payload receives error messages; on failure its curSql is
 *                  cleared and parseFinished is set.
 * @param pColInfo  bound-column bookkeeping; reset here, then filled with one entry per name.
 * @param pSchema   schema array searched by name (pColInfo->numOfCols entries are examined).
 * @param str       text starting at (or before) the opening '('.
 * @param end       optional out parameter: position just past the closing ')'.
 * @return TSDB_CODE_SUCCESS, or the code produced by tscInvalidSQLErrMsg() when '(' is
 *         missing, a name is unknown, or a name is repeated.
 */
static int32_t parseBoundColumns(SSqlCmd* pCmd, SParsedDataColInfo* pColInfo, SSchema* pSchema,
    char* str, char **end) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t consumed = 0;

  // forget whatever was bound before
  pColInfo->numOfBound = 0;
  memset(pColInfo->boundedColumns, 0, sizeof(int32_t) * pColInfo->numOfCols);
  for (int32_t c = 0; c < pColInfo->numOfCols; ++c) {
    pColInfo->cols[c].hasVal = false;
  }

  SStrToken tok = tStrGetToken(str, &consumed, false);
  str += consumed;

  if (tok.type != TK_LP) {
    code = tscInvalidSQLErrMsg(pCmd->payload, "( is expected", tok.z);
    goto _clean;
  }

  for (;;) {
    consumed = 0;
    tok = tStrGetToken(str, &consumed, false);
    str += consumed;

    if (tok.type == TK_STRING) {
      tscDequoteAndTrimToken(&tok);
    }

    if (tok.type == TK_RP) {
      if (end != NULL) {  // set the end position
        *end = str;
      }

      break;
    }

    // todo speedup by using hash list
    int32_t matched = -1;
    for (int32_t t = 0; t < pColInfo->numOfCols; ++t) {
      if (strlen(pSchema[t].name) == tok.n && strncmp(tok.z, pSchema[t].name, tok.n) == 0) {
        matched = t;
        break;
      }
    }

    if (matched < 0) {
      code = tscInvalidSQLErrMsg(pCmd->payload, "invalid column/tag name", tok.z);
      goto _clean;
    }

    if (pColInfo->cols[matched].hasVal == true) {
      code = tscInvalidSQLErrMsg(pCmd->payload, "duplicated column name", tok.z);
      goto _clean;
    }

    pColInfo->cols[matched].hasVal = true;
    pColInfo->boundedColumns[pColInfo->numOfBound] = matched;
    pColInfo->numOfBound += 1;
  }

  // zero the unused tail of the bound-column index array
  memset(&pColInfo->boundedColumns[pColInfo->numOfBound], 0 , sizeof(int32_t) * (pColInfo->numOfCols - pColInfo->numOfBound));
  return TSDB_CODE_SUCCESS;

  _clean:
  pCmd->curSql     = NULL;
  pCmd->parseFinished  = 1;
  return code;
}

H
hzcheng 已提交
1062
/**
1063
 * parse insert sql
H
hzcheng 已提交
1064 1065 1066
 * @param pSql
 * @return
 */
H
Haojun Liao 已提交
1067
int tsParseInsertSql(SSqlObj *pSql) {
S
slguan 已提交
1068
  SSqlCmd *pCmd = &pSql->cmd;
H
Haojun Liao 已提交
1069
  char* str = pCmd->curSql;
1070

S
slguan 已提交
1071
  int32_t totalNum = 0;
1072 1073
  int32_t code = TSDB_CODE_SUCCESS;

H
Haojun Liao 已提交
1074
  SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0);
1075 1076
  assert(pQueryInfo != NULL);

1077 1078 1079 1080 1081
  STableMetaInfo *pTableMetaInfo = (pQueryInfo->numOfTables == 0)? tscAddEmptyMetaInfo(pQueryInfo):tscGetMetaInfo(pQueryInfo, 0);
  if (pTableMetaInfo == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    code = terrno;
    return code;
1082
  }
H
hzcheng 已提交
1083

H
Haojun Liao 已提交
1084 1085 1086
  if ((code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
    return code;
  }
H
hzcheng 已提交
1087

H
Haojun Liao 已提交
1088 1089 1090
  if (NULL == pCmd->pTableBlockHashList) {
    pCmd->pTableBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
    if (NULL == pCmd->pTableBlockHashList) {
1091
      code = TSDB_CODE_TSC_OUT_OF_MEMORY;
1092
      goto _clean;
L
lihui 已提交
1093 1094
    }
  } else {
1095
    str = pCmd->curSql;
L
lihui 已提交
1096 1097
  }
  
H
Haojun Liao 已提交
1098
  tscDebug("0x%"PRIx64" create data block list hashList:%p", pSql->self, pCmd->pTableBlockHashList);
H
hzcheng 已提交
1099 1100

  while (1) {
1101
    int32_t   index = 0;
H
Haojun Liao 已提交
1102
    SStrToken sToken = tStrGetToken(str, &index, false);
1103 1104 1105 1106 1107 1108 1109 1110

    // no data in the sql string anymore.
    if (sToken.n == 0) {
      /*
       * if the data is from the data file, no data has been generated yet. So, there no data to
       * merge or submit, save the file path and parse the file in other routines.
       */
      if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) {
S
slguan 已提交
1111 1112 1113
        goto _clean;
      }

1114 1115 1116 1117 1118
      /*
       * if no data has been generated during parsing the sql string, error msg will return
       * Otherwise, create the first submit block and submit to virtual node.
       */
      if (totalNum == 0) {
1119
        code = TSDB_CODE_TSC_INVALID_SQL;
1120
        goto _clean;
1121 1122
      } else {
        break;
H
hzcheng 已提交
1123 1124 1125
      }
    }

1126
    pCmd->curSql = sToken.z;
H
Haojun Liao 已提交
1127
    char      buf[TSDB_TABLE_FNAME_LEN];
H
Haojun Liao 已提交
1128
    SStrToken sTblToken;
B
Bomin Zhang 已提交
1129
    sTblToken.z = buf;
S
slguan 已提交
1130
    // Check if the table name available or not
H
Hui Li 已提交
1131
    if (validateTableName(sToken.z, sToken.n, &sTblToken) != TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
1132
      code = tscInvalidSQLErrMsg(pCmd->payload, "table name invalid", sToken.z);
1133
      goto _clean;
H
huili 已提交
1134 1135
    }

H
Hui Li 已提交
1136
    if ((code = tscSetTableFullName(pTableMetaInfo, &sTblToken, pSql)) != TSDB_CODE_SUCCESS) {
1137
      goto _clean;
H
hzcheng 已提交
1138 1139
    }

H
Haojun Liao 已提交
1140
    char *bindedColumns = NULL;
H
Haojun Liao 已提交
1141
    if ((code = tscCheckIfCreateTable(&str, pSql, &bindedColumns)) != TSDB_CODE_SUCCESS) {
1142
      /*
H
Haojun Liao 已提交
1143 1144
       * After retrieving the table meta from server, the sql string will be parsed from the paused position.
       * And during the getTableMetaCallback function, the sql string will be parsed from the paused position.
1145
       */
1146
      if (TSDB_CODE_TSC_ACTION_IN_PROGRESS == code) {
H
hjxilinx 已提交
1147
        return code;
H
hzcheng 已提交
1148
      }
H
Haojun Liao 已提交
1149

D
dapan1121 已提交
1150
      tscError("0x%"PRIx64" async insert parse error, code:%s", pSql->self, tstrerror(code));
1151
      pCmd->curSql = NULL;
1152
      goto _clean;
H
hzcheng 已提交
1153 1154
    }

weixin_48148422's avatar
weixin_48148422 已提交
1155
    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
H
hjxilinx 已提交
1156
      code = tscInvalidSQLErrMsg(pCmd->payload, "insert data into super table is not supported", NULL);
1157
      goto _clean;
H
hzcheng 已提交
1158 1159
    }

S
slguan 已提交
1160
    index = 0;
H
Haojun Liao 已提交
1161
    sToken = tStrGetToken(str, &index, false);
S
slguan 已提交
1162
    str += index;
1163

H
Haojun Liao 已提交
1164
    if (sToken.n == 0 || (sToken.type != TK_FILE && sToken.type != TK_VALUES)) {
1165
      code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE required", sToken.z);
1166
      goto _clean;
H
hzcheng 已提交
1167 1168
    }

H
Haojun Liao 已提交
1169 1170
    STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
    if (sToken.type == TK_FILE) {
1171
      if (validateDataSource(pCmd, DATA_FROM_DATA_FILE, sToken.z) != TSDB_CODE_SUCCESS) {
1172
        goto _clean;
H
hzcheng 已提交
1173 1174
      }

S
slguan 已提交
1175
      index = 0;
H
Haojun Liao 已提交
1176
      sToken = tStrGetToken(str, &index, false);
1177 1178
      if (sToken.type != TK_STRING && sToken.type != TK_ID) {
        code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z);
1179
        goto _clean;
1180
      }
S
slguan 已提交
1181 1182
      str += index;
      if (sToken.n == 0) {
H
hjxilinx 已提交
1183
        code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z);
1184
        goto _clean;
H
hzcheng 已提交
1185 1186
      }

H
Haojun Liao 已提交
1187 1188
      strncpy(pCmd->payload, sToken.z, sToken.n);
      strdequote(pCmd->payload);
1189

H
Haojun Liao 已提交
1190
      // todo refactor extract method
H
hzcheng 已提交
1191
      wordexp_t full_path;
H
Haojun Liao 已提交
1192
      if (wordexp(pCmd->payload, &full_path, 0) != 0) {
H
hjxilinx 已提交
1193
        code = tscInvalidSQLErrMsg(pCmd->payload, "invalid filename", sToken.z);
1194
        goto _clean;
H
hzcheng 已提交
1195 1196
      }

H
Haojun Liao 已提交
1197 1198
      tstrncpy(pCmd->payload, full_path.we_wordv[0], pCmd->allocSize);
      wordfree(&full_path);
1199

H
Haojun Liao 已提交
1200 1201 1202
    } else {
      if (bindedColumns == NULL) {
        STableMeta *pTableMeta = pTableMetaInfo->pTableMeta;
S
slguan 已提交
1203

H
Haojun Liao 已提交
1204 1205
        if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) {
          goto _clean;
S
slguan 已提交
1206 1207
        }

H
Haojun Liao 已提交
1208 1209 1210 1211 1212 1213
        STableDataBlocks *dataBuf = NULL;
        int32_t ret = tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_DEFAULT_PAYLOAD_SIZE,
                                              sizeof(SSubmitBlk), tinfo.rowSize, &pTableMetaInfo->name, pTableMeta,
                                              &dataBuf, NULL);
        if (ret != TSDB_CODE_SUCCESS) {
          goto _clean;
H
hzcheng 已提交
1214 1215
        }

H
Haojun Liao 已提交
1216 1217 1218 1219 1220 1221 1222
        code = doParseInsertStatement(pCmd, &str, dataBuf, &totalNum);
        if (code != TSDB_CODE_SUCCESS) {
          goto _clean;
        }
      } else {  // bindedColumns != NULL
        // insert into tablename(col1, col2,..., coln) values(v1, v2,... vn);
        STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta;
H
hzcheng 已提交
1223

H
Haojun Liao 已提交
1224 1225
        if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) {
          goto _clean;
H
hzcheng 已提交
1226 1227
        }

H
Haojun Liao 已提交
1228 1229 1230 1231 1232
        STableDataBlocks *dataBuf = NULL;
        int32_t ret = tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_DEFAULT_PAYLOAD_SIZE,
                                              sizeof(SSubmitBlk), tinfo.rowSize, &pTableMetaInfo->name, pTableMeta,
                                              &dataBuf, NULL);
        if (ret != TSDB_CODE_SUCCESS) {
1233
          goto _clean;
H
hzcheng 已提交
1234 1235
        }

H
Haojun Liao 已提交
1236 1237 1238 1239 1240
        SSchema *pSchema = tscGetTableSchema(pTableMeta);
        code = parseBoundColumns(pCmd, &dataBuf->boundColumnInfo, pSchema, bindedColumns, NULL);
        if (code != TSDB_CODE_SUCCESS) {
          goto _clean;
        }
H
hzcheng 已提交
1241

H
Haojun Liao 已提交
1242 1243 1244 1245
        if (dataBuf->boundColumnInfo.cols[0].hasVal == false) {
          code = tscInvalidSQLErrMsg(pCmd->payload, "primary timestamp column can not be null", NULL);
          goto _clean;
        }
S
slguan 已提交
1246

H
Haojun Liao 已提交
1247 1248 1249 1250
        if (sToken.type != TK_VALUES) {
          code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES is expected", sToken.z);
          goto _clean;
        }
H
hzcheng 已提交
1251

H
Haojun Liao 已提交
1252 1253 1254 1255
        code = doParseInsertStatement(pCmd, &str, dataBuf, &totalNum);
        if (code != TSDB_CODE_SUCCESS) {
          goto _clean;
        }
H
hzcheng 已提交
1256 1257 1258 1259
      }
    }
  }

S
slguan 已提交
1260 1261 1262 1263
  // we need to keep the data blocks if there are parameters in the sql
  if (pCmd->numOfParams > 0) {
    goto _clean;
  }
1264

H
Haojun Liao 已提交
1265
  if (taosHashGetSize(pCmd->pTableBlockHashList) > 0) { // merge according to vgId
H
Haojun Liao 已提交
1266
    if ((code = tscMergeTableDataBlocks(pSql, true)) != TSDB_CODE_SUCCESS) {
1267
      goto _clean;
S
slguan 已提交
1268
    }
H
hzcheng 已提交
1269 1270 1271 1272 1273 1274
  }

  code = TSDB_CODE_SUCCESS;
  goto _clean;

_clean:
H
Haojun Liao 已提交
1275
  pCmd->curSql = NULL;
1276
  pCmd->parseFinished  = 1;
H
hzcheng 已提交
1277 1278 1279
  return code;
}

H
Haojun Liao 已提交
1280
/*
 * First-pass setup for an INSERT/IMPORT statement.
 *
 * Checks the connection's write permission, marks the command as TSDB_SQL_INSERT, tags the
 * query info with the insert type, and verifies that the second token of the statement is
 * INTO. On success pCmd->curSql points just past the INTO keyword.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_NO_WRITE_AUTH, or the code produced by
 * tscInvalidSQLErrMsg() when INTO is missing.
 */
int tsInsertInitialCheck(SSqlObj *pSql) {
  if (!pSql->pTscObj->writeAuth) {
    return TSDB_CODE_TSC_NO_WRITE_AUTH;
  }

  SSqlCmd *pCmd = &pSql->cmd;
  int32_t  pos = 0;  // running offset into sqlstr, shared by both token reads below

  SStrToken verb = tStrGetToken(pSql->sqlstr, &pos, false);
  assert(verb.type == TK_INSERT || verb.type == TK_IMPORT);

  pCmd->count = 0;
  pCmd->command = TSDB_SQL_INSERT;

  SQueryInfo *pQueryInfo = tscGetQueryInfoS(pCmd, pCmd->clauseIndex);

  TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT | pCmd->insertType);

  SStrToken into = tStrGetToken(pSql->sqlstr, &pos, false);
  if (into.type != TK_INTO) {
    return tscInvalidSQLErrMsg(pCmd->payload, "keyword INTO is expected", into.z);
  }

  pCmd->curSql = into.z + into.n;
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
1307
/*
 * Parse pSql->sqlstr into pSql->cmd.
 *
 * Insert statements go through tsInsertInitialCheck() (only when 'initial' is true) and
 * tsParseInsertSql(); everything else goes through qSqlParse() + tscToSQLCmd(). A parse that
 * fails with an invalid-SQL style error is retried once (tracked by pSql->parseRetry) after
 * the command has been reset.
 *
 * @param pSql     statement object
 * @param initial  true on the first parse of the statement, false when resuming
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_TSC_ACTION_IN_PROGRESS, or an error code
 */
int tsParseSql(SSqlObj *pSql, bool initial) {
  int32_t ret = TSDB_CODE_SUCCESS;
  SSqlCmd* pCmd = &pSql->cmd;

  if ((!pCmd->parseFinished) && (!initial)) {
    tscDebug("0x%"PRIx64" resume to parse sql: %s", pSql->self, pCmd->curSql);
  }

  ret = tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE);
  if (TSDB_CODE_SUCCESS != ret) {
    return ret;
  }

  if (tscIsInsertData(pSql->sqlstr)) {
    if (initial && ((ret = tsInsertInitialCheck(pSql)) != TSDB_CODE_SUCCESS)) {
      return ret;
    }

    // make a backup as tsParseInsertSql may modify the string
    char* sqlstr = strdup(pSql->sqlstr);
    ret = tsParseInsertSql(pSql);

    // Decide on the return code first and read pSql->parseRetry only for a retryable
    // (synchronous) failure.
    // NOTE(review): per the comment at the end of this function, pSql state may be released
    // by another thread once the parse has gone asynchronous - confirm this ordering is
    // sufficient for that case.
    bool retryable = (ret == TSDB_CODE_TSC_SQL_SYNTAX_ERROR || ret == TSDB_CODE_TSC_INVALID_SQL);
    if ((sqlstr == NULL) || !retryable || (pSql->parseRetry >= 1)) {
      free(sqlstr);
    } else {
      // retry once from the pristine copy of the statement
      tscResetSqlCmd(pCmd, true);
      free(pSql->sqlstr);
      pSql->sqlstr = sqlstr;
      pSql->parseRetry++;
      if ((ret = tsInsertInitialCheck(pSql)) == TSDB_CODE_SUCCESS) {
        ret = tsParseInsertSql(pSql);
      }
    }
  } else {
    SSqlInfo SQLInfo = qSqlParse(pSql->sqlstr);
    ret = tscToSQLCmd(pSql, &SQLInfo);
    if (ret == TSDB_CODE_TSC_INVALID_SQL && pSql->parseRetry == 0 && SQLInfo.type == TSDB_SQL_NULL) {
      tscResetSqlCmd(pCmd, true);
      pSql->parseRetry++;
      ret = tscToSQLCmd(pSql, &SQLInfo);
    }

    SqlInfoDestroy(&SQLInfo);
  }

  /*
   * the pRes->code may be modified or released by another thread in tscTableMetaCallBack function,
   * so do NOT use pRes->code to determine if the getTableMeta function
   * invokes new threads to get data from mgmt node or simply retrieves data from cache.
   * do NOT assign return code to pRes->code for the same reason since it may be released by another thread already.
   */
  return ret;
}

S
slguan 已提交
1361 1362 1363
/*
 * Finalize one batch of rows read from a data file and send it.
 *
 * Writes the block header for numOfRows rows into pTableDataBlocks, merges the table data
 * blocks of pSql, copies the first merged block into the request payload and issues the
 * request. Returns TSDB_CODE_SUCCESS-style codes from the helpers it calls; the first
 * failing step ends the sequence.
 */
static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlocks *pTableDataBlocks) {
  SSqlCmd *pCmd = &pSql->cmd;
  int32_t  status = TSDB_CODE_SUCCESS;

  pSql->res.numOfRows = 0;

  assert(pCmd->numOfClause == 1);
  STableMeta *pMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta;

  SSubmitBlk *pHeader = (SSubmitBlk *)(pTableDataBlocks->pData);
  status = tsSetBlockInfo(pHeader, pMeta, numOfRows);
  if (status != TSDB_CODE_SUCCESS) {
    return tscInvalidSQLErrMsg(pCmd->payload, "too many rows in sql, total number of rows should be less than 32767", NULL);
  }

  status = tscMergeTableDataBlocks(pSql, true);
  if (status != TSDB_CODE_SUCCESS) {
    return status;
  }

  STableDataBlocks *pMerged = taosArrayGetP(pCmd->pDataBlocks, 0);
  status = tscCopyDataBlockToPayload(pSql, pMerged);
  if (status != TSDB_CODE_SUCCESS) {
    return status;
  }

  return tscBuildAndSendRequest(pSql, NULL);
}

/* Context handed to parseFileSendDataBlock() while rows are imported from a data file. */
typedef struct SImportFileSupport {
  SSqlObj *pSql;  // statement that started the import; the callback treats it as the parent
  FILE    *fp;    // open handle of the data file being read
} SImportFileSupport;

H
Haojun Liao 已提交
1392
/*
 * Callback that imports the next batch of rows from a data file.
 *
 * Each invocation adds the rows acknowledged for the previous batch to the parent statement,
 * reads up to maxRows further lines from the file, parses them into a fresh data block and
 * sends it through the sub-statement 'tres'. When the file is exhausted the sub-statement,
 * the support structure and the file handle are released and the parent's user callback is
 * invoked with the total row count. On any error the same resources are released and the
 * error is reported on the parent via tscAsyncResultOnError().
 *
 * @param param      SImportFileSupport* carrying the parent statement and the open file
 * @param tres       sub-statement (SSqlObj*) used to send each batch
 * @param numOfRows  unused here
 */
static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRows) {
  assert(param != NULL && tres != NULL);

  char *  tokenBuf = NULL;
  size_t  n = 0;
  ssize_t readLen = 0;
  char *  line = NULL;
  int32_t count = 0;
  int32_t maxRows = 0;
  FILE *  fp   = NULL;

  SSqlObj *pSql = tres;
  SSqlCmd *pCmd = &pSql->cmd;

  SImportFileSupport *pSupporter = (SImportFileSupport *)param;

  SSqlObj *pParentSql = pSupporter->pSql;
  fp = pSupporter->fp;

  int32_t code = pSql->res.code;

  // retry parse data from file and import data from the begining again
  if (code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
    assert(pSql->res.numOfRows == 0);
    int32_t ret = fseek(fp, 0, SEEK_SET);
    if (ret < 0) {
      // capture errno before any other call can overwrite it
      code = TAOS_SYSTEM_ERROR(errno);
      tscError("0x%"PRIx64" failed to seek SEEK_SET since:%s", pSql->self, tstrerror(code));
      goto _error;
    }
  } else if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  // accumulate the total submit records
  pParentSql->res.numOfRows += pSql->res.numOfRows;

  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  STableMeta *    pTableMeta = pTableMetaInfo->pTableMeta;
  STableComInfo   tinfo = tscGetTableInfo(pTableMeta);

  destroyTableNameList(pCmd);

  pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);

  if (pCmd->pTableBlockHashList == NULL) {
    pCmd->pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
    if (pCmd->pTableBlockHashList == NULL) {
      code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto _error;
    }
  }

  STableDataBlocks *pTableDataBlock = NULL;
  int32_t           ret =
      tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_PAYLOAD_SIZE, sizeof(SSubmitBlk),
                              tinfo.rowSize, &pTableMetaInfo->name, pTableMeta, &pTableDataBlock, NULL);
  if (ret != TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto _error;
  }

  code = tscAllocateMemIfNeed(pTableDataBlock, tinfo.rowSize, &maxRows);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  tokenBuf = calloc(1, TSDB_MAX_BYTES_PER_ROW);
  if (tokenBuf == NULL) {
    code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto _error;
  }

  while ((readLen = tgetline(&line, &n, fp)) != -1) {
    // strip the line terminator, whether it is "\n", "\r" or "\r\n"
    while (readLen > 0 && (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1]))) {
      line[--readLen] = 0;
    }

    // skip blank lines
    if (readLen == 0) {
      continue;
    }

    char *lineptr = line;
    strtolower(line, line);

    int32_t len = 0;
    code = tsParseOneRow(&lineptr, pTableDataBlock, pCmd, tinfo.precision, &len, tokenBuf);
    if (code != TSDB_CODE_SUCCESS || pTableDataBlock->numOfParams > 0) {
      pSql->res.code = code;
      break;
    }

    pTableDataBlock->size += len;

    // the block is full; send it and continue with the next batch on the next callback
    if (++count >= maxRows) {
      break;
    }
  }

  tfree(tokenBuf);
  tfree(line);

  pParentSql->res.code = code;
  if (code == TSDB_CODE_SUCCESS) {
    if (count > 0) {
      code = doPackSendDataBlock(pSql, count, pTableDataBlock);
      if (code == TSDB_CODE_SUCCESS) {
        return;
      } else {
        goto _error;
      }
    } else {
      // nothing left to read: the import is complete
      taos_free_result(pSql);
      tfree(pSupporter);
      fclose(fp);

      pParentSql->fp = pParentSql->fetchFp;

      // all data has been sent to vnode, call user function
      int32_t v = (code != TSDB_CODE_SUCCESS) ? code : (int32_t)pParentSql->res.numOfRows;
      (*pParentSql->fp)(pParentSql->param, pParentSql, v);
      return;
    }
  }

_error:
  // every path that reaches this label carries a failure in 'code'; publish it on the
  // parent before reporting, so the error is not lost
  pParentSql->res.code = code;

  tfree(tokenBuf);
  tfree(line);
  taos_free_result(pSql);
  tfree(pSupporter);
  fclose(fp);

  tscAsyncResultOnError(pParentSql);
}

H
Haojun Liao 已提交
1523
/*
 * Start importing rows from the data file whose path is stored in pSql->cmd.payload.
 *
 * Does nothing unless the command is TSDB_SQL_INSERT. Otherwise it creates a sub-statement
 * whose callback is parseFileSendDataBlock(), opens the file, and kicks off the first batch.
 * If a resource cannot be obtained, the error is stored in pSql->res.code and reported via
 * tscAsyncResultOnError().
 */
void tscImportDataFromFile(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  if (pCmd->command != TSDB_SQL_INSERT) {
    return;
  }

  assert(pCmd->dataSourceType == DATA_FROM_DATA_FILE  && strlen(pCmd->payload) != 0);
  pCmd->active = pCmd->pQueryInfo[0];

  SImportFileSupport *pSupporter = calloc(1, sizeof(SImportFileSupport));
  if (pSupporter == NULL) {
    pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    tscAsyncResultOnError(pSql);
    return;
  }

  SSqlObj *pNew = createSubqueryObj(pSql, 0, parseFileSendDataBlock, pSupporter, TSDB_SQL_INSERT, NULL);
  // presumably NULL signals that the sub-statement could not be created - verify against
  // createSubqueryObj(); it is dereferenced by the callback below, so it must not be NULL
  if (pNew == NULL) {
    tfree(pSupporter);
    pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    tscAsyncResultOnError(pSql);
    return;
  }

  pCmd->count = 1;

  FILE *fp = fopen(pCmd->payload, "rb");
  if (fp == NULL) {
    pSql->res.code = TAOS_SYSTEM_ERROR(errno);
    tscError("0x%"PRIx64" failed to open file %s to load data from file, code:%s", pSql->self, pCmd->payload, tstrerror(pSql->res.code));

    tfree(pSupporter);
    taos_free_result(pNew);
    tscAsyncResultOnError(pSql);
    return;
  }

  pSupporter->pSql = pSql;
  pSupporter->fp   = fp;

  // run the first batch synchronously; later batches are driven by the callback
  parseFileSendDataBlock(pSupporter, pNew, TSDB_CODE_SUCCESS);
}