tscParseInsert.c 45.6 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE /* See feature_test_macros(7) */
#define _GNU_SOURCE

#define _XOPEN_SOURCE

21
#include "os.h"
22 23

#include "hash.h"
H
hzcheng 已提交
24 25 26
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
27
#include "ttokendef.h"
H
hzcheng 已提交
28
#include "taosdef.h"
H
hzcheng 已提交
29

S
slguan 已提交
30
#include "tscLog.h"
H
hjxilinx 已提交
31
#include "tscSubquery.h"
H
hzcheng 已提交
32 33
#include "tstoken.h"

S
slguan 已提交
34
#include "tdataformat.h"
35

S
slguan 已提交
36
/*
 * Source of the primary timestamps stored in a data block (compared against the tsSource field).
 */
enum {
  TSDB_USE_SERVER_TS = 0,  // chosen when a row carries 0 as its primary timestamp
  TSDB_USE_CLI_TS = 1,     // chosen when a row carries a non-zero primary timestamp
};

L
lihui 已提交
41
// Forward declaration: the definition appears further down in this file, after its first use in tsParseValues.
static int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows);
H
hzcheng 已提交
42

S
slguan 已提交
43
/*
 * Convert a numeric token into a signed 64-bit integer.
 *
 * pToken  token to convert; the text at pToken->z is parsed and exactly pToken->n characters
 *         must be consumed for the conversion to be accepted
 * value   receives the converted number
 * endPtr  receives the position at which parsing stopped
 *
 * Returns pToken->type on success, or TK_ILLEGAL when the token is empty or was not consumed
 * completely. Range errors are reported through errno (ERANGE) and must be checked by the caller.
 */
static int32_t tscToInteger(SSQLToken *pToken, int64_t *value, char **endPtr) {
  if (pToken->n == 0) {
    return TK_ILLEGAL;
  }

  // TK_HEX and TK_BIN select radix 16 and 2; every other token type is parsed as decimal
  int32_t radix = 10;
  if (pToken->type == TK_HEX) {
    radix = 16;
  } else if (pToken->type == TK_BIN) {
    radix = 2;
  }

  errno = 0;
  *value = strtoll(pToken->z, endPtr, radix);

  // strtoll stopped at an exponent or a decimal point: the literal is written in floating point
  // notation, so parse it again as a double and round it to the nearest integer
  if (**endPtr == 'e' || **endPtr == 'E' || **endPtr == '.') {
    errno = 0;
    double v = round(strtod(pToken->z, endPtr));

    // (double)INT64_MAX is rounded up to 2^63, a value that does not fit into an int64_t, so the
    // upper bound must be rejected with >=: converting an out-of-range double is undefined.
    // INT64_MIN is exactly representable as a double and stays rejected as before.
    if (v >= (double)INT64_MAX || v <= (double)INT64_MIN) {
      errno = ERANGE;
    } else {
      *value = (int64_t)v;
    }
  }

  // the whole token has to be consumed, otherwise it is not a valid integer
  if (*endPtr - pToken->z != pToken->n) {
    return TK_ILLEGAL;
  }

  return pToken->type;
}

S
slguan 已提交
76
/*
 * Convert a numeric token into a double by means of strtod.
 *
 * pToken  token to convert; exactly pToken->n characters starting at pToken->z must be consumed
 * value   receives the converted number
 * endPtr  receives the position at which strtod stopped
 *
 * Returns pToken->type on success and TK_ILLEGAL for an empty or partially consumed token.
 * errno is cleared before the conversion so that the caller can inspect it afterwards.
 */
static int32_t tscToDouble(SSQLToken *pToken, double *value, char **endPtr) {
  if (pToken->n != 0) {
    errno = 0;
    *value = strtod(pToken->z, endPtr);

    // accept the result only if the parsed text is exactly the token
    if ((*endPtr - pToken->z) == pToken->n) {
      return pToken->type;
    }
  }

  return TK_ILLEGAL;
}
H
hzcheng 已提交
91

S
slguan 已提交
92 93 94 95 96 97
/*
 * Parse a timestamp value, optionally followed by a "+ <duration>" or "- <duration>" term
 * (e.g. now+12a, now-5h).
 *
 * pToken    first token of the timestamp expression
 * time      receives the resulting timestamp
 * next      in: position in the sql text right after pToken; out: advanced past the duration
 *           term when one was parsed
 * error     buffer handed to tscInvalidSQLErrMsg on failure
 * timePrec  time precision; with TSDB_TIME_PRECISION_MILLI the parsed duration is divided by 1000
 *
 * Returns TSDB_CODE_SUCCESS (0) on success, otherwise an error code.
 */
