tscParseInsert.c 45.7 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE /* See feature_test_macros(7) */
#define _GNU_SOURCE

#define _XOPEN_SOURCE

21
#include "os.h"
22 23

#include "hash.h"
H
hzcheng 已提交
24 25 26
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
27
#include "ttokendef.h"
H
hzcheng 已提交
28
#include "taosdef.h"
H
hzcheng 已提交
29

S
slguan 已提交
30
#include "tscLog.h"
H
hjxilinx 已提交
31
#include "tscSubquery.h"
H
hzcheng 已提交
32 33 34
#include "tstoken.h"
#include "ttime.h"

S
slguan 已提交
35
#include "tdataformat.h"
36

S
slguan 已提交
37
/*
 * Source of the primary-key timestamps in one data block (stored in the block's tsSource field).
 * tsCheckTimestamp selects SERVER when a row's key is 0 and CLI otherwise, and rejects a block
 * that would need both.
 */
enum {
  TSDB_USE_SERVER_TS = 0,  // rows carry a zero key
  TSDB_USE_CLI_TS = 1,     // rows carry a non-zero key parsed from the statement
};

L
lihui 已提交
42
static int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows);
H
hzcheng 已提交
43

S
slguan 已提交
44
/*
 * Convert a numeric token to a 64-bit integer.
 *
 * The token text is first parsed with strtoll (radix 16 for TK_HEX, 2 for TK_BIN, 10 otherwise).
 * If that parse stops on '.', 'e' or 'E', the text is parsed again with strtod and the result is
 * rounded to the nearest integer.
 *
 * pToken  token to convert; exactly pToken->n characters starting at pToken->z must be consumed
 * value   receives the converted value (left as produced by strtoll when the rounded double is
 *         out of range)
 * endPtr  receives the position where parsing stopped
 *
 * Returns pToken->type on success and TK_ILLEGAL when the token is empty or not fully consumed.
 * Range problems are NOT reported through the return value: errno is cleared on entry and set to
 * ERANGE on overflow, so callers must inspect errno afterwards.
 */
static int32_t tscToInteger(SSQLToken *pToken, int64_t *value, char **endPtr) {
  if (pToken->n == 0) {
    return TK_ILLEGAL;
  }

  int32_t radix = 10;
  if (pToken->type == TK_HEX) {
    radix = 16;
  } else if (pToken->type == TK_BIN) {
    radix = 2;
  }

  errno = 0;
  *value = strtoll(pToken->z, endPtr, radix);

  if (**endPtr == 'e' || **endPtr == 'E' || **endPtr == '.') {
    errno = 0;
    double v = round(strtod(pToken->z, endPtr));

    /*
     * (double)INT64_MAX is 2^63, which is one past the largest int64_t. Comparing with ">" would
     * let v == 2^63 through and make the conversion below undefined, so the upper bound is checked
     * with ">=" against 2^63 itself. INT64_MIN is rejected as well, which matches the BIGINT case
     * of tsParseOneColumnData where INT64_MIN is reported as an overflow.
     */
    if (v >= 9223372036854775808.0 || v <= (double)INT64_MIN) {
      errno = ERANGE;
    } else {
      *value = (int64_t)v;
    }
  }

  // the conversion must consume the token exactly, otherwise it is not a valid number
  if (*endPtr - pToken->z != pToken->n) {
    return TK_ILLEGAL;
  }

  return pToken->type;
}

S
slguan 已提交
77
/*
 * Convert a numeric token to double with strtod.
 *
 * pToken  token to convert
 * value   receives the converted value (written whenever the token is non-empty)
 * endPtr  receives the position where strtod stopped
 *
 * Returns pToken->type when strtod consumed exactly pToken->n characters, TK_ILLEGAL when the
 * token is empty or only partly numeric. errno is cleared before the conversion so the caller can
 * test it for ERANGE afterwards.
 */
static int32_t tscToDouble(SSQLToken *pToken, double *value, char **endPtr) {
  if (pToken->n != 0) {
    errno = 0;
    *value = strtod(pToken->z, endPtr);

    // accept only when the whole token is a floating-point number
    if ((*endPtr - pToken->z) == pToken->n) {
      return pToken->type;
    }
  }

  return TK_ILLEGAL;
}
H
hzcheng 已提交
92

S
slguan 已提交
93 94 95 96 97 98
/*
 * Parse a timestamp value, optionally followed by a "+ duration" / "- duration" expression
 * (e.g. now+12a, now-5h).
 *
 * pToken    first token of the value
 * time      receives the resulting timestamp
 * next      in: position in the statement right after pToken; out: advanced past the duration
 *           expression when one was parsed, otherwise left unchanged
 * error     buffer handed to tscInvalidSQLErrMsg for error text
 * timePrec  precision of the target table; passed to taosGetTimestamp / taosParseTime, and for
 *           TSDB_TIME_PRECISION_MILLI the parsed duration is divided by 1000
 *
 * Base value:
 *   TK_NOW       -> taosGetTimestamp(timePrec)
 *   literal "0"  -> 0
 *   TK_INTEGER   -> str2int64 of the token text
 *   anything else is handed to taosParseTime, which stores into *time directly; no expression is
 *   parsed in that case.
 *
 * A subtraction that would go below zero yields 0.
 *
 * Returns TSDB_CODE_SUCCESS on success, otherwise the value of tscInvalidSQLErrMsg or
 * TSDB_CODE_TSC_INVALID_SQL.
 */
int tsParseTime(SSQLToken *pToken, int64_t *time, char **next, char *error, int16_t timePrec) {
  int64_t ts = 0;
  char *  pTokenEnd = *next;

  if (pToken->type == TK_NOW) {
    ts = taosGetTimestamp(timePrec);
  } else if (strncmp(pToken->z, "0", 1) == 0 && pToken->n == 1) {
    // literal zero: ts stays 0
  } else if (pToken->type == TK_INTEGER) {
    ts = str2int64(pToken->z);
  } else {
    // e.g. "2001-11-12 18:31:01"
    if (taosParseTime(pToken->z, time, pToken->n, timePrec, tsDaylight) != TSDB_CODE_SUCCESS) {
      return tscInvalidSQLErrMsg(error, "invalid timestamp format", pToken->z);
    }

    return TSDB_CODE_SUCCESS;
  }

  // If the first non-blank character after the token is ',', there is no expression to parse.
  for (int k = pToken->n; pToken->z[k] != '\0'; k++) {
    if (pToken->z[k] == ' ' || pToken->z[k] == '\t') continue;
    if (pToken->z[k] == ',') {
      *next = pTokenEnd;
      *time = ts;
      return TSDB_CODE_SUCCESS;
    }

    break;
  }

  /*
   * time expression:
   * e.g., now+12a, now-5h
   */
  int32_t   index = 0;
  SSQLToken sToken = tStrGetToken(pTokenEnd, &index, false, 0, NULL);
  pTokenEnd += index;

  if (sToken.type == TK_MINUS || sToken.type == TK_PLUS) {
    index = 0;
    SSQLToken valueToken = tStrGetToken(pTokenEnd, &index, false, 0, NULL);
    pTokenEnd += index;

    // a duration needs at least two characters
    if (valueToken.n < 2) {
      return tscInvalidSQLErrMsg(error, "value expected in timestamp", sToken.z);
    }

    int64_t interval;
    if (getTimestampInUsFromStr(valueToken.z, valueToken.n, &interval) != TSDB_CODE_SUCCESS) {
      return TSDB_CODE_TSC_INVALID_SQL;
    }

    if (timePrec == TSDB_TIME_PRECISION_MILLI) {
      interval /= 1000;
    }

    if (sToken.type == TK_PLUS) {
      ts += interval;
    } else {
      ts = (ts >= interval) ? ts - interval : 0;
    }

    *next = pTokenEnd;
  }

  *time = ts;
  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
167
int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload, char *msg, char **str, bool primaryKey,
S
slguan 已提交
168 169 170
                             int16_t timePrec) {
  int64_t iv;
  int32_t numType;
S
slguan 已提交
171
  char *  endptr = NULL;
172 173
  errno = 0;  // clear the previous existed error information

H
hzcheng 已提交
174 175
  switch (pSchema->type) {
    case TSDB_DATA_TYPE_BOOL: {  // bool
S
slguan 已提交
176 177
      if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) {
        if (strncmp(pToken->z, "true", pToken->n) == 0) {
S
slguan 已提交
178
          *(uint8_t *)payload = TSDB_TRUE;
S
slguan 已提交
179
        } else if (strncmp(pToken->z, "false", pToken->n) == 0) {
S
slguan 已提交
180
          *(uint8_t *)payload = TSDB_FALSE;
S
slguan 已提交
181 182
        } else if (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0) {
          *(uint8_t *)payload = TSDB_DATA_BOOL_NULL;
H
hzcheng 已提交
183
        } else {
H
hjxilinx 已提交
184
          return tscInvalidSQLErrMsg(msg, "invalid bool data", pToken->z);
H
hzcheng 已提交
185
        }
S
slguan 已提交
186 187 188 189 190 191 192 193 194
      } else if (pToken->type == TK_INTEGER) {
        iv = strtoll(pToken->z, NULL, 10);
        *(uint8_t *)payload = (int8_t)((iv == 0) ? TSDB_FALSE : TSDB_TRUE);
      } else if (pToken->type == TK_FLOAT) {
        double dv = strtod(pToken->z, NULL);
        *(uint8_t *)payload = (int8_t)((dv == 0) ? TSDB_FALSE : TSDB_TRUE);
      } else if (pToken->type == TK_NULL) {
        *(uint8_t *)payload = TSDB_DATA_BOOL_NULL;
      } else {
H
hjxilinx 已提交
195
        return tscInvalidSQLErrMsg(msg, "invalid bool data", pToken->z);
H
hzcheng 已提交
196 197 198 199
      }
      break;
    }
    case TSDB_DATA_TYPE_TINYINT:
S
slguan 已提交
200 201 202 203 204
      if (pToken->type == TK_NULL) {
        *((int8_t *)payload) = TSDB_DATA_TINYINT_NULL;
      } else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
                 (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
        *((int8_t *)payload) = TSDB_DATA_TINYINT_NULL;
H
hzcheng 已提交
205
      } else {
S
slguan 已提交
206
        numType = tscToInteger(pToken, &iv, &endptr);
L
lihui 已提交
207
        if (TK_ILLEGAL == numType) {
H
hjxilinx 已提交
208
          return tscInvalidSQLErrMsg(msg, "invalid tinyint data", pToken->z);
L
lihui 已提交
209
        } else if (errno == ERANGE || iv > INT8_MAX || iv <= INT8_MIN) {
H
hjxilinx 已提交
210
          return tscInvalidSQLErrMsg(msg, "tinyint data overflow", pToken->z);
H
hzcheng 已提交
211 212
        }

213
        *((int8_t *)payload) = (int8_t)iv;
H
hzcheng 已提交
214 215 216 217 218
      }

      break;

    case TSDB_DATA_TYPE_SMALLINT:
S
slguan 已提交
219 220 221 222 223
      if (pToken->type == TK_NULL) {
        *((int16_t *)payload) = TSDB_DATA_SMALLINT_NULL;
      } else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
                 (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
        *((int16_t *)payload) = TSDB_DATA_SMALLINT_NULL;
H
hzcheng 已提交
224
      } else {
S
slguan 已提交
225
        numType = tscToInteger(pToken, &iv, &endptr);
L
lihui 已提交
226
        if (TK_ILLEGAL == numType) {
H
hjxilinx 已提交
227
          return tscInvalidSQLErrMsg(msg, "invalid smallint data", pToken->z);
L
lihui 已提交
228
        } else if (errno == ERANGE || iv > INT16_MAX || iv <= INT16_MIN) {
H
hjxilinx 已提交
229
          return tscInvalidSQLErrMsg(msg, "smallint data overflow", pToken->z);
H
hzcheng 已提交
230 231
        }

S
slguan 已提交
232
        *((int16_t *)payload) = (int16_t)iv;
H
hzcheng 已提交
233 234 235 236
      }
      break;

    case TSDB_DATA_TYPE_INT:
S
slguan 已提交
237 238 239 240
      if (pToken->type == TK_NULL) {
        *((int32_t *)payload) = TSDB_DATA_INT_NULL;
      } else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
                 (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
S
slguan 已提交
241
        *((int32_t *)payload) = TSDB_DATA_INT_NULL;
H
hzcheng 已提交
242
      } else {
S
slguan 已提交
243
        numType = tscToInteger(pToken, &iv, &endptr);
L
lihui 已提交
244
        if (TK_ILLEGAL == numType) {
H
hjxilinx 已提交
245
          return tscInvalidSQLErrMsg(msg, "invalid int data", pToken->z);
L
lihui 已提交
246
        } else if (errno == ERANGE || iv > INT32_MAX || iv <= INT32_MIN) {
H
hjxilinx 已提交
247
          return tscInvalidSQLErrMsg(msg, "int data overflow", pToken->z);
H
hzcheng 已提交
248 249
        }

S
slguan 已提交
250
        *((int32_t *)payload) = (int32_t)iv;
H
hzcheng 已提交
251 252 253 254 255
      }

      break;

    case TSDB_DATA_TYPE_BIGINT:
S
slguan 已提交
256 257 258 259
      if (pToken->type == TK_NULL) {
        *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL;
      } else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
                 (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
S
slguan 已提交
260
        *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL;
H
hzcheng 已提交
261
      } else {
S
slguan 已提交
262
        numType = tscToInteger(pToken, &iv, &endptr);
L
lihui 已提交
263
        if (TK_ILLEGAL == numType) {
H
hjxilinx 已提交
264
          return tscInvalidSQLErrMsg(msg, "invalid bigint data", pToken->z);
B
Bomin Zhang 已提交
265
        } else if (errno == ERANGE || iv == INT64_MIN) {
H
hjxilinx 已提交
266
          return tscInvalidSQLErrMsg(msg, "bigint data overflow", pToken->z);
H
hzcheng 已提交
267
        }
S
slguan 已提交
268 269

        *((int64_t *)payload) = iv;
H
hzcheng 已提交
270 271 272 273
      }
      break;

    case TSDB_DATA_TYPE_FLOAT:
S
slguan 已提交
274 275 276 277
      if (pToken->type == TK_NULL) {
        *((int32_t *)payload) = TSDB_DATA_FLOAT_NULL;
      } else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
                 (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
S
slguan 已提交
278
        *((int32_t *)payload) = TSDB_DATA_FLOAT_NULL;
H
hzcheng 已提交
279
      } else {
S
slguan 已提交
280 281
        double dv;
        if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
H
hjxilinx 已提交
282
          return tscInvalidSQLErrMsg(msg, "illegal float data", pToken->z);
S
slguan 已提交
283 284 285 286
        }

        float fv = (float)dv;
        if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || (fv > FLT_MAX || fv < -FLT_MAX)) {
H
hjxilinx 已提交
287
          return tscInvalidSQLErrMsg(msg, "illegal float data", pToken->z);
H
hzcheng 已提交
288 289
        }

S
slguan 已提交
290 291
        if (isinf(fv) || isnan(fv)) {
          *((int32_t *)payload) = TSDB_DATA_FLOAT_NULL;
H
hzcheng 已提交
292
        }
S
slguan 已提交
293 294

        *((float *)payload) = fv;
H
hzcheng 已提交
295 296 297 298
      }
      break;

    case TSDB_DATA_TYPE_DOUBLE:
S
slguan 已提交
299 300 301 302
      if (pToken->type == TK_NULL) {
        *((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL;
      } else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
                 (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
S
slguan 已提交
303
        *((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL;
H
hzcheng 已提交
304
      } else {
S
slguan 已提交
305 306
        double dv;
        if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
H
hjxilinx 已提交
307
          return tscInvalidSQLErrMsg(msg, "illegal double data", pToken->z);
H
hzcheng 已提交
308 309
        }

S
slguan 已提交
310
        if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || (dv > DBL_MAX || dv < -DBL_MAX)) {
H
hjxilinx 已提交
311
          return tscInvalidSQLErrMsg(msg, "illegal double data", pToken->z);
S
slguan 已提交
312 313 314 315 316 317
        }

        if (isinf(dv) || isnan(dv)) {
          *((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL;
        } else {
          *((double *)payload) = dv;
H
hzcheng 已提交
318 319 320 321 322
        }
      }
      break;

    case TSDB_DATA_TYPE_BINARY:
S
slguan 已提交
323 324
      // binary data cannot be null-terminated char string, otherwise the last char of the string is lost
      if (pToken->type == TK_NULL) {
325
        setVardataNull(payload, TSDB_DATA_TYPE_BINARY);
H
hjxilinx 已提交
326
      } else { // too long values will return invalid sql, not be truncated automatically
H
hjxilinx 已提交
327
        if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { //todo refactor
H
hjxilinx 已提交
328
          return tscInvalidSQLErrMsg(msg, "string data overflow", pToken->z);
S
slguan 已提交
329
        }
H
hjxilinx 已提交
330
        
331
        STR_WITH_SIZE_TO_VARSTR(payload, pToken->z, pToken->n);
H
hzcheng 已提交
332 333 334 335 336
      }

      break;

    case TSDB_DATA_TYPE_NCHAR:
S
slguan 已提交
337
      if (pToken->type == TK_NULL) {
338
        setVardataNull(payload, TSDB_DATA_TYPE_NCHAR);
H
hzcheng 已提交
339
      } else {
H
hjxilinx 已提交
340
        // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
341 342
        size_t output = 0;
        if (!taosMbsToUcs4(pToken->z, pToken->n, varDataVal(payload), pSchema->bytes - VARSTR_HEADER_SIZE, &output)) {
H
hjxilinx 已提交
343 344
          char buf[512] = {0};
          snprintf(buf, tListLen(buf), "%s", strerror(errno));
H
hjxilinx 已提交
345
          return tscInvalidSQLErrMsg(msg, buf, pToken->z);
H
hzcheng 已提交
346
        }
347
        
348
        varDataSetLen(payload, output);
H
hzcheng 已提交
349 350 351 352
      }
      break;

    case TSDB_DATA_TYPE_TIMESTAMP: {
S
slguan 已提交
353
      if (pToken->type == TK_NULL) {
H
hzcheng 已提交
354
        if (primaryKey) {
S
slguan 已提交
355
          *((int64_t *)payload) = 0;
H
hzcheng 已提交
356
        } else {
S
slguan 已提交
357
          *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL;
H
hzcheng 已提交
358 359
        }
      } else {
S
slguan 已提交
360 361
        int64_t temp;
        if (tsParseTime(pToken, &temp, str, msg, timePrec) != TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
362
          return tscInvalidSQLErrMsg(msg, "invalid timestamp", pToken->z);
H
hzcheng 已提交
363
        }
H
hjxilinx 已提交
364
        
S
slguan 已提交
365
        *((int64_t *)payload) = temp;
H
hzcheng 已提交
366 367 368 369 370 371
      }

      break;
    }
  }

H
hjxilinx 已提交
372
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
373 374
}

S
slguan 已提交
375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394
/*
 * Track the timestamp source and ordering of a data block while its rows are parsed.
 *
 * start points at the primary key of the row that was just parsed. A key of 0 selects
 * TSDB_USE_SERVER_TS, any other key selects TSDB_USE_CLI_TS. The first row fixes the block's
 * tsSource (while it is still -1); a later row that needs the other source makes the function
 * return -1, because the two must not be mixed in one block.
 *
 * With client timestamps, a key that is not greater than the previous one marks the block as
 * unordered. Once a block is unordered nothing is checked or recorded any more.
 *
 * Returns TSDB_CODE_SUCCESS, or -1 when the sources are mixed.
 */
static int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start) {
  if (!pDataBlocks->ordered) {
    return TSDB_CODE_SUCCESS;
  }

  TSKEY key = *(TSKEY *)start;

  // the source this row asks for, and the one it therefore conflicts with
  const int32_t wanted = (key == 0) ? TSDB_USE_SERVER_TS : TSDB_USE_CLI_TS;
  const int32_t conflicting = (key == 0) ? TSDB_USE_CLI_TS : TSDB_USE_SERVER_TS;

  if (pDataBlocks->tsSource == conflicting) {
    return -1;  // client time/server time can not be mixed
  }

  if (pDataBlocks->tsSource == -1) {
    pDataBlocks->tsSource = wanted;
  }

  if ((pDataBlocks->tsSource == TSDB_USE_CLI_TS) && key <= pDataBlocks->prevTS) {
    pDataBlocks->ordered = false;
  }

  pDataBlocks->prevTS = key;
  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
410
int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[], SParsedDataColInfo *spd, char *error,
411 412
                      int16_t timePrec, int32_t *code, char *tmpTokenBuf) {
  int32_t index = 0;
H
hjxilinx 已提交
413
  SSQLToken sToken = {0};
S
slguan 已提交
414
  char *    payload = pDataBlocks->pData + pDataBlocks->size;
S
slguan 已提交
415

S
slguan 已提交
416
  // 1. set the parsed value from sql string
H
hzcheng 已提交
417
  int32_t rowSize = 0;
418
  for (int i = 0; i < spd->numOfAssignedCols; ++i) {
S
slguan 已提交
419
    // the start position in data block buffer of current value in sql
420 421
    char *   start = payload + spd->elems[i].offset;
    int16_t  colIndex = spd->elems[i].colIndex;
S
slguan 已提交
422
    SSchema *pSchema = schema + colIndex;
S
slguan 已提交
423
    rowSize += pSchema->bytes;
H
hzcheng 已提交
424

S
slguan 已提交
425 426 427 428 429 430 431 432 433
    index = 0;
    sToken = tStrGetToken(*str, &index, true, 0, NULL);
    *str += index;

    if (sToken.type == TK_QUESTION) {
      uint32_t offset = start - pDataBlocks->pData;
      if (tscAddParamToDataBlock(pDataBlocks, pSchema->type, (uint8_t)timePrec, pSchema->bytes, offset) != NULL) {
        continue;
      }
434

S
slguan 已提交
435
      strcpy(error, "client out of memory");
436
      *code = TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hzcheng 已提交
437 438 439
      return -1;
    }

440 441 442
    int16_t type = sToken.type;
    if ((type != TK_NOW && type != TK_INTEGER && type != TK_STRING && type != TK_FLOAT && type != TK_BOOL &&
         type != TK_NULL && type != TK_HEX && type != TK_OCT && type != TK_BIN) || (sToken.n == 0) || (type == TK_RP)) {
H
hjxilinx 已提交
443
      tscInvalidSQLErrMsg(error, "invalid data or symbol", sToken.z);
444
      *code = TSDB_CODE_TSC_INVALID_SQL;
S
slguan 已提交
445
      return -1;
H
hzcheng 已提交
446 447
    }

S
slguan 已提交
448 449
    // Remove quotation marks
    if (TK_STRING == sToken.type) {
L
[1292]  
lihui 已提交
450
      // delete escape character: \\, \', \"
451
      char    delim = sToken.z[0];
L
[1292]  
lihui 已提交
452 453
      int32_t cnt = 0;
      int32_t j = 0;
454
      for (int32_t k = 1; k < sToken.n - 1; ++k) {
F
fang 已提交
455 456
        if (sToken.z[k] == delim || sToken.z[k] == '\\') {
          if (sToken.z[k + 1] == delim) {
L
[1292]  
lihui 已提交
457
            cnt++;
L
lihui 已提交
458 459 460
            tmpTokenBuf[j] = sToken.z[k + 1];
            j++;
            k++;
L
[1292]  
lihui 已提交
461 462 463
            continue;
          }
        }
464

L
[NONE]  
lihui 已提交
465
        tmpTokenBuf[j] = sToken.z[k];
L
[1292]  
lihui 已提交
466 467
        j++;
      }
468
      tmpTokenBuf[j] = 0;
L
[1292]  
lihui 已提交
469
      sToken.z = tmpTokenBuf;
470
      sToken.n -= 2 + cnt;
H
hzcheng 已提交
471 472
    }

S
slguan 已提交
473 474
    bool    isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX);
    int32_t ret = tsParseOneColumnData(pSchema, &sToken, start, error, str, isPrimaryKey, timePrec);
S
slguan 已提交
475
    if (ret != TSDB_CODE_SUCCESS) {
476
      *code = TSDB_CODE_TSC_INVALID_SQL;
H
hzcheng 已提交
477 478
      return -1;  // NOTE: here 0 mean error!
    }
479

S
slguan 已提交
480
    if (isPrimaryKey && tsCheckTimestamp(pDataBlocks, start) != TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
481
      tscInvalidSQLErrMsg(error, "client time/server time can not be mixed up", sToken.z);
482
      *code = TSDB_CODE_TSC_INVALID_TIME_STAMP;
S
slguan 已提交
483
      return -1;
484
    }
H
hzcheng 已提交
485 486
  }

S
slguan 已提交
487
  // 2. set the null value for the columns that do not assign values
488
  if (spd->numOfAssignedCols < spd->numOfCols) {
S
slguan 已提交
489
    char *ptr = payload;
H
hzcheng 已提交
490 491

    for (int32_t i = 0; i < spd->numOfCols; ++i) {
492
      
493
      if (!spd->hasVal[i]) {  // current column do not have any value to insert, set it to null
494 495 496 497 498 499 500 501 502
        if (schema[i].type == TSDB_DATA_TYPE_BINARY) {
          varDataSetLen(ptr, sizeof(int8_t));
          *(uint8_t*) varDataVal(ptr) = TSDB_DATA_BINARY_NULL;
        } else if (schema[i].type == TSDB_DATA_TYPE_NCHAR) {
          varDataSetLen(ptr, sizeof(int32_t));
          *(uint32_t*) varDataVal(ptr) = TSDB_DATA_NCHAR_NULL;
        } else {
          setNull(ptr, schema[i].type, schema[i].bytes);
        }
H
hzcheng 已提交
503
      }
504
      
H
hzcheng 已提交
505 506 507 508 509 510 511 512 513
      ptr += schema[i].bytes;
    }

    rowSize = ptr - payload;
  }

  return rowSize;
}

S
slguan 已提交
514 515 516 517 518 519 520 521 522
static int32_t rowDataCompar(const void *lhs, const void *rhs) {
  TSKEY left = *(TSKEY *)lhs;
  TSKEY right = *(TSKEY *)rhs;

  if (left == right) {
    return 0;
  } else {
    return left > right ? 1 : -1;
  }
523 524
}