int tsParseTime(SSQLToken *pToken, int64_t *time, char **next, char *error, int16_t timePrec) {
  int64_t ts = 0;
  char *  cursor = *next;

  if (pToken->type == TK_NOW) {
    ts = taosGetTimestamp(timePrec);
  } else if (strncmp(pToken->z, "0", 1) == 0 && pToken->n == 1) {
    // the literal 0 leaves the timestamp at 0
  } else if (pToken->type == TK_INTEGER) {
    ts = tsosStr2int64(pToken->z);
  } else {
    // every other token is handed to taosParseTime, which stores the result in *time itself;
    // no duration term is looked for in this case
    // strptime("2001-11-12 18:31:01", "%Y-%m-%d %H:%M:%S", &tm);
    if (taosParseTime(pToken->z, time, pToken->n, timePrec, tsDaylight) != TSDB_CODE_SUCCESS) {
      return tscInvalidSQLErrMsg(error, "invalid timestamp format", pToken->z);
    }

    return TSDB_CODE_SUCCESS;
  }

  // a comma right after the token (blanks and tabs ignored) means there is no duration term
  const char *after = pToken->z + pToken->n;
  while (*after == ' ' || *after == '\t') {
    ++after;
  }

  if (*after == ',') {
    *next = cursor;
    *time = ts;
    return 0;
  }

  /*
   * time expression:
   * e.g., now+12a, now-5h
   */
  int32_t   consumed = 0;
  SSQLToken opToken = tStrGetToken(cursor, &consumed, false, 0, NULL);
  cursor += consumed;

  if (opToken.type != TK_MINUS && opToken.type != TK_PLUS) {
    // no operator follows: *next is left where the caller put it
    *time = ts;
    return TSDB_CODE_SUCCESS;
  }

  consumed = 0;
  SSQLToken durationToken = tStrGetToken(cursor, &consumed, false, 0, NULL);
  cursor += consumed;

  // a duration consists of at least two characters
  if (durationToken.n < 2) {
    return tscInvalidSQLErrMsg(error, "value expected in timestamp", opToken.z);
  }

  int64_t interval = 0;
  if (getTimestampInUsFromStr(durationToken.z, durationToken.n, &interval) != TSDB_CODE_SUCCESS) {
    return TSDB_CODE_TSC_INVALID_SQL;
  }

  if (timePrec == TSDB_TIME_PRECISION_MILLI) {
    interval /= 1000;
  }

  if (opToken.type == TK_PLUS) {
    ts += interval;
  } else if (ts >= interval) {
    ts -= interval;
  } else {
    ts = 0;  // a subtraction never yields a negative timestamp
  }

  *next = cursor;
  *time = ts;
  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
166
int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload, char *msg, char **str, bool primaryKey,
S
slguan 已提交
167 168 169
                             int16_t timePrec) {
  int64_t iv;
  int32_t numType;
S
slguan 已提交
170
  char *  endptr = NULL;
171 172
  errno = 0;  // clear the previous existed error information

H
hzcheng 已提交
173 174
  switch (pSchema->type) {
    case TSDB_DATA_TYPE_BOOL: {  // bool
S
slguan 已提交
175 176
      if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) {
        if (strncmp(pToken->z, "true", pToken->n) == 0) {
S
slguan 已提交
177
          *(uint8_t *)payload = TSDB_TRUE;
S
slguan 已提交
178
        } else if (strncmp(pToken->z, "false", pToken->n) == 0) {
S
slguan 已提交
179
          *(uint8_t *)payload = TSDB_FALSE;
S
slguan 已提交
180 181
        } else if (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0) {
          *(uint8_t *)payload = TSDB_DATA_BOOL_NULL;
H
hzcheng 已提交
182
        } else {
H
hjxilinx 已提交
183
          return tscInvalidSQLErrMsg(msg, "invalid bool data", pToken->z);
H
hzcheng 已提交
184
        }
S
slguan 已提交
185 186 187 188 189 190 191 192 193
      } else if (pToken->type == TK_INTEGER) {
        iv = strtoll(pToken->z, NULL, 10);
        *(uint8_t *)payload = (int8_t)((iv == 0) ? TSDB_FALSE : TSDB_TRUE);
      } else if (pToken->type == TK_FLOAT) {
        double dv = strtod(pToken->z, NULL);
        *(uint8_t *)payload = (int8_t)((dv == 0) ? TSDB_FALSE : TSDB_TRUE);
      } else if (pToken->type == TK_NULL) {
        *(uint8_t *)payload = TSDB_DATA_BOOL_NULL;
      } else {
H
hjxilinx 已提交
194
        return tscInvalidSQLErrMsg(msg, "invalid bool data", pToken->z);
H
hzcheng 已提交
195 196 197 198
      }
      break;
    }
    case TSDB_DATA_TYPE_TINYINT:
S
slguan 已提交
199 200 201 202 203
      if (pToken->type == TK_NULL) {
        *((int8_t *)payload) = TSDB_DATA_TINYINT_NULL;
      } else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
                 (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
        *((int8_t *)payload) = TSDB_DATA_TINYINT_NULL;
H
hzcheng 已提交
204
      } else {
S
slguan 已提交
205
        numType = tscToInteger(pToken, &iv, &endptr);
L
lihui 已提交
206
        if (TK_ILLEGAL == numType) {
H
hjxilinx 已提交
207
          return tscInvalidSQLErrMsg(msg, "invalid tinyint data", pToken->z);
L
lihui 已提交
208
        } else if (errno == ERANGE || iv > INT8_MAX || iv <= INT8_MIN) {
H
hjxilinx 已提交
209
          return tscInvalidSQLErrMsg(msg, "tinyint data overflow", pToken->z);
H
hzcheng 已提交
210 211
        }

212
        *((int8_t *)payload) = (int8_t)iv;
H
hzcheng 已提交
213 214 215 216 217
      }

      break;

    case TSDB_DATA_TYPE_SMALLINT:
S
slguan 已提交
218 219 220 221 222
      if (pToken->type == TK_NULL) {
        *((int16_t *)payload) = TSDB_DATA_SMALLINT_NULL;
      } else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
                 (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
        *((int16_t *)payload) = TSDB_DATA_SMALLINT_NULL;
H
hzcheng 已提交
223
      } else {
S
slguan 已提交
224
        numType = tscToInteger(pToken, &iv, &endptr);
L
lihui 已提交
225
        if (TK_ILLEGAL == numType) {
H
hjxilinx 已提交
226
          return tscInvalidSQLErrMsg(msg, "invalid smallint data", pToken->z);
L
lihui 已提交
227
        } else if (errno == ERANGE || iv > INT16_MAX || iv <= INT16_MIN) {
H
hjxilinx 已提交
228
          return tscInvalidSQLErrMsg(msg, "smallint data overflow", pToken->z);
H
hzcheng 已提交
229 230
        }

S
slguan 已提交
231
        *((int16_t *)payload) = (int16_t)iv;
H
hzcheng 已提交
232 233 234 235
      }
      break;

    case TSDB_DATA_TYPE_INT:
S
slguan 已提交
236 237 238 239
      if (pToken->type == TK_NULL) {
        *((int32_t *)payload) = TSDB_DATA_INT_NULL;
      } else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
                 (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
S
slguan 已提交
240
        *((int32_t *)payload) = TSDB_DATA_INT_NULL;
H
hzcheng 已提交
241
      } else {
S
slguan 已提交
242
        numType = tscToInteger(pToken, &iv, &endptr);
L
lihui 已提交
243
        if (TK_ILLEGAL == numType) {
H
hjxilinx 已提交
244
          return tscInvalidSQLErrMsg(msg, "invalid int data", pToken->z);
L
lihui 已提交
245
        } else if (errno == ERANGE || iv > INT32_MAX || iv <= INT32_MIN) {
H
hjxilinx 已提交
246
          return tscInvalidSQLErrMsg(msg, "int data overflow", pToken->z);
H
hzcheng 已提交
247 248
        }

S
slguan 已提交
249
        *((int32_t *)payload) = (int32_t)iv;
H
hzcheng 已提交
250 251 252 253 254
      }

      break;

    case TSDB_DATA_TYPE_BIGINT:
S
slguan 已提交
255 256 257 258
      if (pToken->type == TK_NULL) {
        *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL;
      } else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
                 (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
S
slguan 已提交
259
        *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL;
H
hzcheng 已提交
260
      } else {
S
slguan 已提交
261
        numType = tscToInteger(pToken, &iv, &endptr);
L
lihui 已提交
262
        if (TK_ILLEGAL == numType) {
H
hjxilinx 已提交
263
          return tscInvalidSQLErrMsg(msg, "invalid bigint data", pToken->z);
B
Bomin Zhang 已提交
264
        } else if (errno == ERANGE || iv == INT64_MIN) {
H
hjxilinx 已提交
265
          return tscInvalidSQLErrMsg(msg, "bigint data overflow", pToken->z);
H
hzcheng 已提交
266
        }
S
slguan 已提交
267 268

        *((int64_t *)payload) = iv;
H
hzcheng 已提交
269 270 271 272
      }
      break;

    case TSDB_DATA_TYPE_FLOAT:
S
slguan 已提交
273 274 275 276
      if (pToken->type == TK_NULL) {
        *((int32_t *)payload) = TSDB_DATA_FLOAT_NULL;
      } else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
                 (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
S
slguan 已提交
277
        *((int32_t *)payload) = TSDB_DATA_FLOAT_NULL;
H
hzcheng 已提交
278
      } else {
S
slguan 已提交
279 280
        double dv;
        if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
H
hjxilinx 已提交
281
          return tscInvalidSQLErrMsg(msg, "illegal float data", pToken->z);
S
slguan 已提交
282 283 284 285
        }

        float fv = (float)dv;
        if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || (fv > FLT_MAX || fv < -FLT_MAX)) {
H
hjxilinx 已提交
286
          return tscInvalidSQLErrMsg(msg, "illegal float data", pToken->z);
H
hzcheng 已提交
287 288
        }

S
slguan 已提交
289 290
        if (isinf(fv) || isnan(fv)) {
          *((int32_t *)payload) = TSDB_DATA_FLOAT_NULL;
H
hzcheng 已提交
291
        }
S
slguan 已提交
292 293

        *((float *)payload) = fv;
H
hzcheng 已提交
294 295 296 297
      }
      break;

    case TSDB_DATA_TYPE_DOUBLE:
S
slguan 已提交
298 299 300 301
      if (pToken->type == TK_NULL) {
        *((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL;
      } else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
                 (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
S
slguan 已提交
302
        *((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL;
H
hzcheng 已提交
303
      } else {
S
slguan 已提交
304 305
        double dv;
        if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
H
hjxilinx 已提交
306
          return tscInvalidSQLErrMsg(msg, "illegal double data", pToken->z);
H
hzcheng 已提交
307 308
        }

S
slguan 已提交
309
        if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || (dv > DBL_MAX || dv < -DBL_MAX)) {
H
hjxilinx 已提交
310
          return tscInvalidSQLErrMsg(msg, "illegal double data", pToken->z);
S
slguan 已提交
311 312 313 314 315 316
        }

        if (isinf(dv) || isnan(dv)) {
          *((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL;
        } else {
          *((double *)payload) = dv;
H
hzcheng 已提交
317 318 319 320 321
        }
      }
      break;

    case TSDB_DATA_TYPE_BINARY:
S
slguan 已提交
322 323
      // binary data cannot be null-terminated char string, otherwise the last char of the string is lost
      if (pToken->type == TK_NULL) {
324
        setVardataNull(payload, TSDB_DATA_TYPE_BINARY);
H
hjxilinx 已提交
325
      } else { // too long values will return invalid sql, not be truncated automatically
H
hjxilinx 已提交
326
        if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { //todo refactor
H
hjxilinx 已提交
327
          return tscInvalidSQLErrMsg(msg, "string data overflow", pToken->z);
S
slguan 已提交
328
        }
H
hjxilinx 已提交
329
        
330
        STR_WITH_SIZE_TO_VARSTR(payload, pToken->z, pToken->n);
H
hzcheng 已提交
331 332 333 334 335
      }

      break;

    case TSDB_DATA_TYPE_NCHAR:
S
slguan 已提交
336
      if (pToken->type == TK_NULL) {
337
        setVardataNull(payload, TSDB_DATA_TYPE_NCHAR);
H
hzcheng 已提交
338
      } else {
H
hjxilinx 已提交
339
        // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
340 341
        size_t output = 0;
        if (!taosMbsToUcs4(pToken->z, pToken->n, varDataVal(payload), pSchema->bytes - VARSTR_HEADER_SIZE, &output)) {
H
hjxilinx 已提交
342 343
          char buf[512] = {0};
          snprintf(buf, tListLen(buf), "%s", strerror(errno));
H
hjxilinx 已提交
344
          return tscInvalidSQLErrMsg(msg, buf, pToken->z);
H
hzcheng 已提交
345
        }
346
        
347
        varDataSetLen(payload, output);
H
hzcheng 已提交
348 349 350 351
      }
      break;

    case TSDB_DATA_TYPE_TIMESTAMP: {
S
slguan 已提交
352
      if (pToken->type == TK_NULL) {
H
hzcheng 已提交
353
        if (primaryKey) {
S
slguan 已提交
354
          *((int64_t *)payload) = 0;
H
hzcheng 已提交
355
        } else {
S
slguan 已提交
356
          *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL;
H
hzcheng 已提交
357 358
        }
      } else {
S
slguan 已提交
359 360
        int64_t temp;
        if (tsParseTime(pToken, &temp, str, msg, timePrec) != TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
361
          return tscInvalidSQLErrMsg(msg, "invalid timestamp", pToken->z);
H
hzcheng 已提交
362
        }
H
hjxilinx 已提交
363
        
S
slguan 已提交
364
        *((int64_t *)payload) = temp;
H
hzcheng 已提交
365 366 367 368 369 370
      }

      break;
    }
  }

H
hjxilinx 已提交
371
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
372 373
}

S
slguan 已提交
374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393
/*
 * Server time and client time must not be mixed up within one sql string.
 * A block that uses server time is never marked as disordered here, so no sort is needed for it.
 *
 * start points at the primary timestamp of the row just parsed: the value 0 stands for server
 * time, any other value for client time; -1 in tsSource means that no source was chosen yet.
 *
 * Returns TSDB_CODE_SUCCESS, or -1 if the row uses the other time source than the block. The
 * check is skipped altogether once the block is known to be disordered.
 */
static int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start) {
  // once the data block is disordered, the previous timestamp is not tracked any more
  if (!pDataBlocks->ordered) {
    return TSDB_CODE_SUCCESS;
  }

  const TSKEY   key = *(TSKEY *)start;
  const int32_t rowSource = (key == 0) ? TSDB_USE_SERVER_TS : TSDB_USE_CLI_TS;
  const int32_t otherSource = (key == 0) ? TSDB_USE_CLI_TS : TSDB_USE_SERVER_TS;

  if (pDataBlocks->tsSource == otherSource) {
    return -1;  // client time/server time can not be mixed
  }

  if (pDataBlocks->tsSource == -1) {
    pDataBlocks->tsSource = rowSource;  // the first row decides the source of the block
  }

  // only client timestamps can arrive out of order
  if ((pDataBlocks->tsSource == TSDB_USE_CLI_TS) && key <= pDataBlocks->prevTS) {
    pDataBlocks->ordered = false;
  }

  pDataBlocks->prevTS = key;
  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
409
/*
 * Parse the values of one row from the sql text and append them to the data block.
 *
 * str          parse position; advanced past every token that is consumed
 * pDataBlocks  block receiving the row; the row is written at pData + size
 * schema       schema of all columns of the table
 * spd          columns named in the statement together with their offsets inside a row
 * error        buffer for the error message
 * timePrec     time precision of the table
 * code         receives the error code on failure
 * tmpTokenBuf  scratch buffer that receives quoted strings with quotes and escapes removed; its
 *              capacity is not checked here, so the caller must provide enough room
 *
 * Returns the size of the row in bytes, or -1 on failure.
 */
int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[], SParsedDataColInfo *spd, char *error,
                      int16_t timePrec, int32_t *code, char *tmpTokenBuf) {
  char *const rowStart = pDataBlocks->pData + pDataBlocks->size;
  int32_t     rowSize = 0;

  // 1. store the values found in the sql string
  for (int col = 0; col < spd->numOfAssignedCols; ++col) {
    const int16_t colIndex = spd->elems[col].colIndex;
    SSchema *     pSchema = &schema[colIndex];

    // position of the current value inside the data block buffer
    char *slot = rowStart + spd->elems[col].offset;
    rowSize += pSchema->bytes;

    int32_t   consumed = 0;
    SSQLToken token = tStrGetToken(*str, &consumed, true, 0, NULL);
    *str += consumed;

    if (token.type == TK_QUESTION) {
      // a '?' placeholder: remember where the value has to be stored once it is supplied
      uint32_t offset = (uint32_t)(slot - pDataBlocks->pData);
      if (tscAddParamToDataBlock(pDataBlocks, pSchema->type, (uint8_t)timePrec, pSchema->bytes, offset) == NULL) {
        strcpy(error, "client out of memory");
        *code = TSDB_CODE_TSC_OUT_OF_MEMORY;
        return -1;
      }

      continue;
    }

    const int16_t type = token.type;
    const bool    isValueToken = (type == TK_NOW || type == TK_INTEGER || type == TK_STRING || type == TK_FLOAT ||
                               type == TK_BOOL || type == TK_NULL || type == TK_HEX || type == TK_OCT || type == TK_BIN);
    if (!isValueToken || (token.n == 0) || (type == TK_RP)) {
      tscInvalidSQLErrMsg(error, "invalid data or symbol", token.z);
      *code = TSDB_CODE_TSC_INVALID_SQL;
      return -1;
    }

    // strip the quotation marks of a string and drop the character escaping an embedded quote
    if (token.type == TK_STRING) {
      const char quote = token.z[0];
      int32_t    numOfEscapes = 0;
      int32_t    outLen = 0;

      for (uint32_t k = 1; k < token.n - 1; ++k) {
        const bool escapesQuote = (token.z[k] == quote || token.z[k] == '\\') && (token.z[k + 1] == quote);
        if (escapesQuote) {
          ++numOfEscapes;
          ++k;  // skip the escaping character, the quote itself is copied below
        }

        tmpTokenBuf[outLen++] = token.z[k];
      }

      tmpTokenBuf[outLen] = 0;
      token.z = tmpTokenBuf;
      token.n -= 2 + numOfEscapes;
    }

    const bool isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX);
    if (tsParseOneColumnData(pSchema, &token, slot, error, str, isPrimaryKey, timePrec) != TSDB_CODE_SUCCESS) {
      *code = TSDB_CODE_TSC_INVALID_SQL;
      return -1;  // the error message has been set by tsParseOneColumnData
    }

    if (isPrimaryKey && tsCheckTimestamp(pDataBlocks, slot) != TSDB_CODE_SUCCESS) {
      tscInvalidSQLErrMsg(error, "client time/server time can not be mixed up", token.z);
      *code = TSDB_CODE_TSC_INVALID_TIME_STAMP;
      return -1;
    }
  }

  // all columns got a value: the row size is the sum of the assigned column widths
  if (spd->numOfAssignedCols >= spd->numOfCols) {
    return rowSize;
  }

  // 2. store the null value in the columns that were not assigned
  char *cell = rowStart;
  for (int32_t i = 0; i < spd->numOfCols; ++i) {
    if (!spd->hasVal[i]) {
      switch (schema[i].type) {
        case TSDB_DATA_TYPE_BINARY:
          varDataSetLen(cell, sizeof(int8_t));
          *(uint8_t*) varDataVal(cell) = TSDB_DATA_BINARY_NULL;
          break;

        case TSDB_DATA_TYPE_NCHAR:
          varDataSetLen(cell, sizeof(int32_t));
          *(uint32_t*) varDataVal(cell) = TSDB_DATA_NCHAR_NULL;
          break;

        default:
          setNull(cell, schema[i].type, schema[i].bytes);
          break;
      }
    }

    cell += schema[i].bytes;
  }

  // the row now spans every column of the table
  return (int32_t)(cell - rowStart);
}

S
slguan 已提交
513 514 515 516 517 518 519 520 521
static int32_t rowDataCompar(const void *lhs, const void *rhs) {
  TSKEY left = *(TSKEY *)lhs;
  TSKEY right = *(TSKEY *)rhs;

  if (left == right) {
    return 0;
  } else {
    return left > right ? 1 : -1;
  }
522 523
}

H
hjxilinx 已提交
524
int tsParseValues(char **str, STableDataBlocks *pDataBlock, STableMeta *pTableMeta, int maxRows,
525
                  SParsedDataColInfo *spd, char *error, int32_t *code, char *tmpTokenBuf) {
S
slguan 已提交
526 527
  int32_t   index = 0;
  SSQLToken sToken;
H
hzcheng 已提交
528 529 530

  int16_t numOfRows = 0;

H
hjxilinx 已提交
531
  SSchema *pSchema = tscGetTableSchema(pTableMeta);
H
hjxilinx 已提交
532
  STableComInfo tinfo = tscGetTableInfo(pTableMeta);
H
hjxilinx 已提交
533 534
  
  int32_t  precision = tinfo.precision;
S
slguan 已提交
535 536

  if (spd->hasVal[0] == false) {
S
slguan 已提交
537
    strcpy(error, "primary timestamp column can not be null");
538
    *code = TSDB_CODE_TSC_INVALID_SQL;
H
hzcheng 已提交
539 540 541 542
    return -1;
  }

  while (1) {
S
slguan 已提交
543 544 545
    index = 0;
    sToken = tStrGetToken(*str, &index, false, 0, NULL);
    if (sToken.n == 0 || sToken.type != TK_LP) break;
H
hzcheng 已提交
546

S
slguan 已提交
547
    *str += index;
H
hjxilinx 已提交
548
    if (numOfRows >= maxRows || pDataBlock->size + tinfo.rowSize >= pDataBlock->nAllocSize) {
L
lihui 已提交
549
      int32_t tSize;
H
hjxilinx 已提交
550
      int32_t retcode = tscAllocateMemIfNeed(pDataBlock, tinfo.rowSize, &tSize);
L
lihui 已提交
551
      if (retcode != TSDB_CODE_SUCCESS) {  //TODO pass the correct error code to client
S
slguan 已提交
552
        strcpy(error, "client out of memory");
L
lihui 已提交
553
        *code = retcode;
S
slguan 已提交
554 555
        return -1;
      }
L
lihui 已提交
556 557
      ASSERT(tSize > maxRows);
      maxRows = tSize;
H
hzcheng 已提交
558 559
    }

L
[1292]  
lihui 已提交
560
    int32_t len = tsParseOneRowData(str, pDataBlock, pSchema, spd, error, precision, code, tmpTokenBuf);
561
    if (len <= 0) {  // error message has been set in tsParseOneRowData
H
hzcheng 已提交
562 563 564 565 566
      return -1;
    }

    pDataBlock->size += len;

S
slguan 已提交
567 568 569 570
    index = 0;
    sToken = tStrGetToken(*str, &index, false, 0, NULL);
    *str += index;
    if (sToken.n == 0 || sToken.type != TK_RP) {
H
hjxilinx 已提交
571
      tscInvalidSQLErrMsg(error, ") expected", *str);
572
      *code = TSDB_CODE_TSC_INVALID_SQL;
H
hzcheng 已提交
573 574 575 576 577 578 579 580
      return -1;
    }

    numOfRows++;
  }

  if (numOfRows <= 0) {
    strcpy(error, "no any data points");
581
    *code = TSDB_CODE_TSC_INVALID_SQL;
S
slguan 已提交
582 583 584
    return -1;
  } else {
    return numOfRows;
H
hzcheng 已提交
585 586 587
  }
}

S
slguan 已提交
588
/*
 * Mark all numOfCols columns as assigned, in schema order.
 * The offset of every column after the first one is the offset of its predecessor plus the
 * predecessor's width; the offset of the first column is left as the caller provided it.
 */
static void tscSetAssignedColumnInfo(SParsedDataColInfo *spd, SSchema *pSchema, int32_t numOfCols) {
  spd->numOfCols = numOfCols;
  spd->numOfAssignedCols = numOfCols;

  if (numOfCols <= 0) {
    return;
  }

  spd->hasVal[0] = true;
  spd->elems[0].colIndex = 0;

  for (int32_t col = 1; col < numOfCols; ++col) {
    const int32_t prev = col - 1;

    spd->hasVal[col] = true;
    spd->elems[col].colIndex = col;
    spd->elems[col].offset = spd->elems[prev].offset + pSchema[prev].bytes;
  }
}

L
lihui 已提交
602
/*
 * Make sure the data block has room for at least five more rows, enlarging it if necessary.
 *
 * pDataBlock  block to check; pData and nAllocSize are updated when the buffer grows
 * rowSize     size of one row in bytes
 * numOfRows   receives (nAllocSize - headerSize) / rowSize, also when the allocation failed
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the buffer could not be enlarged;
 * the block keeps its previous buffer and size in that case.
 */
int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows) {
  const int      factor = 5;
  const uint32_t prevAllocSize = pDataBlock->nAllocSize;
  size_t         remain = pDataBlock->nAllocSize - pDataBlock->size;
  int32_t        status = TSDB_CODE_SUCCESS;

  if (remain < rowSize * factor) {
    // grow by 50% per step until enough space is left
    do {
      pDataBlock->nAllocSize = (uint32_t)(pDataBlock->nAllocSize * 1.5);
      remain = pDataBlock->nAllocSize - pDataBlock->size;
    } while (remain < rowSize * factor);

    char *grown = realloc(pDataBlock->pData, (size_t)pDataBlock->nAllocSize);
    if (grown == NULL) {
      // the old buffer is still valid: restore its size and report the failure
      pDataBlock->nAllocSize = prevAllocSize;
      status = TSDB_CODE_TSC_OUT_OF_MEMORY;
    } else {
      pDataBlock->pData = grown;
      // clear everything behind the data stored so far
      memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size);
    }
  }

  *numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize;
  return status;
}

630
/*
 * Fill in the header of a submit block: the table identity (uid, tid) and schema version are
 * copied from the table meta, and numOfRows is added to the row count already stored.
 */
static void tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) {
  pBlocks->numOfRows += numOfRows;

  pBlocks->uid = pTableMeta->id.uid;
  pBlocks->tid = pTableMeta->id.tid;
  pBlocks->sversion = pTableMeta->sversion;
}

S
slguan 已提交
637
// data block is disordered, sort it in ascending order
H
hjxilinx 已提交
638
void tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf) {
639
  SSubmitBlk *pBlocks = (SSubmitBlk *)dataBuf->pData;
S
slguan 已提交
640 641

  // size is less than the total size, since duplicated rows may be removed yet.
642
  assert(pBlocks->numOfRows * dataBuf->rowSize + sizeof(SSubmitBlk) == dataBuf->size);
S
slguan 已提交
643

S
slguan 已提交
644 645 646 647 648
  // if use server time, this block must be ordered
  if (dataBuf->tsSource == TSDB_USE_SERVER_TS) {
    assert(dataBuf->ordered);
  }

S
slguan 已提交
649
  if (!dataBuf->ordered) {
650
    char *pBlockData = pBlocks->data;
S
slguan 已提交
651
    qsort(pBlockData, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar);
H
hzcheng 已提交
652

S
slguan 已提交
653 654
    int32_t i = 0;
    int32_t j = 1;
H
hzcheng 已提交
655

S
slguan 已提交
656
    while (j < pBlocks->numOfRows) {
S
slguan 已提交
657 658
      TSKEY ti = *(TSKEY *)(pBlockData + dataBuf->rowSize * i);
      TSKEY tj = *(TSKEY *)(pBlockData + dataBuf->rowSize * j);
H
hzcheng 已提交
659

S
slguan 已提交
660 661 662 663
      if (ti == tj) {
        ++j;
        continue;
      }
H
hzcheng 已提交
664

S
slguan 已提交
665 666 667 668 669 670 671 672 673
      int32_t nextPos = (++i);
      if (nextPos != j) {
        memmove(pBlockData + dataBuf->rowSize * nextPos, pBlockData + dataBuf->rowSize * j, dataBuf->rowSize);
      }

      ++j;
    }

    dataBuf->ordered = true;
H
hzcheng 已提交
674

S
slguan 已提交
675
    pBlocks->numOfRows = i + 1;
676
    dataBuf->size = sizeof(SSubmitBlk) + dataBuf->rowSize * pBlocks->numOfRows;
S
slguan 已提交
677
  }
S
slguan 已提交
678 679
}

680
/*
 * Parse the value list of one table of an insert statement into the data block of that table.
 *
 * pSql        sql object; the table meta is taken from its command and error messages are
 *             written to pCmd->payload
 * pTableList  passed on to tscGetDataBlockFromList to obtain the data block of the table
 * str         parse position; advanced past the rows that are consumed
 * spd         columns named in the statement
 * totalNum    incremented by the number of rows parsed
 *
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableList, char **str, SParsedDataColInfo *spd,
                                      int32_t *totalNum) {
  SSqlCmd *       pCmd = &pSql->cmd;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  STableMeta *    pTableMeta = pTableMetaInfo->pTableMeta;
  STableComInfo   tinfo = tscGetTableInfo(pTableMeta);

  STableDataBlocks *dataBuf = NULL;
  int32_t ret = tscGetDataBlockFromList(pTableList, pCmd->pDataBlocks, pTableMeta->id.uid, TSDB_DEFAULT_PAYLOAD_SIZE,
                                        sizeof(SSubmitBlk), tinfo.rowSize, pTableMetaInfo->name,
                                        pTableMeta, &dataBuf);
  if (ret != TSDB_CODE_SUCCESS) {
    return ret;
  }

  int32_t maxNumOfRows;
  ret = tscAllocateMemIfNeed(dataBuf, tinfo.rowSize, &maxNumOfRows);
  if (TSDB_CODE_SUCCESS != ret) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_TSC_INVALID_SQL;

  // Scratch buffer into which the row parser copies quoted strings while removing quotes and
  // escape characters. The parser cannot check its capacity, and a token is never longer than the
  // text that is still to be parsed, so the buffer is sized from the remaining statement; a fixed
  // 4096 bytes could be overrun by a longer string.
  char *tmpTokenBuf = calloc(1, strlen(*str) + 1);
  if (NULL == tmpTokenBuf) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  int32_t numOfRows = tsParseValues(str, dataBuf, pTableMeta, maxNumOfRows, spd, pCmd->payload, &code, tmpTokenBuf);
  free(tmpTokenBuf);
  if (numOfRows <= 0) {
    return code;
  }

  // number the '?' parameters added while parsing and make their offsets relative to the row
  // data, i.e. without the block header
  for (uint32_t i = 0; i < dataBuf->numOfParams; ++i) {
    SParamInfo *param = dataBuf->params + i;
    if (param->idx == -1) {
      param->idx = pCmd->numOfParams++;
      param->offset -= sizeof(SSubmitBlk);
    }
  }

  SSubmitBlk *pBlocks = (SSubmitBlk *)(dataBuf->pData);
  tsSetBlockInfo(pBlocks, pTableMeta, numOfRows);

  dataBuf->vgId = pTableMeta->vgroupInfo.vgId;
  dataBuf->numOfTables = 1;

  /*
   * the value of pRes->numOfRows does not affect the true result of AFFECTED ROWS,
   * which is actually returned from server.
   */
  *totalNum += numOfRows;
  return TSDB_CODE_SUCCESS;
}