H
hjxilinx 已提交
525
int tsParseValues(char **str, STableDataBlocks *pDataBlock, STableMeta *pTableMeta, int maxRows,
526
                  SParsedDataColInfo *spd, char *error, int32_t *code, char *tmpTokenBuf) {
S
slguan 已提交
527 528
  int32_t   index = 0;
  SSQLToken sToken;
H
hzcheng 已提交
529 530 531

  int16_t numOfRows = 0;

H
hjxilinx 已提交
532
  SSchema *pSchema = tscGetTableSchema(pTableMeta);
H
hjxilinx 已提交
533
  STableComInfo tinfo = tscGetTableInfo(pTableMeta);
H
hjxilinx 已提交
534 535
  
  int32_t  precision = tinfo.precision;
S
slguan 已提交
536 537

  if (spd->hasVal[0] == false) {
S
slguan 已提交
538
    strcpy(error, "primary timestamp column can not be null");
539
    *code = TSDB_CODE_TSC_INVALID_SQL;
H
hzcheng 已提交
540 541 542 543
    return -1;
  }

  while (1) {
S
slguan 已提交
544 545 546
    index = 0;
    sToken = tStrGetToken(*str, &index, false, 0, NULL);
    if (sToken.n == 0 || sToken.type != TK_LP) break;
H
hzcheng 已提交
547

S
slguan 已提交
548
    *str += index;
H
hjxilinx 已提交
549
    if (numOfRows >= maxRows || pDataBlock->size + tinfo.rowSize >= pDataBlock->nAllocSize) {
L
lihui 已提交
550
      int32_t tSize;
H
hjxilinx 已提交
551
      int32_t retcode = tscAllocateMemIfNeed(pDataBlock, tinfo.rowSize, &tSize);
L
lihui 已提交
552
      if (retcode != TSDB_CODE_SUCCESS) {  //TODO pass the correct error code to client
S
slguan 已提交
553
        strcpy(error, "client out of memory");
L
lihui 已提交
554
        *code = retcode;
S
slguan 已提交
555 556
        return -1;
      }
L
lihui 已提交
557 558
      ASSERT(tSize > maxRows);
      maxRows = tSize;
H
hzcheng 已提交
559 560
    }

L
[1292]  
lihui 已提交
561
    int32_t len = tsParseOneRowData(str, pDataBlock, pSchema, spd, error, precision, code, tmpTokenBuf);
562
    if (len <= 0) {  // error message has been set in tsParseOneRowData
H
hzcheng 已提交
563 564 565 566 567
      return -1;
    }

    pDataBlock->size += len;

S
slguan 已提交
568 569 570 571
    index = 0;
    sToken = tStrGetToken(*str, &index, false, 0, NULL);
    *str += index;
    if (sToken.n == 0 || sToken.type != TK_RP) {
H
hjxilinx 已提交
572
      tscInvalidSQLErrMsg(error, ") expected", *str);
573
      *code = TSDB_CODE_TSC_INVALID_SQL;
H
hzcheng 已提交
574 575 576 577 578 579 580 581
      return -1;
    }

    numOfRows++;
  }

  if (numOfRows <= 0) {
    strcpy(error, "no any data points");
582
    *code = TSDB_CODE_TSC_INVALID_SQL;
S
slguan 已提交
583 584 585
    return -1;
  } else {
    return numOfRows;
H
hzcheng 已提交
586 587 588
  }
}

S
slguan 已提交
589
/*
 * Mark all numOfCols columns as assigned, in schema order.
 *
 * Every column gets hasVal = true and colIndex = its own position. From the second column on, the
 * offset is the previous column's offset plus the previous column's byte width. The offset of
 * column 0 is not written here; it keeps whatever value the caller stored in spd.
 */
static void tscSetAssignedColumnInfo(SParsedDataColInfo *spd, SSchema *pSchema, int32_t numOfCols) {
  spd->numOfAssignedCols = numOfCols;
  spd->numOfCols = numOfCols;

  for (int32_t col = 0; col < numOfCols; ++col) {
    spd->elems[col].colIndex = col;
    spd->hasVal[col] = true;

    if (col == 0) {
      continue;
    }

    spd->elems[col].offset = spd->elems[col - 1].offset + pSchema[col - 1].bytes;
  }
}

L
lihui 已提交
603
/*
 * Make sure the data block has room for at least five more rows, growing it if necessary.
 *
 * When fewer than 5 * rowSize bytes are free, nAllocSize is multiplied by 1.5 until that much is
 * free, pData is reallocated and the bytes from size to the new end are zeroed. If realloc fails,
 * the old buffer and the old nAllocSize are kept.
 *
 * pDataBlock  block to check / grow
 * rowSize     size of one row in bytes
 * numOfRows   receives (nAllocSize - headerSize) / rowSize, on success and on failure
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the reallocation failed.
 */
int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows) {
  const int      reservedRows = 5;
  const uint32_t sizeBefore = pDataBlock->nAllocSize;
  int32_t        result = TSDB_CODE_SUCCESS;

  size_t freeBytes = pDataBlock->nAllocSize - pDataBlock->size;

  if (freeBytes < rowSize * reservedRows) {
    // find the new size first, then reallocate once
    do {
      pDataBlock->nAllocSize = (uint32_t)(pDataBlock->nAllocSize * 1.5);
      freeBytes = pDataBlock->nAllocSize - pDataBlock->size;
    } while (freeBytes < rowSize * reservedRows);

    char *grown = realloc(pDataBlock->pData, (size_t)pDataBlock->nAllocSize);
    if (grown == NULL) {
      // keep working with the old buffer and report the failure
      pDataBlock->nAllocSize = sizeBefore;
      result = TSDB_CODE_TSC_OUT_OF_MEMORY;
    } else {
      pDataBlock->pData = grown;
      memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size);
    }
  }

  *numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize;
  return result;
}

631 632
/*
 * Fill the header of a submit block from the table meta data and add numOfRows to its row count.
 * The row count is accumulated, not overwritten.
 */
static void tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) {
  // identity and schema version of the target table
  pBlocks->uid = pTableMeta->uid;
  pBlocks->tid = pTableMeta->sid;
  pBlocks->sversion = pTableMeta->sversion;

  pBlocks->numOfRows += numOfRows;
}

S
slguan 已提交
638
// data block is disordered, sort it in ascending order
H
hjxilinx 已提交
639
/*
 * Sort the rows of an unordered data block by timestamp and drop rows with a duplicate timestamp.
 *
 * Does nothing (apart from the sanity checks) when the block is already ordered. Otherwise the
 * rows are sorted with qsort, then compacted in place so that only the first row of every run of
 * equal keys remains. The block is marked ordered again and numOfRows / size are updated to the
 * compacted row count.
 */
void tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf) {
  SSubmitBlk *pBlocks = (SSubmitBlk *)dataBuf->pData;

  // the block must hold exactly the header plus numOfRows full rows at this point
  assert(pBlocks->numOfRows * dataBuf->rowSize + sizeof(SSubmitBlk) == dataBuf->size);

  // blocks stamped with server time are always ordered
  if (dataBuf->tsSource == TSDB_USE_SERVER_TS) {
    assert(dataBuf->ordered);
  }

  if (dataBuf->ordered) {
    return;
  }

  char *rows = pBlocks->data;
  qsort(rows, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar);

  // 'kept' is the index of the last row that stays; 'scan' walks over the remaining rows
  int32_t kept = 0;
  for (int32_t scan = 1; scan < pBlocks->numOfRows; ++scan) {
    TSKEY keptKey = *(TSKEY *)(rows + dataBuf->rowSize * kept);
    TSKEY scanKey = *(TSKEY *)(rows + dataBuf->rowSize * scan);

    if (keptKey == scanKey) {
      continue;  // duplicate timestamp, drop this row
    }

    ++kept;
    if (kept != scan) {
      memmove(rows + dataBuf->rowSize * kept, rows + dataBuf->rowSize * scan, dataBuf->rowSize);
    }
  }

  dataBuf->ordered = true;

  pBlocks->numOfRows = kept + 1;
  dataBuf->size = sizeof(SSubmitBlk) + dataBuf->rowSize * pBlocks->numOfRows;
}

681
/*
 * Parse the VALUES part of an insert for one table and append the rows to that table's data block.
 *
 * pSql        statement object; the table meta is taken from the current clause of pSql->cmd and
 *             error text is written to pSql->cmd.payload
 * pTableList  table list handed to tscGetDataBlockFromList to find or create the data block
 * str         in/out: current position in the statement
 * spd         column assignment info for this insert
 * totalNum    incremented by the number of parsed rows
 *
 * Returns TSDB_CODE_SUCCESS, the error of tscGetDataBlockFromList, TSDB_CODE_TSC_OUT_OF_MEMORY, or
 * the code reported by tsParseValues.
 */
static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableList, char **str, SParsedDataColInfo *spd,
                                      int32_t *totalNum) {
  SSqlCmd *       pCmd = &pSql->cmd;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  STableMeta *    pTableMeta = pTableMetaInfo->pTableMeta;
  STableComInfo   tinfo = tscGetTableInfo(pTableMeta);

  STableDataBlocks *dataBuf = NULL;
  int32_t ret = tscGetDataBlockFromList(pTableList, pCmd->pDataBlocks, pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE,
                                        sizeof(SSubmitBlk), tinfo.rowSize, pTableMetaInfo->name,
                                        pTableMeta, &dataBuf);
  if (ret != TSDB_CODE_SUCCESS) {
    return ret;
  }

  int32_t maxNumOfRows;
  ret = tscAllocateMemIfNeed(dataBuf, tinfo.rowSize, &maxNumOfRows);
  if (TSDB_CODE_SUCCESS != ret) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_TSC_INVALID_SQL;

  /*
   * Scratch buffer used by tsParseOneRowData for the unquoted copy of string values. That copy is
   * written without any length check, so a fixed 4096-byte buffer is overrun by a longer literal.
   * Every token is a piece of the remaining statement text, so a buffer as long as that text (plus
   * the terminator) is always large enough. 4096 is kept as the minimum size.
   */
  size_t tmpTokenBufLen = strlen(*str) + 1;
  if (tmpTokenBufLen < 4096) {
    tmpTokenBufLen = 4096;
  }

  char *tmpTokenBuf = calloc(1, tmpTokenBufLen);  // used for deleting Escape character: \\, \', \"
  if (NULL == tmpTokenBuf) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  int32_t numOfRows = tsParseValues(str, dataBuf, pTableMeta, maxNumOfRows, spd, pCmd->payload, &code, tmpTokenBuf);
  free(tmpTokenBuf);
  if (numOfRows <= 0) {
    return code;
  }

  // number the '?' parameters added by this call and make their offsets relative to the row data
  for (uint32_t i = 0; i < dataBuf->numOfParams; ++i) {
    SParamInfo *param = dataBuf->params + i;
    if (param->idx == -1) {
      param->idx = pCmd->numOfParams++;
      param->offset -= sizeof(SSubmitBlk);
    }
  }

  SSubmitBlk *pBlocks = (SSubmitBlk *)(dataBuf->pData);
  tsSetBlockInfo(pBlocks, pTableMeta, numOfRows);

  dataBuf->vgId = pTableMeta->vgroupInfo.vgId;
  dataBuf->numOfTables = 1;

  /*
   * the value of pRes->numOfRows does not affect the true result of AFFECTED ROWS,
   * which is actually returned from server.
   */
  *totalNum += numOfRows;
  return TSDB_CODE_SUCCESS;
}

736
/*
 * Parse the head of one insert target: the table name, an optional column list and an
 * optional "USING <super table> [(tag names)] TAGS (tag values)" clause, then load the
 * table meta through tscGetMeterMetaEx().
 *
 * @param sqlstr  in: position of the table name; out: position where the caller continues
 *                parsing. When a column list was given together with a USING clause, the
 *                column list text is moved right in front of the returned position so the
 *                caller sees "(col, ...) VALUES ...".
 * @param pSql    sql object being parsed; pCmd->payload is reused as the STagData buffer
 *                and as the error message buffer.
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_TSC_ACTION_IN_PROGRESS when the table meta is still
 *         being fetched, or an error code.
 */
static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
  int32_t   index = 0;
  SSQLToken sToken = {0};
  SSQLToken tableToken = {0};
  int32_t   code = TSDB_CODE_SUCCESS;

  const int32_t TABLE_INDEX = 0;
  const int32_t STABLE_INDEX = 1;

  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  char *sql = *sqlstr;

  // get the token of specified table
  index = 0;
  tableToken = tStrGetToken(sql, &index, false, 0, NULL);
  sql += index;

  char *cstart = NULL;
  char *cend = NULL;

  // skip the column list if there is one
  index = 0;
  sToken = tStrGetToken(sql, &index, false, 0, NULL);
  sql += index;

  int32_t numOfColList = 0;
  bool    createTable = false;

  if (sToken.type == TK_LP) {
    cstart = &sToken.z[0];
    index = 0;
    while (1) {
      sToken = tStrGetToken(sql, &index, false, 0, NULL);
      if (sToken.type == TK_RP) {
        cend = &sToken.z[0];
        break;
      }

      // end of input reached before the closing parenthesis: stop instead of spinning forever
      if (sToken.n == 0) {
        return tscInvalidSQLErrMsg(pCmd->payload, ") expected", sql);
      }

      ++numOfColList;
    }

    sToken = tStrGetToken(sql, &index, false, 0, NULL);
    sql += index;
  }

  // "()" is not a valid column list
  if (numOfColList == 0 && cstart != NULL) {
    return TSDB_CODE_TSC_INVALID_SQL;
  }

  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, TABLE_INDEX);

  if (sToken.type == TK_USING) {  // create table if not exists according to the super table
    index = 0;
    sToken = tStrGetToken(sql, &index, false, 0, NULL);
    sql += index;

    // the payload is used as the STagData buffer below, so the allocation must have succeeded
    if ((code = tscAllocPayload(pCmd, sizeof(STagData))) != TSDB_CODE_SUCCESS) {
      return code;
    }

    STagData *pTag = (STagData *)pCmd->payload;
    memset(pTag, 0, sizeof(STagData));

    // the source super table is moved to the secondary position of the pTableMetaInfo list
    if (pQueryInfo->numOfTables < 2) {
      tscAddEmptyMetaInfo(pQueryInfo);
    }

    STableMetaInfo *pSTableMeterMetaInfo = tscGetMetaInfo(pQueryInfo, STABLE_INDEX);
    code = tscSetTableFullName(pSTableMeterMetaInfo, &sToken, pSql);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    tstrncpy(pTag->name, pSTableMeterMetaInfo->name, sizeof(pTag->name));
    code = tscGetTableMeta(pSql, pSTableMeterMetaInfo);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (!UTIL_TABLE_IS_SUPER_TABLE(pSTableMeterMetaInfo)) {
      return tscInvalidSQLErrMsg(pCmd->payload, "create table only from super table is allowed", sToken.z);
    }

    SSchema *     pTagSchema = tscGetTableTagSchema(pSTableMeterMetaInfo->pTableMeta);
    STableComInfo tinfo = tscGetTableInfo(pSTableMeterMetaInfo->pTableMeta);

    index = 0;
    sToken = tStrGetToken(sql, &index, false, 0, NULL);
    sql += index;

    SParsedDataColInfo spd = {0};

    uint8_t numOfTags = tscGetNumOfTags(pSTableMeterMetaInfo->pTableMeta);
    spd.numOfCols = numOfTags;

    if (sToken.type != TK_LP) {
      // no explicit tag name list: every tag is assigned, in schema order
      tscSetAssignedColumnInfo(&spd, pTagSchema, numOfTags);
    } else {
      /* insert into tablename (col1, col2,..., coln) using superTableName (tagName1, tagName2, ..., tagNamen)
       * tags(tagVal1, tagVal2, ..., tagValn) values(v1, v2,... vn); */
      int16_t offset[TSDB_MAX_COLUMNS] = {0};
      for (int32_t t = 1; t < numOfTags; ++t) {
        offset[t] = offset[t - 1] + pTagSchema[t - 1].bytes;
      }

      while (1) {
        index = 0;
        sToken = tStrGetToken(sql, &index, false, 0, NULL);
        sql += index;

        if (TK_STRING == sToken.type) {
          strdequote(sToken.z);
          sToken.n = strtrim(sToken.z);
        }

        if (sToken.type == TK_RP) {
          break;
        }

        bool findColumnIndex = false;

        // todo speedup by using hash list
        for (int32_t t = 0; t < numOfTags; ++t) {
          if (strncmp(sToken.z, pTagSchema[t].name, sToken.n) == 0 && strlen(pTagSchema[t].name) == sToken.n) {
            SParsedColElem *pElem = &spd.elems[spd.numOfAssignedCols++];
            pElem->offset = offset[t];
            pElem->colIndex = t;

            if (spd.hasVal[t] == true) {
              return tscInvalidSQLErrMsg(pCmd->payload, "duplicated tag name", sToken.z);
            }

            spd.hasVal[t] = true;
            findColumnIndex = true;
            break;
          }
        }

        // an unknown name (or the end of the input) ends the scan with an error
        if (!findColumnIndex) {
          return tscInvalidSQLErrMsg(pCmd->payload, "invalid tag name", sToken.z);
        }
      }

      if (spd.numOfAssignedCols == 0 || spd.numOfAssignedCols > numOfTags) {
        return tscInvalidSQLErrMsg(pCmd->payload, "tag name expected", sToken.z);
      }

      index = 0;
      sToken = tStrGetToken(sql, &index, false, 0, NULL);
      sql += index;
    }

    if (sToken.type != TK_TAGS) {
      return tscInvalidSQLErrMsg(pCmd->payload, "keyword TAGS expected", sToken.z);
    }

    // 1. parse the tag values; the opening parenthesis is skipped by the tokenizer
    uint32_t ignoreTokenTypes = TK_LP;
    uint32_t numOfIgnoreToken = 1;
    for (int i = 0; i < spd.numOfAssignedCols; ++i) {
      char *  tagVal = pTag->data + spd.elems[i].offset;
      int16_t colIndex = spd.elems[i].colIndex;

      index = 0;
      sToken = tStrGetToken(sql, &index, true, numOfIgnoreToken, &ignoreTokenTypes);
      sql += index;

      if (sToken.n == 0 || sToken.type == TK_RP) {
        break;
      }

      // Remove quotation marks
      if (TK_STRING == sToken.type) {
        sToken.z++;
        sToken.n -= 2;
      }

      code = tsParseOneColumnData(&pTagSchema[colIndex], &sToken, tagVal, pCmd->payload, &sql, false, tinfo.precision);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }

    index = 0;
    sToken = tStrGetToken(sql, &index, false, 0, NULL);
    sql += index;
    if (sToken.n == 0 || sToken.type != TK_RP) {
      return tscInvalidSQLErrMsg(pCmd->payload, ") expected", sToken.z);
    }

    // 2. set the null value for the columns that do not assign values
    if (spd.numOfAssignedCols < spd.numOfCols) {
      char *ptr = pTag->data;

      for (int32_t i = 0; i < spd.numOfCols; ++i) {
        if (!spd.hasVal[i]) {  // current tag column do not have any value to insert, set it to null
          if (pTagSchema[i].type == TSDB_DATA_TYPE_BINARY || pTagSchema[i].type == TSDB_DATA_TYPE_NCHAR) {
            setVardataNull(ptr, pTagSchema[i].type);
          } else {
            setNull(ptr, pTagSchema[i].type, pTagSchema[i].bytes);
          }
        }

        ptr += pTagSchema[i].bytes;
      }
    }

    // 3. calculate the actual data size of STagData
    pCmd->payloadLen = sizeof(pTag->name) + sizeof(pTag->dataLen);
    for (int32_t t = 0; t < numOfTags; ++t) {
      pTag->dataLen += pTagSchema[t].bytes;
      pCmd->payloadLen += pTagSchema[t].bytes;
    }
    pTag->dataLen = htonl(pTag->dataLen);

    if (tscValidateName(&tableToken) != TSDB_CODE_SUCCESS) {
      return tscInvalidSQLErrMsg(pCmd->payload, "invalid table name", *sqlstr);
    }

    int32_t ret = tscSetTableFullName(pTableMetaInfo, &tableToken, pSql);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }

    createTable = true;
    code = tscGetMeterMetaEx(pSql, pTableMetaInfo, true);
    if (TSDB_CODE_TSC_ACTION_IN_PROGRESS == code) {
      return code;
    }
  } else {
    // no USING clause: rewind to the column list (or to the current token) for the caller
    if (cstart != NULL) {
      sql = cstart;
    } else {
      sql = sToken.z;
    }

    code = tscGetMeterMetaEx(pSql, pTableMetaInfo, false);

    if (pCmd->curSql == NULL) {
      assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS);
    }
  }

  if (cstart != NULL && createTable == true) {
    /* move the column list to start position of the next accessed points */
    int32_t len = (int32_t)(cend - cstart) + 1;  // cend is always set once cstart is set

    memmove(sql - len, cstart, len);
    *sqlstr = sql - len;
  } else {
    *sqlstr = sql;
  }

  if (*sqlstr == NULL) {
    code = TSDB_CODE_TSC_INVALID_SQL;
  }

  return code;
}