735
/*
 * Parse the part of an INSERT statement that follows "INTO": the table name, an optional
 * column list, and an optional "USING stable [(tag names)] TAGS (tag values)" clause.
 *
 * When the USING clause is present, the tag values are packed into a STagData stored in
 * pCmd->payload and the table meta is requested with the "create if missing" flag set.
 * Otherwise only the table meta is requested.
 *
 * @param sqlstr in/out: on entry points at the table name; on return points at the position
 *               where parsing continues (the column list, if one was given, or the token
 *               following the parsed clauses). Left untouched on the early-return paths.
 * @param pSql   the sql object being parsed
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_TSC_ACTION_IN_PROGRESS when the table meta is being
 *         retrieved, or an error code for invalid sql / out of memory.
 */
static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
  int32_t   index = 0;
  SSQLToken sToken = {0};
  SSQLToken tableToken = {0};
  int32_t   code = TSDB_CODE_SUCCESS;

  const int32_t TABLE_INDEX = 0;
  const int32_t STABLE_INDEX = 1;

  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  char *sql = *sqlstr;

  // get the token of specified table
  index = 0;
  tableToken = tStrGetToken(sql, &index, false, 0, NULL);
  sql += index;

  char *cstart = NULL;
  char *cend = NULL;

  // skip the column list if there is one
  index = 0;
  sToken = tStrGetToken(sql, &index, false, 0, NULL);
  sql += index;

  int32_t numOfColList = 0;
  bool    createTable = false;

  if (sToken.type == TK_LP) {
    cstart = &sToken.z[0];
    index = 0;
    while (1) {
      sToken = tStrGetToken(sql, &index, false, 0, NULL);
      if (sToken.type == TK_RP) {
        cend = &sToken.z[0];
        break;
      }

      // an empty token means the sql string ended before ')': stop instead of looping forever
      if (sToken.n == 0) {
        return TSDB_CODE_TSC_INVALID_SQL;
      }

      ++numOfColList;
    }

    sToken = tStrGetToken(sql, &index, false, 0, NULL);
    sql += index;
  }

  if (numOfColList == 0 && cstart != NULL) {
    return TSDB_CODE_TSC_INVALID_SQL;
  }

  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, TABLE_INDEX);

  if (sToken.type == TK_USING) {  // create table if not exists according to the super table
    index = 0;
    sToken = tStrGetToken(sql, &index, false, 0, NULL);
    sql += index;

    // the payload is written through pTag below, so the allocation must have succeeded
    code = tscAllocPayload(pCmd, sizeof(STagData));
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    STagData *pTag = (STagData *) pCmd->payload;
    memset(pTag, 0, sizeof(STagData));

    // the source super table is moved to the secondary position of the pTableMetaInfo list
    if (pQueryInfo->numOfTables < 2) {
      tscAddEmptyMetaInfo(pQueryInfo);
    }

    STableMetaInfo *pSTableMeterMetaInfo = tscGetMetaInfo(pQueryInfo, STABLE_INDEX);
    code = tscSetTableFullName(pSTableMeterMetaInfo, &sToken, pSql);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    tstrncpy(pTag->name, pSTableMeterMetaInfo->name, sizeof(pTag->name));
    code = tscGetTableMeta(pSql, pSTableMeterMetaInfo);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (!UTIL_TABLE_IS_SUPER_TABLE(pSTableMeterMetaInfo)) {
      return tscInvalidSQLErrMsg(pCmd->payload, "create table only from super table is allowed", sToken.z);
    }

    SSchema *pTagSchema = tscGetTableTagSchema(pSTableMeterMetaInfo->pTableMeta);
    STableComInfo tinfo = tscGetTableInfo(pSTableMeterMetaInfo->pTableMeta);

    index = 0;
    sToken = tStrGetToken(sql, &index, false, 0, NULL);
    sql += index;

    SParsedDataColInfo spd = {0};

    uint8_t numOfTags = tscGetNumOfTags(pSTableMeterMetaInfo->pTableMeta);
    spd.numOfCols = numOfTags;

    // no explicit tag name list: assign all tags
    if (sToken.type != TK_LP) {
      tscSetAssignedColumnInfo(&spd, pTagSchema, numOfTags);
    } else {
      /* insert into tablename (col1, col2,..., coln) using superTableName (tagName1, tagName2, ..., tagNamen)
       * tags(tagVal1, tagVal2, ..., tagValn) values(v1, v2,... vn); */
      int16_t offset[TSDB_MAX_COLUMNS] = {0};
      for (int32_t t = 1; t < numOfTags; ++t) {
        offset[t] = offset[t - 1] + pTagSchema[t - 1].bytes;
      }

      while (1) {
        index = 0;
        sToken = tStrGetToken(sql, &index, false, 0, NULL);
        sql += index;

        if (TK_STRING == sToken.type) {
          strdequote(sToken.z);
          sToken.n = (uint32_t)strtrim(sToken.z);
        }

        if (sToken.type == TK_RP) {
          break;
        }

        bool findColumnIndex = false;

        // todo speedup by using hash list
        for (int32_t t = 0; t < numOfTags; ++t) {
          if (strncmp(sToken.z, pTagSchema[t].name, sToken.n) == 0 && strlen(pTagSchema[t].name) == sToken.n) {
            SParsedColElem *pElem = &spd.elems[spd.numOfAssignedCols++];
            pElem->offset = offset[t];
            pElem->colIndex = t;

            if (spd.hasVal[t] == true) {
              return tscInvalidSQLErrMsg(pCmd->payload, "duplicated tag name", sToken.z);
            }

            spd.hasVal[t] = true;
            findColumnIndex = true;
            break;
          }
        }

        if (!findColumnIndex) {
          return tscInvalidSQLErrMsg(pCmd->payload, "invalid tag name", sToken.z);
        }
      }

      if (spd.numOfAssignedCols == 0 || spd.numOfAssignedCols > numOfTags) {
        return tscInvalidSQLErrMsg(pCmd->payload, "tag name expected", sToken.z);
      }

      index = 0;
      sToken = tStrGetToken(sql, &index, false, 0, NULL);
      sql += index;
    }

    if (sToken.type != TK_TAGS) {
      return tscInvalidSQLErrMsg(pCmd->payload, "keyword TAGS expected", sToken.z);
    }

    SKVRowBuilder kvRowBuilder = {0};
    if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    // the '(' that opens the tag value list is passed to the tokenizer as an ignored token type
    uint32_t ignoreTokenTypes = TK_LP;
    uint32_t numOfIgnoreToken = 1;
    for (int i = 0; i < spd.numOfAssignedCols; ++i) {
      SSchema* pSchema = pTagSchema + spd.elems[i].colIndex;

      index = 0;
      sToken = tStrGetToken(sql, &index, true, numOfIgnoreToken, &ignoreTokenTypes);
      sql += index;

      if (TK_ILLEGAL == sToken.type) {
        tdDestroyKVRowBuilder(&kvRowBuilder);
        return TSDB_CODE_TSC_INVALID_SQL;
      }

      if (sToken.n == 0 || sToken.type == TK_RP) {
        break;
      }

      // Remove quotation marks
      if (TK_STRING == sToken.type) {
        sToken.z++;
        sToken.n -= 2;
      }

      char tagVal[TSDB_MAX_TAGS_LEN];
      code = tsParseOneColumnData(pSchema, &sToken, tagVal, pCmd->payload, &sql, false, tinfo.precision);
      if (code != TSDB_CODE_SUCCESS) {
        tdDestroyKVRowBuilder(&kvRowBuilder);
        return code;
      }

      tdAddColToKVRow(&kvRowBuilder, pSchema->colId, pSchema->type, tagVal);
    }

    SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder);
    tdDestroyKVRowBuilder(&kvRowBuilder);
    if (row == NULL) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    tdSortKVRowByColIdx(row);
    pTag->dataLen = kvRowLen(row);
    kvRowCpy(pTag->data, row);
    free(row);

    index = 0;
    sToken = tStrGetToken(sql, &index, false, 0, NULL);
    sql += index;
    if (sToken.n == 0 || sToken.type != TK_RP) {
      return tscInvalidSQLErrMsg(pCmd->payload, ") expected", sToken.z);
    }

    // payloadLen is computed from the host-order length before dataLen is converted with htonl
    pCmd->payloadLen = sizeof(pTag->name) + sizeof(pTag->dataLen) + pTag->dataLen;
    pTag->dataLen = htonl(pTag->dataLen);

    if (tscValidateName(&tableToken) != TSDB_CODE_SUCCESS) {
      return tscInvalidSQLErrMsg(pCmd->payload, "invalid table name", *sqlstr);
    }

    int32_t ret = tscSetTableFullName(pTableMetaInfo, &tableToken, pSql);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }

    createTable = true;
    code = tscGetMeterMetaEx(pSql, pTableMetaInfo, true);
    if (TSDB_CODE_TSC_ACTION_IN_PROGRESS == code) {
      return code;
    }

  } else {
    // no USING clause: rewind to the column list (or to the current token) for the caller
    if (cstart != NULL) {
      sql = cstart;
    } else {
      sql = sToken.z;
    }
    code = tscGetMeterMetaEx(pSql, pTableMetaInfo, false);

    if (pCmd->curSql == NULL) {
      assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS);
    }
  }

  if (cstart != NULL && createTable == true) {
    /* move the column list to start position of the next accessed points */
    int32_t len = (int32_t)(cend - cstart + 1);  // only computed when both pointers are set
    memmove(sql - len, cstart, len);
    *sqlstr = sql - len;
  } else {
    *sqlstr = sql;
  }

  if (*sqlstr == NULL) {
    code = TSDB_CODE_TSC_INVALID_SQL;
  }

  return code;
}