H
Hui Li 已提交
993
/*
 * Copy a table name into the caller supplied token buffer and validate it.
 *
 * @param tblName     start of the table name inside the sql string
 * @param len         length of the table name token
 * @param psTblToken  output token; psTblToken->z must already point to a writable buffer of
 *                    at least TSDB_TABLE_ID_LEN bytes
 * @return the result of tscValidateName(), or TSDB_CODE_TSC_INVALID_SQL when the name
 *         cannot fit into the token buffer
 */
int validateTableName(char *tblName, int len, SSQLToken* psTblToken) {
  // only TSDB_TABLE_ID_LEN bytes are copied below, so a longer token length would make
  // later readers of psTblToken->z run past the end of the buffer
  if (len <= 0 || len >= TSDB_TABLE_ID_LEN) {
    return TSDB_CODE_TSC_INVALID_SQL;
  }

  tstrncpy(psTblToken->z, tblName, TSDB_TABLE_ID_LEN);

  psTblToken->n    = len;
  psTblToken->type = TK_ID;
  tSQLGetToken(psTblToken->z, &psTblToken->type);  // refresh the token type from the copied text

  return tscValidateName(psTblToken);
}

1003 1004 1005 1006 1007 1008 1009 1010 1011
/*
 * Record the data source type of the insert statement in pCmd->dataSourceType.
 * A statement may use only one source type: once a non-zero type has been recorded,
 * a different type is rejected with an error message written to pCmd->payload.
 */
static int32_t validateDataSource(SSqlCmd *pCmd, int8_t type, const char *sql) {
  const bool conflicts = (pCmd->dataSourceType != 0) && (pCmd->dataSourceType != type);

  if (!conflicts) {
    pCmd->dataSourceType = type;
    return TSDB_CODE_SUCCESS;
  }

  return tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES and FILE are not allowed to mix up", sql);
}

H
hzcheng 已提交
1012 1013 1014 1015 1016 1017 1018 1019 1020
/**
 * usage: insert into table1 values() () table2 values()()
 *
 * @param str
 * @param acct
 * @param db
 * @param pSql
 * @return
 */
H
Haojun Liao 已提交
1021
int tsParseInsertSql(SSqlObj *pSql) {
S
slguan 已提交
1022
  SSqlCmd *pCmd = &pSql->cmd;
H
Haojun Liao 已提交
1023
  char* str = pCmd->curSql;
1024

S
slguan 已提交
1025
  int32_t totalNum = 0;
1026 1027 1028 1029 1030
  int32_t code = TSDB_CODE_SUCCESS;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  assert(pQueryInfo != NULL);

H
Haojun Liao 已提交
1031
  STableMetaInfo *pTableMetaInfo = NULL;
1032
  if (pQueryInfo->numOfTables == 0) {
H
hjxilinx 已提交
1033
    pTableMetaInfo = tscAddEmptyMetaInfo(pQueryInfo);
1034
  } else {
H
hjxilinx 已提交
1035
    pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
1036
  }
H
hzcheng 已提交
1037

H
Haojun Liao 已提交
1038 1039 1040
  if ((code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
    return code;
  }
H
hzcheng 已提交
1041

H
Haojun Liao 已提交
1042
  if (NULL == pCmd->pTableList) {
1043
    pCmd->pTableList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
1044
    pCmd->pDataBlocks = taosArrayInit(4, POINTER_BYTES);
1045
    if (NULL == pCmd->pTableList || NULL == pSql->cmd.pDataBlocks) {
1046
      code = TSDB_CODE_TSC_OUT_OF_MEMORY;
H
Haojun Liao 已提交
1047
      goto _error;
L
lihui 已提交
1048 1049
    }
  } else {
1050
    str = pCmd->curSql;
L
lihui 已提交
1051 1052
  }
  
H
Haojun Liao 已提交
1053
  tscTrace("%p create data block list for submit data:%p, pTableList:%p", pSql, pCmd->pDataBlocks, pCmd->pTableList);
H
hzcheng 已提交
1054 1055

  while (1) {
1056
    int32_t   index = 0;
S
slguan 已提交
1057
    SSQLToken sToken = tStrGetToken(str, &index, false, 0, NULL);
1058 1059 1060 1061 1062 1063 1064 1065

    // no data in the sql string anymore.
    if (sToken.n == 0) {
      /*
       * if the data is from the data file, no data has been generated yet. So, there no data to
       * merge or submit, save the file path and parse the file in other routines.
       */
      if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) {
S
slguan 已提交
1066 1067 1068
        goto _clean;
      }

1069 1070 1071 1072 1073
      /*
       * if no data has been generated during parsing the sql string, error msg will return
       * Otherwise, create the first submit block and submit to virtual node.
       */
      if (totalNum == 0) {
1074
        code = TSDB_CODE_TSC_INVALID_SQL;
H
Haojun Liao 已提交
1075
        goto _error;
1076 1077
      } else {
        break;
H
hzcheng 已提交
1078 1079 1080
      }
    }

1081
    pCmd->curSql = sToken.z;
B
Bomin Zhang 已提交
1082
    char buf[TSDB_TABLE_ID_LEN];
H
Hui Li 已提交
1083
    SSQLToken sTblToken;
B
Bomin Zhang 已提交
1084
    sTblToken.z = buf;
S
slguan 已提交
1085
    // Check if the table name available or not
H
Hui Li 已提交
1086
    if (validateTableName(sToken.z, sToken.n, &sTblToken) != TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
1087
      code = tscInvalidSQLErrMsg(pCmd->payload, "table name invalid", sToken.z);
H
Haojun Liao 已提交
1088
      goto _error;
H
huili 已提交
1089 1090
    }

H
Hui Li 已提交
1091
    if ((code = tscSetTableFullName(pTableMetaInfo, &sTblToken, pSql)) != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1092
      goto _error;
H
hzcheng 已提交
1093 1094
    }

1095 1096
    if ((code = tscCheckIfCreateTable(&str, pSql)) != TSDB_CODE_SUCCESS) {
      /*
H
Haojun Liao 已提交
1097 1098
       * After retrieving the table meta from server, the sql string will be parsed from the paused position.
       * And during the getTableMetaCallback function, the sql string will be parsed from the paused position.
1099
       */
1100
      if (TSDB_CODE_TSC_ACTION_IN_PROGRESS == code) {
H
hjxilinx 已提交
1101
        return code;
H
hzcheng 已提交
1102
      }
H
hjxilinx 已提交
1103
      
H
Haojun Liao 已提交
1104
      tscError("%p async insert parse error, code:%s", pSql, tstrerror(code));
1105
      pCmd->curSql = NULL;
H
Haojun Liao 已提交
1106
      goto _error;
H
hzcheng 已提交
1107 1108
    }

weixin_48148422's avatar
weixin_48148422 已提交
1109
    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
H
hjxilinx 已提交
1110
      code = tscInvalidSQLErrMsg(pCmd->payload, "insert data into super table is not supported", NULL);
H
Haojun Liao 已提交
1111
      goto _error;
H
hzcheng 已提交
1112 1113
    }

S
slguan 已提交
1114 1115 1116
    index = 0;
    sToken = tStrGetToken(str, &index, false, 0, NULL);
    str += index;
1117

S
slguan 已提交
1118
    if (sToken.n == 0) {
1119
      code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE required", sToken.z);
H
Haojun Liao 已提交
1120
      goto _error;
H
hzcheng 已提交
1121
    }
H
hjxilinx 已提交
1122
    
H
hjxilinx 已提交
1123
    STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
H
hjxilinx 已提交
1124
    
S
slguan 已提交
1125
    if (sToken.type == TK_VALUES) {
H
hjxilinx 已提交
1126
      SParsedDataColInfo spd = {.numOfCols = tinfo.numOfColumns};
H
hjxilinx 已提交
1127 1128
      
      SSchema *pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
H
hjxilinx 已提交
1129
      tscSetAssignedColumnInfo(&spd, pSchema, tinfo.numOfColumns);
H
hzcheng 已提交
1130

1131
      if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1132
        goto _error;
H
hzcheng 已提交
1133 1134 1135 1136 1137 1138
      }

      /*
       * app here insert data in different vnodes, so we need to set the following
       * data in another submit procedure using async insert routines
       */
1139
      code = doParseInsertStatement(pSql, pCmd->pTableList, &str, &spd, &totalNum);
H
hzcheng 已提交
1140
      if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1141
        goto _error;
H
hzcheng 已提交
1142
      }
S
slguan 已提交
1143
    } else if (sToken.type == TK_FILE) {
1144
      if (validateDataSource(pCmd, DATA_FROM_DATA_FILE, sToken.z) != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1145
        goto _error;
H
hzcheng 已提交
1146 1147
      }

S
slguan 已提交
1148 1149 1150 1151
      index = 0;
      sToken = tStrGetToken(str, &index, false, 0, NULL);
      str += index;
      if (sToken.n == 0) {
H
hjxilinx 已提交
1152
        code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z);
H
Haojun Liao 已提交
1153
        goto _error;
H
hzcheng 已提交
1154 1155
      }

H
Haojun Liao 已提交
1156 1157
      strncpy(pCmd->payload, sToken.z, sToken.n);
      strdequote(pCmd->payload);
1158

H
Haojun Liao 已提交
1159
      // todo refactor extract method
H
hzcheng 已提交
1160
      wordexp_t full_path;
H
Haojun Liao 已提交
1161
      if (wordexp(pCmd->payload, &full_path, 0) != 0) {
H
hjxilinx 已提交
1162
        code = tscInvalidSQLErrMsg(pCmd->payload, "invalid filename", sToken.z);
H
Haojun Liao 已提交
1163
        goto _error;
H
hzcheng 已提交
1164 1165
      }

H
Haojun Liao 已提交
1166 1167
      tstrncpy(pCmd->payload, full_path.we_wordv[0], pCmd->allocSize);
      wordfree(&full_path);
1168

S
slguan 已提交
1169
    } else if (sToken.type == TK_LP) {
H
hzcheng 已提交
1170
      /* insert into tablename(col1, col2,..., coln) values(v1, v2,... vn); */
1171
      STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta;
H
hjxilinx 已提交
1172
      SSchema *   pSchema = tscGetTableSchema(pTableMeta);
H
hzcheng 已提交
1173

1174
      if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1175
        goto _error;
H
hzcheng 已提交
1176 1177
      }

1178
      SParsedDataColInfo spd = {0};
H
hjxilinx 已提交
1179
      spd.numOfCols = tinfo.numOfColumns;
H
hzcheng 已提交
1180 1181

      int16_t offset[TSDB_MAX_COLUMNS] = {0};
H
hjxilinx 已提交
1182
      for (int32_t t = 1; t < tinfo.numOfColumns; ++t) {
H
hzcheng 已提交
1183 1184 1185 1186
        offset[t] = offset[t - 1] + pSchema[t - 1].bytes;
      }

      while (1) {
S
slguan 已提交
1187 1188 1189 1190 1191
        index = 0;
        sToken = tStrGetToken(str, &index, false, 0, NULL);
        str += index;

        if (TK_STRING == sToken.type) {
H
Haojun Liao 已提交
1192 1193
          strdequote(sToken.z);
          sToken.n = strtrim(sToken.z);
S
slguan 已提交
1194 1195 1196
        }

        if (sToken.type == TK_RP) {
H
hzcheng 已提交
1197 1198 1199 1200 1201 1202
          break;
        }

        bool findColumnIndex = false;

        // todo speedup by using hash list
H
hjxilinx 已提交
1203
        for (int32_t t = 0; t < tinfo.numOfColumns; ++t) {
S
slguan 已提交
1204
          if (strncmp(sToken.z, pSchema[t].name, sToken.n) == 0 && strlen(pSchema[t].name) == sToken.n) {
S
slguan 已提交
1205
            SParsedColElem *pElem = &spd.elems[spd.numOfAssignedCols++];
H
hzcheng 已提交
1206 1207 1208 1209
            pElem->offset = offset[t];
            pElem->colIndex = t;

            if (spd.hasVal[t] == true) {
H
hjxilinx 已提交
1210
              code = tscInvalidSQLErrMsg(pCmd->payload, "duplicated column name", sToken.z);
H
Haojun Liao 已提交
1211
              goto _error;
H
hzcheng 已提交
1212 1213 1214 1215 1216 1217 1218 1219
            }

            spd.hasVal[t] = true;
            findColumnIndex = true;
            break;
          }
        }

S
slguan 已提交
1220
        if (!findColumnIndex) {
H
hjxilinx 已提交
1221
          code = tscInvalidSQLErrMsg(pCmd->payload, "invalid column name", sToken.z);
H
Haojun Liao 已提交
1222
          goto _error;
H
hzcheng 已提交
1223 1224 1225
        }
      }

H
hjxilinx 已提交
1226
      if (spd.numOfAssignedCols == 0 || spd.numOfAssignedCols > tinfo.numOfColumns) {
H
hjxilinx 已提交
1227
        code = tscInvalidSQLErrMsg(pCmd->payload, "column name expected", sToken.z);
H
Haojun Liao 已提交
1228
        goto _error;
H
hzcheng 已提交
1229 1230
      }

S
slguan 已提交
1231 1232 1233 1234 1235
      index = 0;
      sToken = tStrGetToken(str, &index, false, 0, NULL);
      str += index;

      if (sToken.type != TK_VALUES) {
H
hjxilinx 已提交
1236
        code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES is expected", sToken.z);
H
Haojun Liao 已提交
1237
        goto _error;
H
hzcheng 已提交
1238 1239
      }

1240
      code = doParseInsertStatement(pSql, pCmd->pTableList, &str, &spd, &totalNum);
H
hzcheng 已提交
1241
      if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1242
        goto _error;
H
hzcheng 已提交
1243 1244
      }
    } else {
H
hjxilinx 已提交
1245
      code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE are required", sToken.z);
H
Haojun Liao 已提交
1246
      goto _error;
H
hzcheng 已提交
1247 1248 1249
    }
  }

S
slguan 已提交
1250 1251 1252 1253
  // we need to keep the data blocks if there are parameters in the sql
  if (pCmd->numOfParams > 0) {
    goto _clean;
  }
1254

1255
  if (taosArrayGetSize(pCmd->pDataBlocks) > 0) { // merge according to vgId
S
slguan 已提交
1256
    if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1257
      goto _error;
S
slguan 已提交
1258
    }
H
hzcheng 已提交
1259
  } else {
S
slguan 已提交
1260
    pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
H
hzcheng 已提交
1261 1262 1263 1264 1265
  }

  code = TSDB_CODE_SUCCESS;
  goto _clean;

H
Haojun Liao 已提交
1266
_error:
S
slguan 已提交
1267
  pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
H
hzcheng 已提交
1268 1269

_clean:
1270 1271
  taosHashCleanup(pCmd->pTableList);
  pCmd->pTableList = NULL;
H
hjxilinx 已提交
1272
  
1273 1274
  pCmd->curSql    = NULL;
  pCmd->parseFinished  = 1;
H
hjxilinx 已提交
1275
  
H
hzcheng 已提交
1276 1277 1278
  return code;
}

H
Haojun Liao 已提交
1279
/*
 * Prepare a sql object for parsing an INSERT/IMPORT statement: check the write permission,
 * reset the command state and consume the leading "INSERT INTO" keywords.
 *
 * @param pSql  sql object whose sqlstr starts with INSERT or IMPORT (asserted)
 * @return TSDB_CODE_SUCCESS with pCmd->curSql positioned right behind the INTO keyword,
 *         TSDB_CODE_TSC_NO_WRITE_AUTH, or another error code
 */
int tsInsertInitialCheck(SSqlObj *pSql) {
  if (!pSql->pTscObj->writeAuth) {
    return TSDB_CODE_TSC_NO_WRITE_AUTH;
  }

  int32_t  index = 0;
  SSqlCmd *pCmd = &pSql->cmd;

  SSQLToken sToken = tStrGetToken(pSql->sqlstr, &index, false, 0, NULL);
  assert(sToken.type == TK_INSERT || sToken.type == TK_IMPORT);

  pCmd->count = 0;
  pCmd->command = TSDB_SQL_INSERT;
  pSql->res.numOfRows = 0;

  SQueryInfo *pQueryInfo = NULL;
  tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo);
  if (pQueryInfo == NULL) {
    // NOTE(review): presumably only an allocation failure leaves this unset - confirm
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT | pCmd->insertType);

  // index was advanced by the first call, so this reads the token after INSERT/IMPORT
  sToken = tStrGetToken(pSql->sqlstr, &index, false, 0, NULL);
  if (sToken.type != TK_INTO) {
    return tscInvalidSQLErrMsg(pCmd->payload, "keyword INTO is expected", sToken.z);
  }

  pCmd->curSql = sToken.z + sToken.n;
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
1308
/*
 * Entry point of the client side sql parser.
 *
 * Insert statements are handled by tsInsertInitialCheck()/tsParseInsertSql(); every other
 * statement goes through qSQLParse()/tscToSQLCmd().
 *
 * @param pSql     sql object holding the statement in pSql->sqlstr
 * @param initial  true for the first parse of the statement, false when a paused parse is
 *                 resumed
 * @return TSDB_CODE_SUCCESS or the error code of the failing step
 *
 * The pRes->code may be modified or released by another thread in tscTableMetaCallBack
 * function, so do NOT use pRes->code to determine if the getTableMeta function invokes new
 * threads to get data from mgmt node or simply retrieves data from cache. For the same
 * reason the return code is NOT assigned to pRes->code here, since pRes may be released by
 * another thread already.
 */