H
Hui Li 已提交
996
/*
 * Copy a table-name token into the buffer of psTblToken and validate it.
 *
 * @param tblName    start of the table name inside the sql string (need not be NUL-terminated
 *                   at len)
 * @param len        length of the table name token
 * @param psTblToken output token; psTblToken->z must already point to a writable buffer of at
 *                   least TSDB_TABLE_FNAME_LEN bytes
 * @return the result of tscValidateName, or TSDB_CODE_TSC_INVALID_SQL when len cannot fit
 *         in the destination buffer
 */
int validateTableName(char *tblName, int len, SSQLToken* psTblToken) {
  // tstrncpy is bounded by TSDB_TABLE_FNAME_LEN; a longer token would leave n larger than the
  // copied data and make the validation below read past the buffer
  if (len < 0 || len >= TSDB_TABLE_FNAME_LEN) {
    return TSDB_CODE_TSC_INVALID_SQL;
  }

  tstrncpy(psTblToken->z, tblName, TSDB_TABLE_FNAME_LEN);

  psTblToken->n    = len;
  psTblToken->type = TK_ID;
  tSQLGetToken(psTblToken->z, &psTblToken->type);

  return tscValidateName(psTblToken);
}

1006 1007 1008 1009 1010 1011 1012 1013 1014
/*
 * Record the data source type of the insert statement, rejecting a statement that mixes
 * two different sources.
 *
 * @param pCmd command whose dataSourceType is checked and updated
 * @param type the data source type found at the current position
 * @param sql  position in the sql string used for the error message
 * @return TSDB_CODE_SUCCESS, or the code produced by tscInvalidSQLErrMsg on a mismatch
 */
static int32_t validateDataSource(SSqlCmd *pCmd, int8_t type, const char *sql) {
  bool notSetYet = (pCmd->dataSourceType == 0);

  if (notSetYet || pCmd->dataSourceType == type) {
    pCmd->dataSourceType = type;
    return TSDB_CODE_SUCCESS;
  }

  return tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES and FILE are not allowed to mix up", sql);
}

H
hzcheng 已提交
1015 1016 1017 1018 1019 1020 1021 1022 1023
/**
 * usage: insert into table1 values() () table2 values()()
 *
 * @param str
 * @param acct
 * @param db
 * @param pSql
 * @return
 */
H
Haojun Liao 已提交
1024
int tsParseInsertSql(SSqlObj *pSql) {
S
slguan 已提交
1025
  SSqlCmd *pCmd = &pSql->cmd;
H
Haojun Liao 已提交
1026
  char* str = pCmd->curSql;
1027

S
slguan 已提交
1028
  int32_t totalNum = 0;
1029 1030 1031 1032 1033
  int32_t code = TSDB_CODE_SUCCESS;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  assert(pQueryInfo != NULL);

H
Haojun Liao 已提交
1034
  STableMetaInfo *pTableMetaInfo = NULL;
1035
  if (pQueryInfo->numOfTables == 0) {
H
hjxilinx 已提交
1036
    pTableMetaInfo = tscAddEmptyMetaInfo(pQueryInfo);
1037
  } else {
H
hjxilinx 已提交
1038
    pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
1039
  }
H
hzcheng 已提交
1040

H
Haojun Liao 已提交
1041 1042 1043
  if ((code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
    return code;
  }
H
hzcheng 已提交
1044

H
Haojun Liao 已提交
1045
  if (NULL == pCmd->pTableList) {
H
Haojun Liao 已提交
1046
    pCmd->pTableList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
1047
    pCmd->pDataBlocks = taosArrayInit(4, POINTER_BYTES);
1048
    if (NULL == pCmd->pTableList || NULL == pSql->cmd.pDataBlocks) {
1049
      code = TSDB_CODE_TSC_OUT_OF_MEMORY;
H
Haojun Liao 已提交
1050
      goto _error;
L
lihui 已提交
1051 1052
    }
  } else {
1053
    str = pCmd->curSql;
L
lihui 已提交
1054 1055
  }
  
1056
  tscDebug("%p create data block list for submit data:%p, pTableList:%p", pSql, pCmd->pDataBlocks, pCmd->pTableList);
H
hzcheng 已提交
1057 1058

  while (1) {
1059
    int32_t   index = 0;
S
slguan 已提交
1060
    SSQLToken sToken = tStrGetToken(str, &index, false, 0, NULL);
1061 1062 1063 1064 1065 1066 1067 1068

    // no data in the sql string anymore.
    if (sToken.n == 0) {
      /*
       * if the data is from the data file, no data has been generated yet. So, there no data to
       * merge or submit, save the file path and parse the file in other routines.
       */
      if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) {
S
slguan 已提交
1069 1070 1071
        goto _clean;
      }

1072 1073 1074 1075 1076
      /*
       * if no data has been generated during parsing the sql string, error msg will return
       * Otherwise, create the first submit block and submit to virtual node.
       */
      if (totalNum == 0) {
1077
        code = TSDB_CODE_TSC_INVALID_SQL;
H
Haojun Liao 已提交
1078
        goto _error;
1079 1080
      } else {
        break;
H
hzcheng 已提交
1081 1082 1083
      }
    }

1084
    pCmd->curSql = sToken.z;
H
Haojun Liao 已提交
1085
    char buf[TSDB_TABLE_FNAME_LEN];
H
Hui Li 已提交
1086
    SSQLToken sTblToken;
B
Bomin Zhang 已提交
1087
    sTblToken.z = buf;
S
slguan 已提交
1088
    // Check if the table name available or not
H
Hui Li 已提交
1089
    if (validateTableName(sToken.z, sToken.n, &sTblToken) != TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
1090
      code = tscInvalidSQLErrMsg(pCmd->payload, "table name invalid", sToken.z);
H
Haojun Liao 已提交
1091
      goto _error;
H
huili 已提交
1092 1093
    }

H
Hui Li 已提交
1094
    if ((code = tscSetTableFullName(pTableMetaInfo, &sTblToken, pSql)) != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1095
      goto _error;
H
hzcheng 已提交
1096 1097
    }

1098 1099
    if ((code = tscCheckIfCreateTable(&str, pSql)) != TSDB_CODE_SUCCESS) {
      /*
H
Haojun Liao 已提交
1100 1101
       * After retrieving the table meta from server, the sql string will be parsed from the paused position.
       * And during the getTableMetaCallback function, the sql string will be parsed from the paused position.
1102
       */
1103
      if (TSDB_CODE_TSC_ACTION_IN_PROGRESS == code) {
H
hjxilinx 已提交
1104
        return code;
H
hzcheng 已提交
1105
      }
H
hjxilinx 已提交
1106
      
H
Haojun Liao 已提交
1107
      tscError("%p async insert parse error, code:%s", pSql, tstrerror(code));
1108
      pCmd->curSql = NULL;
H
Haojun Liao 已提交
1109
      goto _error;
H
hzcheng 已提交
1110 1111
    }

weixin_48148422's avatar
weixin_48148422 已提交
1112
    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
H
hjxilinx 已提交
1113
      code = tscInvalidSQLErrMsg(pCmd->payload, "insert data into super table is not supported", NULL);
H
Haojun Liao 已提交
1114
      goto _error;
H
hzcheng 已提交
1115 1116
    }

S
slguan 已提交
1117 1118 1119
    index = 0;
    sToken = tStrGetToken(str, &index, false, 0, NULL);
    str += index;
1120

S
slguan 已提交
1121
    if (sToken.n == 0) {
1122
      code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE required", sToken.z);
H
Haojun Liao 已提交
1123
      goto _error;
H
hzcheng 已提交
1124
    }
H
hjxilinx 已提交
1125
    
H
hjxilinx 已提交
1126
    STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
H
hjxilinx 已提交
1127
    
S
slguan 已提交
1128
    if (sToken.type == TK_VALUES) {
H
hjxilinx 已提交
1129
      SParsedDataColInfo spd = {.numOfCols = tinfo.numOfColumns};
H
hjxilinx 已提交
1130 1131
      
      SSchema *pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
H
hjxilinx 已提交
1132
      tscSetAssignedColumnInfo(&spd, pSchema, tinfo.numOfColumns);
H
hzcheng 已提交
1133

1134
      if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1135
        goto _error;
H
hzcheng 已提交
1136 1137 1138 1139 1140 1141
      }

      /*
       * app here insert data in different vnodes, so we need to set the following
       * data in another submit procedure using async insert routines
       */
1142
      code = doParseInsertStatement(pSql, pCmd->pTableList, &str, &spd, &totalNum);
H
hzcheng 已提交
1143
      if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1144
        goto _error;
H
hzcheng 已提交
1145
      }
S
slguan 已提交
1146
    } else if (sToken.type == TK_FILE) {
1147
      if (validateDataSource(pCmd, DATA_FROM_DATA_FILE, sToken.z) != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1148
        goto _error;
H
hzcheng 已提交
1149 1150
      }

S
slguan 已提交
1151 1152 1153 1154
      index = 0;
      sToken = tStrGetToken(str, &index, false, 0, NULL);
      str += index;
      if (sToken.n == 0) {
H
hjxilinx 已提交
1155
        code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z);
H
Haojun Liao 已提交
1156
        goto _error;
H
hzcheng 已提交
1157 1158
      }

H
Haojun Liao 已提交
1159 1160
      strncpy(pCmd->payload, sToken.z, sToken.n);
      strdequote(pCmd->payload);
1161

H
Haojun Liao 已提交
1162
      // todo refactor extract method
H
hzcheng 已提交
1163
      wordexp_t full_path;
H
Haojun Liao 已提交
1164
      if (wordexp(pCmd->payload, &full_path, 0) != 0) {
H
hjxilinx 已提交
1165
        code = tscInvalidSQLErrMsg(pCmd->payload, "invalid filename", sToken.z);
H
Haojun Liao 已提交
1166
        goto _error;
H
hzcheng 已提交
1167 1168
      }

H
Haojun Liao 已提交
1169 1170
      tstrncpy(pCmd->payload, full_path.we_wordv[0], pCmd->allocSize);
      wordfree(&full_path);
1171

S
slguan 已提交
1172
    } else if (sToken.type == TK_LP) {
H
hzcheng 已提交
1173
      /* insert into tablename(col1, col2,..., coln) values(v1, v2,... vn); */
1174
      STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta;
H
hjxilinx 已提交
1175
      SSchema *   pSchema = tscGetTableSchema(pTableMeta);
H
hzcheng 已提交
1176

1177
      if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1178
        goto _error;
H
hzcheng 已提交
1179 1180
      }

1181
      SParsedDataColInfo spd = {0};
H
hjxilinx 已提交
1182
      spd.numOfCols = tinfo.numOfColumns;
H
hzcheng 已提交
1183 1184

      int16_t offset[TSDB_MAX_COLUMNS] = {0};
H
hjxilinx 已提交
1185
      for (int32_t t = 1; t < tinfo.numOfColumns; ++t) {
H
hzcheng 已提交
1186 1187 1188 1189
        offset[t] = offset[t - 1] + pSchema[t - 1].bytes;
      }

      while (1) {
S
slguan 已提交
1190 1191 1192 1193 1194
        index = 0;
        sToken = tStrGetToken(str, &index, false, 0, NULL);
        str += index;

        if (TK_STRING == sToken.type) {
H
Haojun Liao 已提交
1195
          strdequote(sToken.z);
S
TD-1057  
Shengliang Guan 已提交
1196
          sToken.n = (uint32_t)strtrim(sToken.z);
S
slguan 已提交
1197 1198 1199
        }

        if (sToken.type == TK_RP) {
H
hzcheng 已提交
1200 1201 1202 1203 1204 1205
          break;
        }

        bool findColumnIndex = false;

        // todo speedup by using hash list
H
hjxilinx 已提交
1206
        for (int32_t t = 0; t < tinfo.numOfColumns; ++t) {
S
slguan 已提交
1207
          if (strncmp(sToken.z, pSchema[t].name, sToken.n) == 0 && strlen(pSchema[t].name) == sToken.n) {
S
slguan 已提交
1208
            SParsedColElem *pElem = &spd.elems[spd.numOfAssignedCols++];
H
hzcheng 已提交
1209 1210 1211 1212
            pElem->offset = offset[t];
            pElem->colIndex = t;

            if (spd.hasVal[t] == true) {
H
hjxilinx 已提交
1213
              code = tscInvalidSQLErrMsg(pCmd->payload, "duplicated column name", sToken.z);
H
Haojun Liao 已提交
1214
              goto _error;
H
hzcheng 已提交
1215 1216 1217 1218 1219 1220 1221 1222
            }

            spd.hasVal[t] = true;
            findColumnIndex = true;
            break;
          }
        }

S
slguan 已提交
1223
        if (!findColumnIndex) {
H
hjxilinx 已提交
1224
          code = tscInvalidSQLErrMsg(pCmd->payload, "invalid column name", sToken.z);
H
Haojun Liao 已提交
1225
          goto _error;
H
hzcheng 已提交
1226 1227 1228
        }
      }

H
hjxilinx 已提交
1229
      if (spd.numOfAssignedCols == 0 || spd.numOfAssignedCols > tinfo.numOfColumns) {
H
hjxilinx 已提交
1230
        code = tscInvalidSQLErrMsg(pCmd->payload, "column name expected", sToken.z);
H
Haojun Liao 已提交
1231
        goto _error;
H
hzcheng 已提交
1232 1233
      }

S
slguan 已提交
1234 1235 1236 1237 1238
      index = 0;
      sToken = tStrGetToken(str, &index, false, 0, NULL);
      str += index;

      if (sToken.type != TK_VALUES) {
H
hjxilinx 已提交
1239
        code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES is expected", sToken.z);
H
Haojun Liao 已提交
1240
        goto _error;
H
hzcheng 已提交
1241 1242
      }

1243
      code = doParseInsertStatement(pSql, pCmd->pTableList, &str, &spd, &totalNum);
H
hzcheng 已提交
1244
      if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1245
        goto _error;
H
hzcheng 已提交
1246 1247
      }
    } else {
H
hjxilinx 已提交
1248
      code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE are required", sToken.z);
H
Haojun Liao 已提交
1249
      goto _error;
H
hzcheng 已提交
1250 1251 1252
    }
  }

S
slguan 已提交
1253 1254 1255 1256
  // we need to keep the data blocks if there are parameters in the sql
  if (pCmd->numOfParams > 0) {
    goto _clean;
  }
1257

1258
  if (taosArrayGetSize(pCmd->pDataBlocks) > 0) { // merge according to vgId
S
slguan 已提交
1259
    if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1260
      goto _error;
S
slguan 已提交
1261
    }
H
hzcheng 已提交
1262 1263 1264 1265 1266
  }

  code = TSDB_CODE_SUCCESS;
  goto _clean;

H
Haojun Liao 已提交
1267
_error:
S
slguan 已提交
1268
  pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
H
hzcheng 已提交
1269 1270

_clean:
1271 1272
  taosHashCleanup(pCmd->pTableList);
  pCmd->pTableList = NULL;
H
hjxilinx 已提交
1273
  
1274 1275
  pCmd->curSql    = NULL;
  pCmd->parseFinished  = 1;
H
hjxilinx 已提交
1276
  
H
hzcheng 已提交
1277 1278 1279
  return code;
}

H
Haojun Liao 已提交
1280
/*
 * First-pass checks for an insert/import statement: verify the write permission, reset the
 * command state for an insert, and position pCmd->curSql right after the keyword INTO.
 *
 * @param pSql the sql object holding the complete statement in pSql->sqlstr
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_TSC_NO_WRITE_AUTH, TSDB_CODE_TSC_OUT_OF_MEMORY when no
 *         query info could be obtained, or the invalid-sql code when INTO is missing
 */
int tsInsertInitialCheck(SSqlObj *pSql) {
  if (!pSql->pTscObj->writeAuth) {
    return TSDB_CODE_TSC_NO_WRITE_AUTH;
  }

  int32_t  index = 0;
  SSqlCmd *pCmd = &pSql->cmd;

  SSQLToken sToken = tStrGetToken(pSql->sqlstr, &index, false, 0, NULL);
  assert(sToken.type == TK_INSERT || sToken.type == TK_IMPORT);

  pCmd->count = 0;
  pCmd->command = TSDB_SQL_INSERT;
  pSql->res.numOfRows = 0;

  SQueryInfo *pQueryInfo = NULL;
  tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo);
  if (pQueryInfo == NULL) {
    // the out-parameter is dereferenced below; do not continue without a query info
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT | pCmd->insertType);

  // index was advanced by the first call, so this reads the token after INSERT/IMPORT
  sToken = tStrGetToken(pSql->sqlstr, &index, false, 0, NULL);
  if (sToken.type != TK_INTO) {
    return tscInvalidSQLErrMsg(pCmd->payload, "keyword INTO is expected", sToken.z);
  }

  pCmd->curSql = sToken.z + sToken.n;
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
1309
/*
 * Entry point for parsing a sql statement held in pSql->sqlstr.
 *
 * Insert statements are handled by tsParseInsertSql (after tsInsertInitialCheck on the
 * initial pass); everything else goes through qSQLParse/tscToSQLCmd.
 *
 * @param pSql    the sql object
 * @param initial true for the first parse of the statement, false when parsing is resumed
 * @return the code produced by the parser that handled the statement
 *
 * NOTE: the pRes->code may be modified or released by another thread in tscTableMetaCallBack
 * function, so do NOT use pRes->code to determine if the getTableMeta function invokes new
 * threads to get data from mgmt node or simply retrieves data from cache. Do NOT assign the
 * return code to pRes->code for the same reason since it may be released by another thread
 * already.
 */