int tsParseSql(SSqlObj *pSql, bool initial) {
  SSqlCmd *pCmd = &pSql->cmd;

  if (!initial && !pCmd->parseFinished) {
    tscTrace("%p resume to parse sql: %s", pSql, pCmd->curSql);
  }

  // everything except insert is handled by the generic sql parser
  if (!tscIsInsertData(pSql->sqlstr)) {
    int32_t rc = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
    if (rc != TSDB_CODE_SUCCESS) {
      return rc;
    }

    SSqlInfo info = qSQLParse(pSql->sqlstr);
    rc = tscToSQLCmd(pSql, &info);
    SQLInfoDestroy(&info);

    return rc;
  }

  if (initial) {
    /*
     * Set the fp before parse the sql string, in case of getTableMeta failed, in which
     * the error handle callback function can rightfully restore the user-defined callback function (fp).
     */
    if (pCmd->insertType != TSDB_QUERY_TYPE_STMT_INSERT) {
      pSql->fetchFp = pSql->fp;
      pSql->fp = (void(*)())tscHandleMultivnodeInsert;
    }

    int32_t rc = tsInsertInitialCheck(pSql);
    if (rc != TSDB_CODE_SUCCESS) {
      return rc;
    }
  }

  return tsParseInsertSql(pSql);
}

S
slguan 已提交
1351 1352 1353
/*
 * Finalize one table data block holding numOfRows parsed rows and hand it to
 * tscProcessSql(): fill in the submit block header, merge the data blocks of the command,
 * and copy the first merged block into the payload.
 *
 * @return the result of tscProcessSql(), or the error code of the merge/copy step
 */
static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlocks *pTableDataBlocks) {
  SSqlCmd *pCmd = &pSql->cmd;

  pSql->res.numOfRows = 0;
  assert(pCmd->numOfClause == 1);

  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  tsSetBlockInfo((SSubmitBlk *)(pTableDataBlocks->pData), pMetaInfo->pTableMeta, numOfRows);

  int32_t rc = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks);
  if (rc != TSDB_CODE_SUCCESS) {
    return rc;
  }

  // after the merge, the block to submit is the first entry of the list
  STableDataBlocks *pMerged = taosArrayGetP(pCmd->pDataBlocks, 0);

  rc = tscCopyDataBlockToPayload(pSql, pMerged);
  if (rc != TSDB_CODE_SUCCESS) {
    return rc;
  }

  return tscProcessSql(pSql);
}

// Context passed as the callback parameter to parseFileSendDataBlock() while rows are
// loaded from a data file.
typedef struct SImportFileSupport {
  SSqlObj *pSql;  // sql object that receives the accumulated row count and the final result
  FILE    *fp;    // data file the rows are read from, one row per line
} SImportFileSupport;

static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) {
  assert(param != NULL && tres != NULL);

  SSqlObj *pSql = tres;
  SSqlCmd *pCmd = &pSql->cmd;

  SImportFileSupport *pSupporter = (SImportFileSupport *) param;

  SSqlObj *pParentSql = pSupporter->pSql;
  FILE    *fp = pSupporter->fp;

  if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {  // handle error
    assert(taos_errno(pSql) == code);

    taos_free_result(pSql);
    tfree(pSupporter);
    fclose(fp);

    pParentSql->res.code = code;
H
Haojun Liao 已提交
1398
    tscQueueAsyncRes(pParentSql);
H
Haojun Liao 已提交
1399
    return;
S
slguan 已提交
1400 1401
  }

H
Haojun Liao 已提交
1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418
  // accumulate the total submit records
  pParentSql->res.numOfRows += pSql->res.numOfRows;

  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  STableMeta *    pTableMeta = pTableMetaInfo->pTableMeta;
  SSchema *       pSchema = tscGetTableSchema(pTableMeta);
  STableComInfo   tinfo = tscGetTableInfo(pTableMeta);

  SParsedDataColInfo spd = {.numOfCols = tinfo.numOfColumns};
  tscSetAssignedColumnInfo(&spd, pSchema, tinfo.numOfColumns);

  size_t  n = 0;
  ssize_t readLen = 0;
  char *  line = NULL;
  int32_t count = 0;
  int32_t maxRows = 0;

H
Haojun Liao 已提交
1419 1420
  tscDestroyBlockArrayList(pSql->cmd.pDataBlocks);
  pCmd->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
H
Haojun Liao 已提交
1421

H
Haojun Liao 已提交
1422 1423 1424 1425 1426 1427 1428
  STableDataBlocks *pTableDataBlock = NULL;
  int32_t ret = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, tinfo.rowSize, sizeof(SSubmitBlk), pTableMetaInfo->name, pTableMeta, &pTableDataBlock);
  if (ret != TSDB_CODE_SUCCESS) {
//    return ret;
  }

  taosArrayPush(pCmd->pDataBlocks, &pTableDataBlock);
H
Haojun Liao 已提交
1429
  tscAllocateMemIfNeed(pTableDataBlock, tinfo.rowSize, &maxRows);
H
Haojun Liao 已提交
1430

H
Haojun Liao 已提交
1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462
  char *tokenBuf = calloc(1, 4096);

  while ((readLen = getline(&line, &n, fp)) != -1) {
    if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) {
      line[--readLen] = 0;
    }

    if (readLen == 0) {
      continue;
    }

    char *lineptr = line;
    strtolower(line, line);

    int32_t len =
        tsParseOneRowData(&lineptr, pTableDataBlock, pSchema, &spd, pCmd->payload, tinfo.precision, &code, tokenBuf);
    if (len <= 0 || pTableDataBlock->numOfParams > 0) {
      pSql->res.code = code;
      break;
    }

    pTableDataBlock->size += len;

    if (++count >= maxRows) {
      break;
    }
  }

  tfree(tokenBuf);
  free(line);

  if (count > 0) {
H
Haojun Liao 已提交
1463 1464
    code = doPackSendDataBlock(pSql, count, pTableDataBlock);
    if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1465
      pParentSql->res.code = code;
H
Haojun Liao 已提交
1466 1467
      tscQueueAsyncRes(pParentSql);
      return;
H
Haojun Liao 已提交
1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480
    }

  } else {
    taos_free_result(pSql);
    tfree(pSupporter);
    fclose(fp);

    pParentSql->fp = pParentSql->fetchFp;

    // all data has been sent to vnode, call user function
    int32_t v = (pParentSql->res.code != TSDB_CODE_SUCCESS) ? pParentSql->res.code : pParentSql->res.numOfRows;
    (*pParentSql->fp)(pParentSql->param, pParentSql, v);
  }
S
slguan 已提交
1481 1482
}

H
Haojun Liao 已提交
1483
/*
 * Start the import of the data file whose path tsParseInsertSql() stored in pCmd->payload.
 *
 * A sub-query object is created with parseFileSendDataBlock() as its callback, the file is
 * opened, and the first batch is triggered by calling the callback directly. Errors are
 * reported on pSql through tscQueueAsyncRes().
 */
void tscProcessMultiVnodesImportFromFile(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  if (pCmd->command != TSDB_SQL_INSERT) {
    return;
  }

  assert(pCmd->dataSourceType == DATA_FROM_DATA_FILE  && strlen(pCmd->payload) != 0);

  SImportFileSupport *pSupporter = calloc(1, sizeof(SImportFileSupport));
  if (pSupporter == NULL) {
    pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    tscQueueAsyncRes(pSql);
    return;
  }

  SSqlObj *pNew = createSubqueryObj(pSql, 0, parseFileSendDataBlock, pSupporter, TSDB_SQL_INSERT, NULL);
  if (pNew == NULL) {
    tfree(pSupporter);

    pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    tscQueueAsyncRes(pSql);
    return;
  }

  pNew->cmd.pDataBlocks = taosArrayInit(4, POINTER_BYTES);
  pCmd->count = 1;

  FILE *fp = fopen(pCmd->payload, "r");
  if (fp == NULL) {
    pSql->res.code = TAOS_SYSTEM_ERROR(errno);
    tscError("%p failed to open file %s to load data from file, code:%s", pSql, pCmd->payload, tstrerror(pSql->res.code));

    tfree(pSupporter);
    taos_free_result(pNew);  // the sub-query object is not used anymore
    tscQueueAsyncRes(pSql);

    return;
  }

  pSupporter->pSql = pSql;
  pSupporter->fp = fp;

  // kick off the first batch; the following ones are driven by the submit callback
  parseFileSendDataBlock(pSupporter, pNew, 0);
}