int tsParseSql(SSqlObj *pSql, bool initial) {
  SSqlCmd *pCmd = &pSql->cmd;

  if (!initial && !pCmd->parseFinished) {
    tscDebug("%p resume to parse sql: %s", pSql, pCmd->curSql);
  }

  int32_t ret = tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE);
  if (ret != TSDB_CODE_SUCCESS) {
    return ret;
  }

  // every statement that is not an insert takes the generic parser path
  if (!tscIsInsertData(pSql->sqlstr)) {
    SSqlInfo SQLInfo = qSQLParse(pSql->sqlstr);
    ret = tscToSQLCmd(pSql, &SQLInfo);
    SQLInfoDestroy(&SQLInfo);
    return ret;
  }

  if (initial) {
    /*
     * Set the fp before parse the sql string, in case of getTableMeta failed, in which
     * the error handle callback function can rightfully restore the user-defined callback function (fp).
     */
    if (pSql->cmd.insertType != TSDB_QUERY_TYPE_STMT_INSERT) {
      pSql->fetchFp = pSql->fp;
      pSql->fp = (void(*)())tscHandleMultivnodeInsert;
    }

    ret = tsInsertInitialCheck(pSql);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }
  }

  return tsParseInsertSql(pSql);
}

S
slguan 已提交
1352 1353 1354
/*
 * Finalize a table data block holding numOfRows parsed rows, merge the data blocks of the
 * command, copy the first merged block into the payload and submit it with tscProcessSql.
 *
 * @param pSql             the sql object that owns the data blocks
 * @param numOfRows        number of rows stored in pTableDataBlocks
 * @param pTableDataBlocks the block whose submit header is filled in
 * @return the first failing step's code, otherwise the result of tscProcessSql
 */
static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlocks *pTableDataBlocks) {
  SSqlCmd *pCmd = &pSql->cmd;
  pSql->res.numOfRows = 0;

  assert(pCmd->numOfClause == 1);

  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  tsSetBlockInfo((SSubmitBlk *)(pTableDataBlocks->pData), pMetaInfo->pTableMeta, numOfRows);

  int32_t code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // after the merge, the block to send is read from the head of the block array
  STableDataBlocks *pMerged = taosArrayGetP(pCmd->pDataBlocks, 0);
  code = tscCopyDataBlockToPayload(pSql, pMerged);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  return tscProcessSql(pSql);
}

/* Context handed to parseFileSendDataBlock while importing rows from a data file. */
typedef struct SImportFileSupport {
  SSqlObj *pSql;  // the parent sql object that issued the import; receives the final result
  FILE    *fp;    // the opened data file the rows are read from
} SImportFileSupport;

static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) {
  assert(param != NULL && tres != NULL);

  SSqlObj *pSql = tres;
  SSqlCmd *pCmd = &pSql->cmd;

  SImportFileSupport *pSupporter = (SImportFileSupport *) param;

  SSqlObj *pParentSql = pSupporter->pSql;
  FILE    *fp = pSupporter->fp;

  if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {  // handle error
    assert(taos_errno(pSql) == code);

    taos_free_result(pSql);
S
Shengliang Guan 已提交
1395
    taosTFree(pSupporter);
H
Haojun Liao 已提交
1396 1397 1398
    fclose(fp);

    pParentSql->res.code = code;
H
Haojun Liao 已提交
1399
    tscQueueAsyncRes(pParentSql);
H
Haojun Liao 已提交
1400
    return;
S
slguan 已提交
1401 1402
  }

H
Haojun Liao 已提交
1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419
  // accumulate the total submit records
  pParentSql->res.numOfRows += pSql->res.numOfRows;

  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  STableMeta *    pTableMeta = pTableMetaInfo->pTableMeta;
  SSchema *       pSchema = tscGetTableSchema(pTableMeta);
  STableComInfo   tinfo = tscGetTableInfo(pTableMeta);

  SParsedDataColInfo spd = {.numOfCols = tinfo.numOfColumns};
  tscSetAssignedColumnInfo(&spd, pSchema, tinfo.numOfColumns);

  size_t  n = 0;
  ssize_t readLen = 0;
  char *  line = NULL;
  int32_t count = 0;
  int32_t maxRows = 0;

H
Haojun Liao 已提交
1420 1421
  tscDestroyBlockArrayList(pSql->cmd.pDataBlocks);
  pCmd->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
H
Haojun Liao 已提交
1422

H
Haojun Liao 已提交
1423 1424 1425 1426 1427 1428 1429
  STableDataBlocks *pTableDataBlock = NULL;
  int32_t ret = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, tinfo.rowSize, sizeof(SSubmitBlk), pTableMetaInfo->name, pTableMeta, &pTableDataBlock);
  if (ret != TSDB_CODE_SUCCESS) {
//    return ret;
  }

  taosArrayPush(pCmd->pDataBlocks, &pTableDataBlock);
H
Haojun Liao 已提交
1430
  tscAllocateMemIfNeed(pTableDataBlock, tinfo.rowSize, &maxRows);
H
Haojun Liao 已提交
1431

H
Haojun Liao 已提交
1432 1433
  char *tokenBuf = calloc(1, 4096);

S
TD-1057  
Shengliang Guan 已提交
1434
  while ((readLen = taosGetline(&line, &n, fp)) != -1) {
H
Haojun Liao 已提交
1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459
    if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) {
      line[--readLen] = 0;
    }

    if (readLen == 0) {
      continue;
    }

    char *lineptr = line;
    strtolower(line, line);

    int32_t len =
        tsParseOneRowData(&lineptr, pTableDataBlock, pSchema, &spd, pCmd->payload, tinfo.precision, &code, tokenBuf);
    if (len <= 0 || pTableDataBlock->numOfParams > 0) {
      pSql->res.code = code;
      break;
    }

    pTableDataBlock->size += len;

    if (++count >= maxRows) {
      break;
    }
  }

S
Shengliang Guan 已提交
1460
  taosTFree(tokenBuf);
H
Haojun Liao 已提交
1461 1462 1463
  free(line);

  if (count > 0) {
H
Haojun Liao 已提交
1464 1465
    code = doPackSendDataBlock(pSql, count, pTableDataBlock);
    if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1466
      pParentSql->res.code = code;
H
Haojun Liao 已提交
1467 1468
      tscQueueAsyncRes(pParentSql);
      return;
H
Haojun Liao 已提交
1469 1470 1471 1472
    }

  } else {
    taos_free_result(pSql);
S
Shengliang Guan 已提交
1473
    taosTFree(pSupporter);
H
Haojun Liao 已提交
1474 1475 1476 1477 1478
    fclose(fp);

    pParentSql->fp = pParentSql->fetchFp;

    // all data has been sent to vnode, call user function
S
TD-1057  
Shengliang Guan 已提交
1479
    int32_t v = (pParentSql->res.code != TSDB_CODE_SUCCESS) ? pParentSql->res.code : (int32_t)pParentSql->res.numOfRows;
H
Haojun Liao 已提交
1480 1481
    (*pParentSql->fp)(pParentSql->param, pParentSql, v);
  }
S
slguan 已提交
1482 1483
}

H
Haojun Liao 已提交
1484
/*
 * Start importing the data file whose path is stored in pSql->cmd.payload: open the file,
 * create a sub-query object with parseFileSendDataBlock as its callback and run the first
 * parsing step. Failures are reported through pSql->res.code and tscQueueAsyncRes.
 *
 * @param pSql the insert sql object whose data source is a file; ignored when the command is
 *             not TSDB_SQL_INSERT
 */
void tscProcessMultiVnodesImportFromFile(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  if (pCmd->command != TSDB_SQL_INSERT) {
    return;
  }

  assert(pCmd->dataSourceType == DATA_FROM_DATA_FILE  && strlen(pCmd->payload) != 0);

  // open the file before anything is allocated, so a bad path leaves nothing to release
  FILE *fp = fopen(pCmd->payload, "r");
  if (fp == NULL) {
    pSql->res.code = TAOS_SYSTEM_ERROR(errno);
    pCmd->count = 1;
    tscError("%p failed to open file %s to load data from file, code:%s", pSql, pCmd->payload, tstrerror(pSql->res.code));

    tscQueueAsyncRes(pSql);
    return;
  }

  SImportFileSupport *pSupporter = calloc(1, sizeof(SImportFileSupport));
  SSqlObj *pNew = NULL;
  if (pSupporter != NULL) {
    pNew = createSubqueryObj(pSql, 0, parseFileSendDataBlock, pSupporter, TSDB_SQL_INSERT, NULL);
  }
  pCmd->count = 1;

  if (pNew == NULL) {
    // either allocation failed: release what was acquired and report the failure
    taosTFree(pSupporter);
    fclose(fp);

    pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    tscQueueAsyncRes(pSql);
    return;
  }

  pNew->cmd.pDataBlocks = taosArrayInit(4, POINTER_BYTES);

  pSupporter->pSql = pSql;
  pSupporter->fp = fp;

  // from here on the callback owns pNew, pSupporter and fp
  parseFileSendDataBlock(pSupporter, pNew, 0);
}