tscParseInsert.c 45.7 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE /* See feature_test_macros(7) */
#define _GNU_SOURCE

#define _XOPEN_SOURCE

S
slguan 已提交
21
#include "os.h"
22
#include "ihash.h"
H
hzcheng 已提交
23 24 25 26 27 28 29 30 31 32 33
#include "tscSecondaryMerge.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "tsqldef.h"
#include "ttypes.h"

#include "tlog.h"
#include "tstoken.h"
#include "ttime.h"

S
slguan 已提交
34
/*
 * Origin of the primary-key timestamp for the rows of one data block.
 * tsCheckTimestamp() additionally treats a tsSource of -1 as "not decided yet".
 */
enum {
  TSDB_USE_SERVER_TS = 0, /* timestamp value 0 was supplied for the primary key */
  TSDB_USE_CLI_TS = 1,    /* a non-zero timestamp was supplied by the client */
};

S
slguan 已提交
39
static int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize);
H
hzcheng 已提交
40

S
slguan 已提交
41 42 43 44
/*
 * Convert a numeric token into a 64-bit integer.
 *
 * pToken  token to convert; it is classified by isValidNumber() first
 * value   receives the result of strtoll()
 * endPtr  forwarded to strtoll() as its end pointer
 *
 * Returns the token class reported by isValidNumber(). When that class is
 * TK_ILLEGAL nothing is converted and *value is left untouched. errno is
 * reset before the conversion so the caller can test it for ERANGE.
 */
static int32_t tscToInteger(SSQLToken *pToken, int64_t *value, char **endPtr) {
  const int32_t tokenClass = isValidNumber(pToken);
  if (tokenClass == TK_ILLEGAL) {
    return tokenClass;
  }

  // pick the conversion base from the token class; anything else is decimal
  // NOTE(review): strtoll() with base 2 does not skip a "0b" prefix - confirm how TK_BIN tokens are spelled
  const int32_t base = (tokenClass == TK_HEX) ? 16 : (tokenClass == TK_OCT) ? 8 : (tokenClass == TK_BIN) ? 2 : 10;

  errno = 0;
  *value = strtoll(pToken->z, endPtr, base);

  return tokenClass;
}

S
slguan 已提交
62 63 64 65 66
/*
 * Convert a numeric token into a double.
 *
 * Returns the token class reported by isValidNumber(). For TK_ILLEGAL the
 * conversion is skipped and *value is left untouched. errno is cleared right
 * before strtod() so the caller can detect ERANGE.
 */
static int32_t tscToDouble(SSQLToken *pToken, double *value, char **endPtr) {
  const int32_t tokenClass = isValidNumber(pToken);

  if (tokenClass != TK_ILLEGAL) {
    errno = 0;
    *value = strtod(pToken->z, endPtr);
  }

  return tokenClass;
}
H
hzcheng 已提交
72

S
slguan 已提交
73
/*
 * Parse a timestamp value, optionally followed by a "+/- <duration>" expression
 * (e.g. now+12a, now-5h).
 *
 * pToken    first token of the value (NOW, an integer, or a date-time string)
 * time      receives the resulting timestamp
 * next      in: position right after pToken in the sql text;
 *           out: advanced past the expression only when one was consumed
 * error     buffer that receives an error message on failure
 * timePrec  time precision of the target table
 *
 * Returns TSDB_CODE_SUCCESS (0) on success, otherwise an error code.
 */
int tsParseTime(SSQLToken *pToken, int64_t *time, char **next, char *error, int16_t timePrec) {
  int64_t baseTs = 0;
  char *  cursor = *next;

  if (pToken->type == TK_NOW) {
    baseTs = taosGetTimestamp(timePrec);
  } else if (strncmp(pToken->z, "0", 1) == 0 && pToken->n == 1) {
    // the literal "0": baseTs stays zero
  } else if (pToken->type == TK_INTEGER) {
    baseTs = str2int64(pToken->z);
  } else {
    // formatted date-time string, e.g. "2001-11-12 18:31:01"; no expression is accepted after it
    if (taosParseTime(pToken->z, time, pToken->n, timePrec) != TSDB_CODE_SUCCESS) {
      return tscInvalidSQLErrMsg(error, "invalid timestamp format", pToken->z);
    }

    return TSDB_CODE_SUCCESS;
  }

  // look at the first non-blank character behind the token: a comma ends the value right here
  const char *probe = pToken->z + pToken->n;
  while (*probe == ' ' || *probe == '\t') {
    ++probe;
  }

  if (*probe == ',') {
    *next = cursor;
    *time = baseTs;
    return 0;
  }

  // optional time expression: an operator followed by a duration
  int32_t   consumed = 0;
  SSQLToken opToken = tStrGetToken(cursor, &consumed, false, 0, NULL);
  cursor += consumed;

  if (opToken.type != TK_MINUS && opToken.type != TK_PLUS) {
    // no expression; *next is deliberately left untouched
    *time = baseTs;
    return TSDB_CODE_SUCCESS;
  }

  consumed = 0;
  SSQLToken amountToken = tStrGetToken(cursor, &consumed, false, 0, NULL);
  cursor += consumed;

  // a duration needs at least one digit plus a unit character
  if (amountToken.n < 2) {
    return tscInvalidSQLErrMsg(error, "value expected in timestamp", opToken.z);
  }

  int64_t delta;
  if (getTimestampInUsFromStr(amountToken.z, amountToken.n, &delta) != TSDB_CODE_SUCCESS) {
    return TSDB_CODE_INVALID_SQL;
  }

  // the helper reports microseconds; scale down for millisecond tables
  if (timePrec == TSDB_TIME_PRECISION_MILLI) {
    delta /= 1000;
  }

  if (opToken.type == TK_PLUS) {
    baseTs += delta;
  } else if (baseTs >= delta) {
    baseTs -= delta;
  } else {
    baseTs = 0;  // never go below zero
  }

  *next = cursor;
  *time = baseTs;
  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
150 151 152 153
int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload, char *msg, char **str, bool primaryKey,
                             int16_t timePrec) {
  int64_t iv;
  int32_t numType;
S
slguan 已提交
154
  char *  endptr = NULL;
H
hjxilinx 已提交
155 156
  errno = 0;   // clear the previous existed error information
  
H
hzcheng 已提交
157 158
  switch (pSchema->type) {
    case TSDB_DATA_TYPE_BOOL: {  // bool
S
slguan 已提交
159 160
      if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) {
        if (strncmp(pToken->z, "true", pToken->n) == 0) {
S
slguan 已提交
161
          *(uint8_t *)payload = TSDB_TRUE;
S
slguan 已提交
162
        } else if (strncmp(pToken->z, "false", pToken->n) == 0) {
S
slguan 已提交
163
          *(uint8_t *)payload = TSDB_FALSE;
S
slguan 已提交
164 165
        } else if (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0) {
          *(uint8_t *)payload = TSDB_DATA_BOOL_NULL;
H
hzcheng 已提交
166
        } else {
H
hjxilinx 已提交
167
          return tscInvalidSQLErrMsg(msg, "invalid bool data", pToken->z);
H
hzcheng 已提交
168
        }
S
slguan 已提交
169 170 171 172 173 174 175 176 177
      } else if (pToken->type == TK_INTEGER) {
        iv = strtoll(pToken->z, NULL, 10);
        *(uint8_t *)payload = (int8_t)((iv == 0) ? TSDB_FALSE : TSDB_TRUE);
      } else if (pToken->type == TK_FLOAT) {
        double dv = strtod(pToken->z, NULL);
        *(uint8_t *)payload = (int8_t)((dv == 0) ? TSDB_FALSE : TSDB_TRUE);
      } else if (pToken->type == TK_NULL) {
        *(uint8_t *)payload = TSDB_DATA_BOOL_NULL;
      } else {
H
hjxilinx 已提交
178
        return tscInvalidSQLErrMsg(msg, "invalid bool data", pToken->z);
H
hzcheng 已提交
179 180 181 182
      }
      break;
    }
    case TSDB_DATA_TYPE_TINYINT:
S
slguan 已提交
183 184 185 186 187
      if (pToken->type == TK_NULL) {
        *((int8_t *)payload) = TSDB_DATA_TINYINT_NULL;
      } else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
                 (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
        *((int8_t *)payload) = TSDB_DATA_TINYINT_NULL;
H
hzcheng 已提交
188
      } else {
S
slguan 已提交
189
        numType = tscToInteger(pToken, &iv, &endptr);
L
lihui 已提交
190
        if (TK_ILLEGAL == numType) {
H
hjxilinx 已提交
191
          return tscInvalidSQLErrMsg(msg, "invalid tinyint data", pToken->z);
L
lihui 已提交
192
        } else if (errno == ERANGE || iv > INT8_MAX || iv <= INT8_MIN) {
H
hjxilinx 已提交
193
          return tscInvalidSQLErrMsg(msg, "tinyint data overflow", pToken->z);
H
hzcheng 已提交
194 195
        }

H
hjxilinx 已提交
196
        *((int8_t *)payload) = (int8_t) iv;
H
hzcheng 已提交
197 198 199 200 201
      }

      break;

    case TSDB_DATA_TYPE_SMALLINT:
S
slguan 已提交
202 203 204 205 206
      if (pToken->type == TK_NULL) {
        *((int16_t *)payload) = TSDB_DATA_SMALLINT_NULL;
      } else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
                 (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
        *((int16_t *)payload) = TSDB_DATA_SMALLINT_NULL;
H
hzcheng 已提交
207
      } else {
S
slguan 已提交
208
        numType = tscToInteger(pToken, &iv, &endptr);
L
lihui 已提交
209
        if (TK_ILLEGAL == numType) {
H
hjxilinx 已提交
210
          return tscInvalidSQLErrMsg(msg, "invalid smallint data", pToken->z);
L
lihui 已提交
211
        } else if (errno == ERANGE || iv > INT16_MAX || iv <= INT16_MIN) {
H
hjxilinx 已提交
212
          return tscInvalidSQLErrMsg(msg, "smallint data overflow", pToken->z);
H
hzcheng 已提交
213 214
        }

S
slguan 已提交
215
        *((int16_t *)payload) = (int16_t)iv;
H
hzcheng 已提交
216 217 218 219
      }
      break;

    case TSDB_DATA_TYPE_INT:
S
slguan 已提交
220 221 222 223
      if (pToken->type == TK_NULL) {
        *((int32_t *)payload) = TSDB_DATA_INT_NULL;
      } else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
                 (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
S
slguan 已提交
224
        *((int32_t *)payload) = TSDB_DATA_INT_NULL;
H
hzcheng 已提交
225
      } else {
S
slguan 已提交
226
        numType = tscToInteger(pToken, &iv, &endptr);
L
lihui 已提交
227
        if (TK_ILLEGAL == numType) {
H
hjxilinx 已提交
228
          return tscInvalidSQLErrMsg(msg, "invalid int data", pToken->z);
L
lihui 已提交
229
        } else if (errno == ERANGE || iv > INT32_MAX || iv <= INT32_MIN) {
H
hjxilinx 已提交
230
          return tscInvalidSQLErrMsg(msg, "int data overflow", pToken->z);
H
hzcheng 已提交
231 232
        }

S
slguan 已提交
233
        *((int32_t *)payload) = (int32_t)iv;
H
hzcheng 已提交
234 235 236 237 238
      }

      break;

    case TSDB_DATA_TYPE_BIGINT:
S
slguan 已提交
239 240 241 242
      if (pToken->type == TK_NULL) {
        *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL;
      } else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
                 (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
S
slguan 已提交
243
        *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL;
H
hzcheng 已提交
244
      } else {
S
slguan 已提交
245
        numType = tscToInteger(pToken, &iv, &endptr);
L
lihui 已提交
246
        if (TK_ILLEGAL == numType) {
H
hjxilinx 已提交
247
          return tscInvalidSQLErrMsg(msg, "invalid bigint data", pToken->z);
L
lihui 已提交
248
        } else if (errno == ERANGE || iv > INT64_MAX || iv <= INT64_MIN) {
H
hjxilinx 已提交
249
          return tscInvalidSQLErrMsg(msg, "bigint data overflow", pToken->z);
H
hzcheng 已提交
250
        }
S
slguan 已提交
251 252

        *((int64_t *)payload) = iv;
H
hzcheng 已提交
253 254 255 256
      }
      break;

    case TSDB_DATA_TYPE_FLOAT:
S
slguan 已提交
257 258 259 260
      if (pToken->type == TK_NULL) {
        *((int32_t *)payload) = TSDB_DATA_FLOAT_NULL;
      } else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
                 (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
S
slguan 已提交
261
        *((int32_t *)payload) = TSDB_DATA_FLOAT_NULL;
H
hzcheng 已提交
262
      } else {
S
slguan 已提交
263 264
        double dv;
        if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
H
hjxilinx 已提交
265
          return tscInvalidSQLErrMsg(msg, "illegal float data", pToken->z);
S
slguan 已提交
266 267 268 269
        }

        float fv = (float)dv;
        if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || (fv > FLT_MAX || fv < -FLT_MAX)) {
H
hjxilinx 已提交
270
          return tscInvalidSQLErrMsg(msg, "illegal float data", pToken->z);
H
hzcheng 已提交
271 272
        }

S
slguan 已提交
273 274
        if (isinf(fv) || isnan(fv)) {
          *((int32_t *)payload) = TSDB_DATA_FLOAT_NULL;
H
hzcheng 已提交
275
        }
S
slguan 已提交
276 277

        *((float *)payload) = fv;
H
hzcheng 已提交
278 279 280 281
      }
      break;

    case TSDB_DATA_TYPE_DOUBLE:
S
slguan 已提交
282 283 284 285
      if (pToken->type == TK_NULL) {
        *((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL;
      } else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
                 (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
S
slguan 已提交
286
        *((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL;
H
hzcheng 已提交
287
      } else {
S
slguan 已提交
288 289
        double dv;
        if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
H
hjxilinx 已提交
290
          return tscInvalidSQLErrMsg(msg, "illegal double data", pToken->z);
H
hzcheng 已提交
291 292
        }

S
slguan 已提交
293
        if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || (dv > DBL_MAX || dv < -DBL_MAX)) {
H
hjxilinx 已提交
294
          return tscInvalidSQLErrMsg(msg, "illegal double data", pToken->z);
S
slguan 已提交
295 296 297 298 299 300
        }

        if (isinf(dv) || isnan(dv)) {
          *((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL;
        } else {
          *((double *)payload) = dv;
H
hzcheng 已提交
301 302 303 304 305
        }
      }
      break;

    case TSDB_DATA_TYPE_BINARY:
S
slguan 已提交
306 307
      // binary data cannot be null-terminated char string, otherwise the last char of the string is lost
      if (pToken->type == TK_NULL) {
H
hzcheng 已提交
308
        *payload = TSDB_DATA_BINARY_NULL;
H
hjxilinx 已提交
309
      } else { // too long values will return invalid sql, not be truncated automatically
S
slguan 已提交
310
        if (pToken->n > pSchema->bytes) {
H
hjxilinx 已提交
311
          return tscInvalidSQLErrMsg(msg, "string data overflow", pToken->z);
S
slguan 已提交
312
        }
H
hjxilinx 已提交
313
        
S
slguan 已提交
314
        strncpy(payload, pToken->z, pToken->n);
H
hzcheng 已提交
315 316 317 318 319
      }

      break;

    case TSDB_DATA_TYPE_NCHAR:
S
slguan 已提交
320
      if (pToken->type == TK_NULL) {
S
slguan 已提交
321
        *(uint32_t *)payload = TSDB_DATA_NCHAR_NULL;
H
hzcheng 已提交
322
      } else {
S
slguan 已提交
323 324
        // if the converted output len is over than pSchema->bytes, return error: 'Argument list too long'
        if (!taosMbsToUcs4(pToken->z, pToken->n, payload, pSchema->bytes)) {
H
hjxilinx 已提交
325 326 327 328
          char buf[512] = {0};
          snprintf(buf, 512, "%s", strerror(errno));
          
          return tscInvalidSQLErrMsg(msg, buf, pToken->z);
H
hzcheng 已提交
329 330 331 332 333
        }
      }
      break;

    case TSDB_DATA_TYPE_TIMESTAMP: {
S
slguan 已提交
334
      if (pToken->type == TK_NULL) {
H
hzcheng 已提交
335
        if (primaryKey) {
S
slguan 已提交
336
          *((int64_t *)payload) = 0;
H
hzcheng 已提交
337
        } else {
S
slguan 已提交
338
          *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL;
H
hzcheng 已提交
339 340
        }
      } else {
S
slguan 已提交
341 342
        int64_t temp;
        if (tsParseTime(pToken, &temp, str, msg, timePrec) != TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
343
          return tscInvalidSQLErrMsg(msg, "invalid timestamp", pToken->z);
H
hzcheng 已提交
344
        }
H
hjxilinx 已提交
345
        
S
slguan 已提交
346
        *((int64_t *)payload) = temp;
H
hzcheng 已提交
347 348 349 350 351 352
      }

      break;
    }
  }

H
hjxilinx 已提交
353
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
354 355
}

S
slguan 已提交
356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375
/*
 * Server time and client time must not be mixed within one data block.
 *
 * A primary-key value of 0 marks a row as using server time, any other value
 * as using client time. The first row decides the block's tsSource; a later
 * row of the other kind is rejected. For client time the function also tracks
 * whether the keys arrive in strictly ascending order, so that a sort is only
 * needed when they do not.
 *
 * start points at the primary-key timestamp of the row that was just parsed.
 * Returns TSDB_CODE_SUCCESS, or -1 when the two time sources are mixed.
 */
static int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start) {
  // once the block is known to be disordered the previous timestamp is not tracked any more
  if (!pDataBlocks->ordered) {
    return TSDB_CODE_SUCCESS;
  }

  const TSKEY   ts = *(TSKEY *)start;
  const int32_t mine = (ts == 0) ? TSDB_USE_SERVER_TS : TSDB_USE_CLI_TS;
  const int32_t other = (ts == 0) ? TSDB_USE_CLI_TS : TSDB_USE_SERVER_TS;

  if (pDataBlocks->tsSource == other) {
    return -1;  // client time/server time can not be mixed
  }

  if (pDataBlocks->tsSource == -1) {
    pDataBlocks->tsSource = mine;  // first row decides
  }

  if ((pDataBlocks->tsSource == TSDB_USE_CLI_TS) && ts <= pDataBlocks->prevTS) {
    pDataBlocks->ordered = false;
  }

  pDataBlocks->prevTS = ts;
  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
391
/*
 * Parse the values of one row "(v1, v2, ...)" - without the parentheses - and
 * write them into the data block right behind its current content.
 *
 * str          in/out cursor into the sql text
 * pDataBlocks  destination block; the row is written at pData + size
 * schema       column schemas of the table
 * spd          which columns are assigned by the statement and their row offsets
 * error        buffer that receives an error message on failure
 * timePrec     time precision of the table
 * code         receives the error code on failure
 * tmpTokenBuf  caller-provided scratch buffer for unescaped string tokens;
 *              it must be able to hold the longest string token plus a NUL
 *
 * Returns the row size in bytes on success and -1 on failure.
 */
int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[], SParsedDataColInfo *spd, char *error,
                      int16_t timePrec, int32_t *code, char* tmpTokenBuf) {
  SSQLToken token = {0};
  char *    rowStart = pDataBlocks->pData + pDataBlocks->size;
  int32_t   rowSize = 0;

  // 1. convert every value that is present in the sql string
  for (int col = 0; col < spd->numOfAssignedCols; ++col) {
    int16_t  colIndex = spd->elems[col].colIndex;
    SSchema *pColSchema = &schema[colIndex];
    char *   cell = rowStart + spd->elems[col].offset;  // where this value lives inside the row

    rowSize += pColSchema->bytes;

    int32_t consumed = 0;
    token = tStrGetToken(*str, &consumed, true, 0, NULL);
    *str += consumed;

    // '?' placeholder: remember where the bound value has to be written later
    if (token.type == TK_QUESTION) {
      uint32_t offset = cell - pDataBlocks->pData;
      if (tscAddParamToDataBlock(pDataBlocks, pColSchema->type, (uint8_t)timePrec, pColSchema->bytes, offset) == NULL) {
        strcpy(error, "client out of memory");
        *code = TSDB_CODE_CLI_OUT_OF_MEMORY;
        return -1;
      }

      continue;
    }

    bool isValueToken = (token.type == TK_NOW) || (token.type == TK_INTEGER) || (token.type == TK_STRING) ||
                        (token.type == TK_FLOAT) || (token.type == TK_BOOL) || (token.type == TK_NULL);
    if (!isValueToken || (token.n == 0) || (token.type == TK_RP)) {
      tscInvalidSQLErrMsg(error, "invalid data or symbol", token.z);
      *code = TSDB_CODE_INVALID_SQL;
      return -1;
    }

    // strip the surrounding quotes and collapse an escaped quote (a quote or
    // backslash followed by the quote character) into the quote character itself
    if (token.type == TK_STRING) {
      const char quote = token.z[0];
      int32_t    dropped = 0;
      int32_t    out = 0;

      for (int32_t in = 1; in < token.n - 1; ++in) {
        char ch = token.z[in];
        if ((ch == quote || ch == '\\') && token.z[in + 1] == quote) {
          ++dropped;
          ++in;
          ch = token.z[in];
        }

        tmpTokenBuf[out++] = ch;
      }

      tmpTokenBuf[out] = 0;
      token.z = tmpTokenBuf;
      token.n -= 2 + dropped;
    }

    bool isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX);
    if (tsParseOneColumnData(pColSchema, &token, cell, error, str, isPrimaryKey, timePrec) != TSDB_CODE_SUCCESS) {
      *code = TSDB_CODE_INVALID_SQL;
      return -1;  // the error message has been set by tsParseOneColumnData
    }

    if (isPrimaryKey && tsCheckTimestamp(pDataBlocks, cell) != TSDB_CODE_SUCCESS) {
      tscInvalidSQLErrMsg(error, "client time/server time can not be mixed up", token.z);
      *code = TSDB_CODE_INVALID_TIME_STAMP;
      return -1;
    }
  }

  // 2. fill columns that got no value with NULL; the row then spans all columns
  if (spd->numOfAssignedCols < spd->numOfCols) {
    char *cursor = rowStart;

    for (int32_t c = 0; c < spd->numOfCols; ++c) {
      if (!spd->hasVal[c]) {
        setNull(cursor, schema[c].type, schema[c].bytes);
      }

      cursor += schema[c].bytes;
    }

    rowSize = cursor - rowStart;
  }

  return rowSize;
}

S
slguan 已提交
487 488 489 490 491 492 493 494 495
static int32_t rowDataCompar(const void *lhs, const void *rhs) {
  TSKEY left = *(TSKEY *)lhs;
  TSKEY right = *(TSKEY *)rhs;

  if (left == right) {
    return 0;
  } else {
    return left > right ? 1 : -1;
  }
496 497
}

S
slguan 已提交
498
int tsParseValues(char **str, STableDataBlocks *pDataBlock, SMeterMeta *pMeterMeta, int maxRows,
L
[1292]  
lihui 已提交
499
                  SParsedDataColInfo *spd, char *error, int32_t *code, char* tmpTokenBuf) {
S
slguan 已提交
500 501
  int32_t   index = 0;
  SSQLToken sToken;
H
hzcheng 已提交
502 503 504

  int16_t numOfRows = 0;

S
slguan 已提交
505 506 507 508
  SSchema *pSchema = tsGetSchema(pMeterMeta);
  int32_t  precision = pMeterMeta->precision;

  if (spd->hasVal[0] == false) {
S
slguan 已提交
509
    strcpy(error, "primary timestamp column can not be null");
L
[1292]  
lihui 已提交
510
    *code = TSDB_CODE_INVALID_SQL;
H
hzcheng 已提交
511 512 513 514
    return -1;
  }

  while (1) {
S
slguan 已提交
515 516 517
    index = 0;
    sToken = tStrGetToken(*str, &index, false, 0, NULL);
    if (sToken.n == 0 || sToken.type != TK_LP) break;
H
hzcheng 已提交
518

S
slguan 已提交
519
    *str += index;
S
slguan 已提交
520
    if (numOfRows >= maxRows || pDataBlock->size + pMeterMeta->rowSize >= pDataBlock->nAllocSize) {
S
slguan 已提交
521
      int32_t tSize = tscAllocateMemIfNeed(pDataBlock, pMeterMeta->rowSize);
H
hjxilinx 已提交
522
      if (0 == tSize) {  //TODO pass the correct error code to client
S
slguan 已提交
523
        strcpy(error, "client out of memory");
L
[1292]  
lihui 已提交
524
        *code = TSDB_CODE_CLI_OUT_OF_MEMORY;
S
slguan 已提交
525 526
        return -1;
      }
H
hjxilinx 已提交
527
      
S
slguan 已提交
528
      maxRows += tSize;
H
hzcheng 已提交
529 530
    }

L
[1292]  
lihui 已提交
531
    int32_t len = tsParseOneRowData(str, pDataBlock, pSchema, spd, error, precision, code, tmpTokenBuf);
H
hjxilinx 已提交
532
    if (len <= 0) { // error message has been set in tsParseOneRowData
H
hzcheng 已提交
533 534 535 536 537
      return -1;
    }

    pDataBlock->size += len;

S
slguan 已提交
538 539 540 541
    index = 0;
    sToken = tStrGetToken(*str, &index, false, 0, NULL);
    *str += index;
    if (sToken.n == 0 || sToken.type != TK_RP) {
H
hjxilinx 已提交
542
      tscInvalidSQLErrMsg(error, ") expected", *str);
L
[1292]  
lihui 已提交
543
      *code = TSDB_CODE_INVALID_SQL;
H
hzcheng 已提交
544 545 546 547 548 549 550 551
      return -1;
    }

    numOfRows++;
  }

  if (numOfRows <= 0) {
    strcpy(error, "no any data points");
L
[1292]  
lihui 已提交
552
    *code = TSDB_CODE_INVALID_SQL;
S
slguan 已提交
553 554 555
    return -1;
  } else {
    return numOfRows;
H
hzcheng 已提交
556 557 558
  }
}

S
slguan 已提交
559
/*
 * Mark all numOfCols columns as assigned, in schema order, and compute the
 * byte offset of every column inside a row.
 *
 * The first column always starts at offset 0. It is set explicitly here
 * instead of relying on the caller having zero-initialised spd.
 */
static void tscSetAssignedColumnInfo(SParsedDataColInfo *spd, SSchema *pSchema, int32_t numOfCols) {
  spd->numOfCols = numOfCols;
  spd->numOfAssignedCols = numOfCols;

  for (int32_t i = 0; i < numOfCols; ++i) {
    spd->hasVal[i] = true;
    spd->elems[i].colIndex = i;

    if (i == 0) {
      spd->elems[i].offset = 0;
    } else {
      // each column starts right behind the previous one
      spd->elems[i].offset = spd->elems[i - 1].offset + pSchema[i - 1].bytes;
    }
  }
}

S
slguan 已提交
573
/*
 * Make sure the data block has room for at least `factor` (5) more rows,
 * growing the buffer in steps of 1.5x when it has not.
 *
 * pDataBlock  block whose pData / nAllocSize may be replaced
 * rowSize     size of one row in bytes
 *
 * Returns the number of rows that fit into the free space of the block, or 0
 * on failure. Callers treat 0 as "out of memory". Failure cases:
 *   - rowSize is not positive (this used to divide by zero),
 *   - the required size does not fit into the 32-bit nAllocSize,
 *   - realloc() fails; the block is left unchanged in that case.
 * Newly allocated memory is zero-filled.
 */
int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize) {
  const int32_t factor = 5;

  if (rowSize <= 0) {
    return 0;
  }

  const size_t required = (size_t)rowSize * factor;
  size_t       remain = pDataBlock->nAllocSize - pDataBlock->size;

  // expand the allocated size
  if (remain < required) {
    uint64_t newSize = pDataBlock->nAllocSize;

    while (newSize - pDataBlock->size < required) {
      uint64_t grown = (uint64_t)(newSize * 1.5);
      if (grown <= newSize) {
        // 1.5x makes no progress for sizes 0 and 1; grow linearly instead of looping forever
        grown = newSize + required;
      }

      newSize = grown;
    }

    if (newSize > UINT32_MAX) {
      return 0;  // nAllocSize can not represent the new size
    }

    // commit the new size only after realloc() succeeded, so a failure leaves the block untouched
    char *tmp = realloc(pDataBlock->pData, (size_t)newSize);
    if (tmp == NULL) {
      return 0;
    }

    pDataBlock->pData = tmp;
    pDataBlock->nAllocSize = (uint32_t)newSize;
    memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size);
  }

  return (int32_t)(pDataBlock->nAllocSize - pDataBlock->size) / rowSize;
}

S
slguan 已提交
600 601 602 603 604
/*
 * Stamp a submit block with the identity of its target table and add
 * numOfRows to the block's row count.
 */
static void tsSetBlockInfo(SShellSubmitBlock *pBlocks, const SMeterMeta *pMeterMeta, int32_t numOfRows) {
  // the row count is accumulated, not overwritten
  pBlocks->numOfRows += numOfRows;

  // table identity and schema version are copied from the meta data
  pBlocks->sversion = pMeterMeta->sversion;
  pBlocks->uid = pMeterMeta->uid;
  pBlocks->sid = pMeterMeta->sid;
}

S
slguan 已提交
607 608
/*
 * If the data block is disordered, sort its rows by timestamp in ascending
 * order and drop rows whose timestamp duplicates an earlier one. The row count
 * and the block size are updated accordingly. Ordered blocks are left as is.
 */
void sortRemoveDuplicates(STableDataBlocks *dataBuf) {
  SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)dataBuf->pData;

  // on entry the block holds exactly the header plus numOfRows rows; duplicates are not removed yet
  assert(pBlocks->numOfRows * dataBuf->rowSize + sizeof(SShellSubmitBlock) == dataBuf->size);

  // if use server time, this block must be ordered
  if (dataBuf->tsSource == TSDB_USE_SERVER_TS) {
    assert(dataBuf->ordered);
  }

  if (dataBuf->ordered) {
    return;
  }

  char *rows = pBlocks->payLoad;
  qsort(rows, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar);

  // compact in place: `last` is the most recent row that was kept, `scan` walks the rest
  int32_t last = 0;
  for (int32_t scan = 1; scan < pBlocks->numOfRows; ++scan) {
    TSKEY lastKey = *(TSKEY *)(rows + dataBuf->rowSize * last);
    TSKEY scanKey = *(TSKEY *)(rows + dataBuf->rowSize * scan);

    if (lastKey == scanKey) {
      continue;  // duplicate timestamp, drop this row
    }

    ++last;
    if (last != scan) {
      memmove(rows + dataBuf->rowSize * last, rows + dataBuf->rowSize * scan, dataBuf->rowSize);
    }
  }

  dataBuf->ordered = true;

  pBlocks->numOfRows = last + 1;
  dataBuf->size = sizeof(SShellSubmitBlock) + dataBuf->rowSize * pBlocks->numOfRows;
}

/*
 * Parse the value list of one "INSERT INTO <table> VALUES (...) (...)" clause
 * into the data block that belongs to the target table.
 *
 * pSql            sql object; error messages go to its cmd.payload
 * pTableHashList  lookup structure handed to tscGetDataBlockFromList()
 * str             in/out cursor into the sql text, positioned at the first '('
 * spd             assigned-column information
 * totalNum        incremented by the number of parsed rows
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_CLI_OUT_OF_MEMORY, or the error code
 * reported by tsParseValues().
 */
static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char **str, SParsedDataColInfo *spd,
                                      int32_t *totalNum) {
  SSqlCmd *       pCmd = &pSql->cmd;
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
  SMeterMeta *    pMeterMeta = pMeterMetaInfo->pMeterMeta;

  STableDataBlocks *dataBuf =
      tscGetDataBlockFromList(pTableHashList, pCmd->pDataBlocks, pMeterMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE,
                              sizeof(SShellSubmitBlock), pMeterMeta->rowSize, pMeterMetaInfo->name);

  int32_t maxNumOfRows = tscAllocateMemIfNeed(dataBuf, pMeterMeta->rowSize);
  if (0 == maxNumOfRows) {
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_INVALID_SQL;

  // Scratch buffer used for deleting escape characters: \\, \', \".
  // tsParseOneRowData() copies whole string tokens into it without a bound, so
  // it must be able to hold the longest possible token. No token can be longer
  // than the remaining sql text, hence the buffer is sized from that text
  // (never smaller than the former fixed 4096 bytes).
  size_t tokenBufLen = strlen(*str) + 1;
  if (tokenBufLen < 4096) {
    tokenBufLen = 4096;
  }

  char *tmpTokenBuf = calloc(1, tokenBufLen);
  if (NULL == tmpTokenBuf) {
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  int32_t numOfRows = tsParseValues(str, dataBuf, pMeterMeta, maxNumOfRows, spd, pCmd->payload, &code, tmpTokenBuf);
  free(tmpTokenBuf);
  if (numOfRows <= 0) {
    return code;
  }

  // give every '?' parameter added by this clause its global index and make
  // its offset relative to the payload behind the block header
  for (uint32_t i = 0; i < dataBuf->numOfParams; ++i) {
    SParamInfo *param = dataBuf->params + i;
    if (param->idx == -1) {
      param->idx = pCmd->numOfParams++;
      param->offset -= sizeof(SShellSubmitBlock);
    }
  }

  SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)(dataBuf->pData);
  tsSetBlockInfo(pBlocks, pMeterMeta, numOfRows);

  dataBuf->vgid = pMeterMeta->vgid;
  dataBuf->numOfMeters = 1;

  /*
   * the value of pRes->numOfRows does not affect the true result of AFFECTED ROWS,
   * which is actually returned from server.
   */
  *totalNum += numOfRows;
  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
699
/*
 * Parse the table specification of an insert statement and fetch the meter meta of the target table.
 * The text at *sqlstr is expected to look like:
 *
 *   tablename [(col1, ..., coln)] [USING superTable [(tag1, ..., tagn)] TAGS (v1, ..., vn)] ...
 *
 * Without USING, the meta of the table already set in meter meta info 0 is requested and *sqlstr is
 * moved to the column list (if any) or to the token following the table name.
 *
 * With USING, the tag values are parsed into the STagData located at pCmd->payload, unassigned tags
 * are set to null, the meter id is set to the table name, and the meta is requested through
 * tscGetMeterMetaEx. If a column list was given, its text is moved (in place, inside the sql buffer)
 * so that it directly precedes the remaining sql, and *sqlstr points to it.
 *
 * @param sqlstr in/out cursor into the (writable) sql string
 * @param pSql   sql object; meter meta info 0 and pCmd->payload are updated
 * @return TSDB_CODE_SUCCESS, or the error code produced by the failing step
 */
static int32_t tscParseSqlForCreateTableOnDemand(char **sqlstr, SSqlObj *pSql) {
  int32_t   index = 0;
  SSQLToken sToken;
  SSQLToken tableToken;
  int32_t   code = TSDB_CODE_SUCCESS;

  SSqlCmd *       pCmd = &pSql->cmd;
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);

  char *sql = *sqlstr;

  // get the token of specified table
  index = 0;
  tableToken = tStrGetToken(sql, &index, false, 0, NULL);
  sql += index;

  char *cstart = NULL;
  char *cend = NULL;

  // skip the column list if it exists
  index = 0;
  sToken = tStrGetToken(sql, &index, false, 0, NULL);
  sql += index;

  int32_t numOfColList = 0;
  bool    createTable = false;

  if (sToken.type == TK_LP) {
    cstart = &sToken.z[0];
    index = 0;
    while (1) {
      sToken = tStrGetToken(sql, &index, false, 0, NULL);

      /*
       * An empty token is treated as "end of input" by the other call sites in this file. Without
       * this check an unterminated column list would never leave the loop.
       */
      if (sToken.n == 0) {
        return tscInvalidSQLErrMsg(pCmd->payload, ") expected", NULL);
      }

      if (sToken.type == TK_RP) {
        cend = &sToken.z[0];
        break;
      }

      ++numOfColList;
    }

    sToken = tStrGetToken(sql, &index, false, 0, NULL);
    sql += index;
  }

  // "()" is not a valid column list
  if (numOfColList == 0 && cstart != NULL) {
    return TSDB_CODE_INVALID_SQL;
  }

  if (sToken.type == TK_USING) {  // create table if not exists
    index = 0;
    sToken = tStrGetToken(sql, &index, false, 0, NULL);
    sql += index;

    // the tag data is assembled at the beginning of the command payload
    STagData *pTag = (STagData *)pCmd->payload;
    memset(pTag, 0, sizeof(STagData));

    // temporarily point meter meta info 0 at the super table to load its schema
    setMeterID(pSql, &sToken, 0);

    strncpy(pTag->name, pMeterMetaInfo->name, TSDB_METER_ID_LEN);
    code = tscGetMeterMeta(pSql, pTag->name, 0);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (!UTIL_METER_IS_METRIC(pMeterMetaInfo)) {
      return tscInvalidSQLErrMsg(pCmd->payload, "create table only from super table is allowed", sToken.z);
    }

    SSchema *pTagSchema = tsGetTagSchema(pMeterMetaInfo->pMeterMeta);

    index = 0;
    sToken = tStrGetToken(sql, &index, false, 0, NULL);
    sql += index;

    SParsedDataColInfo spd = {0};
    uint8_t            numOfTags = pMeterMetaInfo->pMeterMeta->numOfTags;
    spd.numOfCols = numOfTags;

    if (sToken.type != TK_LP) {
      // no tag names specified: values are assigned to all tags in schema order
      tscSetAssignedColumnInfo(&spd, pTagSchema, numOfTags);
    } else {
      /* insert into tablename (col1, col2,..., coln) using superTableName (tagName1, tagName2, ..., tagNamen)
       * tags(tagVal1, tagVal2, ..., tagValn) values(v1, v2,... vn); */
      int16_t offset[TSDB_MAX_COLUMNS] = {0};
      for (int32_t t = 1; t < numOfTags; ++t) {
        offset[t] = offset[t - 1] + pTagSchema[t - 1].bytes;
      }

      while (1) {
        index = 0;
        sToken = tStrGetToken(sql, &index, false, 0, NULL);
        sql += index;

        if (TK_STRING == sToken.type) {
          sToken.n = strdequote(sToken.z);
          strtrim(sToken.z);
          sToken.n = (uint32_t)strlen(sToken.z);
        }

        if (sToken.type == TK_RP) {
          break;
        }

        bool findColumnIndex = false;

        // todo speedup by using hash list
        for (int32_t t = 0; t < numOfTags; ++t) {
          if (strncmp(sToken.z, pTagSchema[t].name, sToken.n) == 0 && strlen(pTagSchema[t].name) == sToken.n) {
            SParsedColElem *pElem = &spd.elems[spd.numOfAssignedCols++];
            pElem->offset = offset[t];
            pElem->colIndex = t;

            if (spd.hasVal[t] == true) {
              return tscInvalidSQLErrMsg(pCmd->payload, "duplicated tag name", sToken.z);
            }

            spd.hasVal[t] = true;
            findColumnIndex = true;
            break;
          }
        }

        if (!findColumnIndex) {
          return tscInvalidSQLErrMsg(pCmd->payload, "invalid tag name", sToken.z);
        }
      }

      if (spd.numOfAssignedCols == 0 || spd.numOfAssignedCols > numOfTags) {
        return tscInvalidSQLErrMsg(pCmd->payload, "tag name expected", sToken.z);
      }

      index = 0;
      sToken = tStrGetToken(sql, &index, false, 0, NULL);
      sql += index;
    }

    if (sToken.type != TK_TAGS) {
      return tscInvalidSQLErrMsg(pCmd->payload, "keyword TAGS expected", sToken.z);
    }

    // 1. parse one value for every assigned tag; the opening '(' is skipped by the tokenizer
    uint32_t ignoreTokenTypes = TK_LP;
    uint32_t numOfIgnoreToken = 1;
    for (int i = 0; i < spd.numOfAssignedCols; ++i) {
      char *  tagVal = pTag->data + spd.elems[i].offset;
      int16_t colIndex = spd.elems[i].colIndex;

      index = 0;
      sToken = tStrGetToken(sql, &index, true, numOfIgnoreToken, &ignoreTokenTypes);
      sql += index;
      if (sToken.n == 0) {
        break;
      } else if (sToken.type == TK_RP) {
        break;
      }

      // Remove quotation marks
      if (TK_STRING == sToken.type) {
        sToken.z++;
        sToken.n -= 2;
      }

      code = tsParseOneColumnData(&pTagSchema[colIndex], &sToken, tagVal, pCmd->payload, &sql, false,
                                  pMeterMetaInfo->pMeterMeta->precision);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if ((pTagSchema[colIndex].type == TSDB_DATA_TYPE_BINARY || pTagSchema[colIndex].type == TSDB_DATA_TYPE_NCHAR) &&
          sToken.n > pTagSchema[colIndex].bytes) {
        return tscInvalidSQLErrMsg(pCmd->payload, "string too long", sToken.z);
      }
    }

    index = 0;
    sToken = tStrGetToken(sql, &index, false, 0, NULL);
    sql += index;
    if (sToken.n == 0 || sToken.type != TK_RP) {
      return tscInvalidSQLErrMsg(pCmd->payload, ") expected", sToken.z);
    }

    // 2. set the null value for the tags that do not have assigned values
    if (spd.numOfAssignedCols < spd.numOfCols) {
      char *ptr = pTag->data;

      for (int32_t i = 0; i < spd.numOfCols; ++i) {
        if (!spd.hasVal[i]) {  // current tag column does not have any value to insert, set it to null
          setNull(ptr, pTagSchema[i].type, pTagSchema[i].bytes);
        }

        ptr += pTagSchema[i].bytes;
      }
    }

    if (tscValidateName(&tableToken) != TSDB_CODE_SUCCESS) {
      return tscInvalidSQLErrMsg(pCmd->payload, "invalid table name", *sqlstr);
    }

    // switch meter meta info 0 back to the table that receives the data
    int32_t ret = setMeterID(pSql, &tableToken, 0);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }

    createTable = true;
    code = tscGetMeterMetaEx(pSql, pMeterMetaInfo->name, true);
  } else {
    if (cstart != NULL) {
      sql = cstart;
    } else if (sToken.z != NULL) {
      sql = sToken.z;
    }
    /*
     * NOTE(review): when neither branch above applies, sql keeps pointing behind the last consumed
     * token. This presumably only happens at end of input, where the token carries no text pointer;
     * the caller is then not handed a NULL cursor.
     */

    code = tscGetMeterMeta(pSql, pMeterMetaInfo->name, 0);
  }

  if (cstart != NULL && createTable == true) {
    /* move the column list to start position of the next accessed points */
    int32_t len = (int32_t)(cend - cstart) + 1;

    memmove(sql - len, cstart, len);
    *sqlstr = sql - len;
  } else {
    *sqlstr = sql;
  }

  return code;
}

S
slguan 已提交
921
int validateTableName(char *tblName, int len) {
H
huili 已提交
922
  char buf[TSDB_METER_ID_LEN] = {0};
S
slguan 已提交
923
  strncpy(buf, tblName, len);
S
slguan 已提交
924

S
slguan 已提交
925
  SSQLToken token = {.n = len, .type = TK_ID, .z = buf};
H
huili 已提交
926
  tSQLGetToken(buf, &token.type);
S
slguan 已提交
927

H
huili 已提交
928 929 930
  return tscValidateName(&token);
}

H
hzcheng 已提交
931 932 933 934 935 936 937 938 939
/**
 * usage: insert into table1 values() () table2 values()()
 *
 * @param str
 * @param acct
 * @param db
 * @param pSql
 * @return
 */
H
hjxilinx 已提交
940
int doParserInsertSql(SSqlObj *pSql, char *str) {
S
slguan 已提交
941
  SSqlCmd *pCmd = &pSql->cmd;
H
hjxilinx 已提交
942 943
  
  int32_t code = TSDB_CODE_INVALID_SQL;
S
slguan 已提交
944
  int32_t totalNum = 0;
H
hzcheng 已提交
945

S
slguan 已提交
946
  SMeterMetaInfo *pMeterMetaInfo = tscAddEmptyMeterMetaInfo(pCmd);
H
hzcheng 已提交
947

S
slguan 已提交
948
  if ((code = tscAllocPayload(pCmd, TSDB_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
H
hzcheng 已提交
949 950 951
    return code;
  }

H
hjxilinx 已提交
952
  void *pTableHashList = taosInitIntHash(128, POINTER_BYTES, taosHashInt);
H
hzcheng 已提交
953 954 955 956 957

  pSql->cmd.pDataBlocks = tscCreateBlockArrayList();
  tscTrace("%p create data block list for submit data, %p", pSql, pSql->cmd.pDataBlocks);

  while (1) {
S
slguan 已提交
958 959 960
    int32_t index = 0;
    SSQLToken sToken = tStrGetToken(str, &index, false, 0, NULL);
    if (sToken.n == 0) { // parse file, do not release the STableDataBlock
S
slguan 已提交
961 962 963 964 965
      if (pCmd->isInsertFromFile == 1) {
        goto _clean;
      }

      if (totalNum > 0) {
H
hzcheng 已提交
966 967 968 969 970 971 972
        break;
      } else {  // no data in current sql string, error
        code = TSDB_CODE_INVALID_SQL;
        goto _error_clean;
      }
    }

S
slguan 已提交
973
    // Check if the table name available or not
S
slguan 已提交
974
    if (validateTableName(sToken.z, sToken.n) != TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
975
      code = tscInvalidSQLErrMsg(pCmd->payload, "table name invalid", sToken.z);
H
huili 已提交
976 977 978
      goto _error_clean;
    }

H
hjxilinx 已提交
979
    //TODO refactor
S
slguan 已提交
980
    if ((code = setMeterID(pSql, &sToken, 0)) != TSDB_CODE_SUCCESS) {
H
hzcheng 已提交
981 982 983
      goto _error_clean;
    }

S
slguan 已提交
984
    void *fp = pSql->fp;
H
hzcheng 已提交
985 986 987 988 989 990 991 992 993 994 995 996 997
    if ((code = tscParseSqlForCreateTableOnDemand(&str, pSql)) != TSDB_CODE_SUCCESS) {
      if (fp != NULL) {
        goto _clean;
      } else {
        /*
         * for async insert, the free data block operations, which is tscDestroyBlockArrayList,
         * must be executed before launch another threads to get metermeta, since the
         * later ops may manipulate SSqlObj through another thread in getMeterMetaCallback function.
         */
        goto _error_clean;
      }
    }

S
slguan 已提交
998
    if (UTIL_METER_IS_METRIC(pMeterMetaInfo)) {
H
hjxilinx 已提交
999
      code = tscInvalidSQLErrMsg(pCmd->payload, "insert data into super table is not supported", NULL);
H
hzcheng 已提交
1000 1001 1002
      goto _error_clean;
    }

S
slguan 已提交
1003 1004 1005 1006
    index = 0;
    sToken = tStrGetToken(str, &index, false, 0, NULL);
    str += index;
    if (sToken.n == 0) {
H
hjxilinx 已提交
1007
      code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE are required", sToken.z);
H
hzcheng 已提交
1008 1009 1010
      goto _error_clean;
    }

S
slguan 已提交
1011 1012 1013
    if (sToken.type == TK_VALUES) {
      SParsedDataColInfo spd = {.numOfCols = pMeterMetaInfo->pMeterMeta->numOfColumns};
      SSchema *          pSchema = tsGetSchema(pMeterMetaInfo->pMeterMeta);
H
hzcheng 已提交
1014

S
slguan 已提交
1015
      tscSetAssignedColumnInfo(&spd, pSchema, pMeterMetaInfo->pMeterMeta->numOfColumns);
H
hzcheng 已提交
1016 1017 1018 1019 1020

      if (pCmd->isInsertFromFile == -1) {
        pCmd->isInsertFromFile = 0;
      } else {
        if (pCmd->isInsertFromFile == 1) {
H
hjxilinx 已提交
1021
          code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES and FILE are not allowed to mix up", sToken.z);
H
hzcheng 已提交
1022 1023 1024 1025 1026 1027 1028 1029
          goto _error_clean;
        }
      }

      /*
       * app here insert data in different vnodes, so we need to set the following
       * data in another submit procedure using async insert routines
       */
S
slguan 已提交
1030
      code = doParseInsertStatement(pSql, pTableHashList, &str, &spd, &totalNum);
H
hzcheng 已提交
1031 1032 1033 1034
      if (code != TSDB_CODE_SUCCESS) {
        goto _error_clean;
      }

S
slguan 已提交
1035
    } else if (sToken.type == TK_FILE) {
H
hzcheng 已提交
1036 1037 1038 1039
      if (pCmd->isInsertFromFile == -1) {
        pCmd->isInsertFromFile = 1;
      } else {
        if (pCmd->isInsertFromFile == 0) {
H
hjxilinx 已提交
1040
          code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES and FILE are not allowed to mix up", sToken.z);
H
hzcheng 已提交
1041 1042 1043 1044
          goto _error_clean;
        }
      }

S
slguan 已提交
1045 1046 1047 1048
      index = 0;
      sToken = tStrGetToken(str, &index, false, 0, NULL);
      str += index;
      if (sToken.n == 0) {
H
hjxilinx 已提交
1049
        code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z);
H
hzcheng 已提交
1050 1051 1052
        goto _error_clean;
      }

S
slguan 已提交
1053
      char fname[PATH_MAX] = {0};
S
slguan 已提交
1054
      strncpy(fname, sToken.z, sToken.n);
S
slguan 已提交
1055
      strdequote(fname);
1056

H
hzcheng 已提交
1057 1058
      wordexp_t full_path;
      if (wordexp(fname, &full_path, 0) != 0) {
H
hjxilinx 已提交
1059
        code = tscInvalidSQLErrMsg(pCmd->payload, "invalid filename", sToken.z);
H
hzcheng 已提交
1060 1061 1062 1063 1064
        goto _error_clean;
      }
      strcpy(fname, full_path.we_wordv[0]);
      wordfree(&full_path);

H
hjxilinx 已提交
1065
      STableDataBlocks *pDataBlock = tscCreateDataBlock(PATH_MAX, pMeterMetaInfo->pMeterMeta->rowSize,
S
slguan 已提交
1066
                                                          sizeof(SShellSubmitBlock), pMeterMetaInfo->name);
H
hzcheng 已提交
1067

S
slguan 已提交
1068 1069
      tscAppendDataBlock(pCmd->pDataBlocks, pDataBlock);
      strcpy(pDataBlock->filename, fname);
S
slguan 已提交
1070
    } else if (sToken.type == TK_LP) {
H
hzcheng 已提交
1071
      /* insert into tablename(col1, col2,..., coln) values(v1, v2,... vn); */
S
slguan 已提交
1072
      SMeterMeta *pMeterMeta = tscGetMeterMetaInfo(pCmd, 0)->pMeterMeta;
S
slguan 已提交
1073
      SSchema *   pSchema = tsGetSchema(pMeterMeta);
H
hzcheng 已提交
1074 1075 1076 1077

      if (pCmd->isInsertFromFile == -1) {
        pCmd->isInsertFromFile = 0;
      } else if (pCmd->isInsertFromFile == 1) {
H
hjxilinx 已提交
1078
        code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES and FILE are not allowed to mix up", sToken.z);
H
hzcheng 已提交
1079 1080 1081
        goto _error_clean;
      }

1082 1083
      SParsedDataColInfo spd = {0};
      spd.numOfCols = pMeterMeta->numOfColumns;
H
hzcheng 已提交
1084 1085 1086 1087 1088 1089 1090

      int16_t offset[TSDB_MAX_COLUMNS] = {0};
      for (int32_t t = 1; t < pMeterMeta->numOfColumns; ++t) {
        offset[t] = offset[t - 1] + pSchema[t - 1].bytes;
      }

      while (1) {
S
slguan 已提交
1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101
        index = 0;
        sToken = tStrGetToken(str, &index, false, 0, NULL);
        str += index;

        if (TK_STRING == sToken.type) {
          sToken.n = strdequote(sToken.z);
          strtrim(sToken.z);
          sToken.n = (uint32_t)strlen(sToken.z);
        }

        if (sToken.type == TK_RP) {
H
hzcheng 已提交
1102 1103 1104 1105 1106 1107 1108
          break;
        }

        bool findColumnIndex = false;

        // todo speedup by using hash list
        for (int32_t t = 0; t < pMeterMeta->numOfColumns; ++t) {
S
slguan 已提交
1109
          if (strncmp(sToken.z, pSchema[t].name, sToken.n) == 0 && strlen(pSchema[t].name) == sToken.n) {
S
slguan 已提交
1110
            SParsedColElem *pElem = &spd.elems[spd.numOfAssignedCols++];
H
hzcheng 已提交
1111 1112 1113 1114
            pElem->offset = offset[t];
            pElem->colIndex = t;

            if (spd.hasVal[t] == true) {
H
hjxilinx 已提交
1115
              code = tscInvalidSQLErrMsg(pCmd->payload, "duplicated column name", sToken.z);
H
hzcheng 已提交
1116 1117 1118 1119 1120 1121 1122 1123 1124
              goto _error_clean;
            }

            spd.hasVal[t] = true;
            findColumnIndex = true;
            break;
          }
        }

S
slguan 已提交
1125
        if (!findColumnIndex) {
H
hjxilinx 已提交
1126
          code = tscInvalidSQLErrMsg(pCmd->payload, "invalid column name", sToken.z);
H
hzcheng 已提交
1127 1128 1129 1130
          goto _error_clean;
        }
      }

1131
      if (spd.numOfAssignedCols == 0 || spd.numOfAssignedCols > pMeterMeta->numOfColumns) {
H
hjxilinx 已提交
1132
        code = tscInvalidSQLErrMsg(pCmd->payload, "column name expected", sToken.z);
H
hzcheng 已提交
1133 1134 1135
        goto _error_clean;
      }

S
slguan 已提交
1136 1137 1138 1139 1140
      index = 0;
      sToken = tStrGetToken(str, &index, false, 0, NULL);
      str += index;

      if (sToken.type != TK_VALUES) {
H
hjxilinx 已提交
1141
        code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES is expected", sToken.z);
H
hzcheng 已提交
1142 1143 1144
        goto _error_clean;
      }

S
slguan 已提交
1145
      code = doParseInsertStatement(pSql, pTableHashList, &str, &spd, &totalNum);
H
hzcheng 已提交
1146 1147 1148 1149
      if (code != TSDB_CODE_SUCCESS) {
        goto _error_clean;
      }
    } else {
H
hjxilinx 已提交
1150
      code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE are required", sToken.z);
H
hzcheng 已提交
1151 1152 1153 1154
      goto _error_clean;
    }
  }

S
slguan 已提交
1155 1156 1157 1158 1159
  // we need to keep the data blocks if there are parameters in the sql
  if (pCmd->numOfParams > 0) {
    goto _clean;
  }

S
slguan 已提交
1160
  // submit to more than one vnode
H
hzcheng 已提交
1161
  if (pCmd->pDataBlocks->nSize > 0) {
S
slguan 已提交
1162
    // merge according to vgid
S
slguan 已提交
1163 1164 1165
    if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) {
      goto _error_clean;
    }
H
hzcheng 已提交
1166

S
slguan 已提交
1167 1168 1169
    STableDataBlocks *pDataBlock = pCmd->pDataBlocks->pData[0];
    if ((code = tscCopyDataBlockToPayload(pSql, pDataBlock)) != TSDB_CODE_SUCCESS) {
      goto _error_clean;
H
hzcheng 已提交
1170 1171
    }

H
hjxilinx 已提交
1172 1173
    SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
    
S
slguan 已提交
1174
    // set the next sent data vnode index in data block arraylist
H
hjxilinx 已提交
1175
    pMeterMetaInfo->vnodeIndex = 1;
H
hzcheng 已提交
1176
  } else {
S
slguan 已提交
1177
    pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
H
hzcheng 已提交
1178 1179 1180 1181 1182 1183
  }

  code = TSDB_CODE_SUCCESS;
  goto _clean;

_error_clean:
S
slguan 已提交
1184
  pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
H
hzcheng 已提交
1185 1186

_clean:
S
slguan 已提交
1187
  taosCleanUpIntHash(pTableHashList);
H
hzcheng 已提交
1188 1189 1190
  return code;
}

S
slguan 已提交
1191
/*
 * Entry point for parsing an insert/import statement.
 *
 * Verifies the write permission of the connection, consumes the leading INSERT/IMPORT and INTO
 * keywords, resets the insert related fields of the command and passes the rest of the statement to
 * doParserInsertSql. The acct and db arguments are not used here.
 *
 * @return TSDB_CODE_NO_RIGHTS if the connection has no write permission, the invalid sql error if
 *         INTO is missing, otherwise the result of doParserInsertSql
 */
int tsParseInsertSql(SSqlObj *pSql, char *sql, char *acct, char *db) {
  if (!pSql->pTscObj->writeAuth) {
    return TSDB_CODE_NO_RIGHTS;
  }

  SSqlCmd *pCmd = &pSql->cmd;
  int32_t  pos = 0;  // shared by both tokenizer calls, so the second call continues behind the first

  // the caller has already decided that this is an insert or import statement
  SSQLToken keyword = tStrGetToken(sql, &pos, false, 0, NULL);
  assert(keyword.type == TK_INSERT || keyword.type == TK_IMPORT);
  pCmd->import = (keyword.type == TK_IMPORT);

  SSQLToken into = tStrGetToken(sql, &pos, false, 0, NULL);
  if (into.type != TK_INTO) {
    return tscInvalidSQLErrMsg(pCmd->payload, "keyword INTO is expected", into.z);
  }

  // -1: neither VALUES nor FILE has been seen yet
  pCmd->isInsertFromFile = -1;
  pCmd->command = TSDB_SQL_INSERT;
  pCmd->count = 0;
  pSql->res.numOfRows = 0;

  return doParserInsertSql(pSql, sql + pos);
}

S
slguan 已提交
1217
/*
 * Parse pSql->sqlstr into pSql->cmd.
 *
 * Insert/import statements are handled by tsParseInsertSql; every other statement is parsed with
 * tSQLParse and converted with tscToSQLCmd.
 *
 * NOTE: the pRes->code may be modified or even released by another thread in tscMeterMetaCallBack
 * function, so do NOT use pRes->code to determine if the getMeterMeta/getMetricMeta function
 * invokes new threads to get data from mnode or simply retrieves data from cache. For the same
 * reason the return code is NOT assigned to pRes->code here.
 *
 * @param multiVnodeInsertion when true and a callback is set on pSql, the callback is replaced by
 *                            the multi-vnode insert proxy before the insert statement is parsed
 * @return TSDB_CODE_SUCCESS or the error code of the failing step
 */
int tsParseSql(SSqlObj *pSql, char *acct, char *db, bool multiVnodeInsertion) {
  // must before clean the sqlcmd object
  tscRemoveAllMeterMetaInfo(&pSql->cmd, false);
  tscCleanSqlCmd(&pSql->cmd);

  if (!tscIsInsertOrImportData(pSql->sqlstr)) {
    int32_t rc = tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE);
    if (rc != TSDB_CODE_SUCCESS) {
      return rc;
    }

    SSqlInfo SQLInfo = {0};
    tSQLParse(&SQLInfo, pSql->sqlstr);

    rc = tscToSQLCmd(pSql, &SQLInfo);
    SQLInfoDestroy(&SQLInfo);

    return rc;
  }

  /*
   * only for async multi-vnode insertion
   * Set the fp before parse the sql string, in case of getmetermeta failed, in which
   * the error handle callback function can rightfully restore the user defined function (fp)
   */
  if (multiVnodeInsertion && pSql->fp != NULL) {
    assert(pSql->fetchFp == NULL);

    // keep the user defined callback and install the multi-insert proxy function instead
    pSql->fetchFp = pSql->fp;
    pSql->fp = tscAsyncInsertMultiVnodesProxy;
  }

  return tsParseInsertSql(pSql, pSql->sqlstr, acct, db);
}

S
slguan 已提交
1261 1262 1263
/*
 * Finish the submit block held by pTableDataBlocks with numOfRows rows, merge the data blocks of
 * the command, copy the first merged block into the payload and process the sql.
 *
 * @return TSDB_CODE_SUCCESS, or the error code of the first failing step
 */
static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlocks *pTableDataBlocks) {
  SSqlCmd *   pCmd = &pSql->cmd;
  SMeterMeta *pMeterMeta = tscGetMeterMetaInfo(pCmd, 0)->pMeterMeta;

  // fill in the header of the submit block
  tsSetBlockInfo((SShellSubmitBlock *)(pTableDataBlocks->pData), pMeterMeta, numOfRows);

  int32_t code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // the merged block is different from the pTableDataBlocks
  STableDataBlocks *pMerged = pCmd->pDataBlocks->pData[0];

  code = tscCopyDataBlockToPayload(pSql, pMerged);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  code = tscProcessSql(pSql);
  return (code != TSDB_CODE_SUCCESS) ? code : TSDB_CODE_SUCCESS;
}

L
[1292]  
lihui 已提交
1287
/*
 * Read rows from fp, one row per line, parse them into a data block and submit the block to the
 * server whenever it is full, plus once more for the remaining rows at the end of the file.
 *
 * Trailing '\r' and '\n' characters are removed from every line and empty lines are skipped. The
 * line buffer obtained from getline is released on every exit path.
 *
 * @param pSql        sql object; meter meta info 0 describes the target table
 * @param fp          opened data file
 * @param tmpTokenBuf scratch buffer handed to tsParseOneRowData
 * @return the accumulated pSql->res.numOfRows of all submitted blocks on success, -1 if no memory
 *         for a single row is available, or the negated error code on failure
 */
static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
  ssize_t         readLen = 0;
  char *          line = NULL;
  size_t          n = 0;
  int             len = 0;
  uint32_t        maxRows = 0;
  SSqlCmd *       pCmd = &pSql->cmd;
  int             numOfRows = 0;
  int32_t         code = 0;
  int             result = 0;
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
  SMeterMeta *    pMeterMeta = pMeterMetaInfo->pMeterMeta;
  int32_t         rowSize = pMeterMeta->rowSize;

  pCmd->pDataBlocks = tscCreateBlockArrayList();
  STableDataBlocks *pTableDataBlock =
      tscCreateDataBlock(TSDB_PAYLOAD_SIZE, pMeterMeta->rowSize, sizeof(SShellSubmitBlock), pMeterMetaInfo->name);

  tscAppendDataBlock(pCmd->pDataBlocks, pTableDataBlock);

  maxRows = tscAllocateMemIfNeed(pTableDataBlock, rowSize);
  if (maxRows < 1) return -1;

  int                count = 0;  // rows parsed into the current block and not submitted yet
  SParsedDataColInfo spd = {.numOfCols = pMeterMetaInfo->pMeterMeta->numOfColumns};
  SSchema *          pSchema = tsGetSchema(pMeterMetaInfo->pMeterMeta);

  tscSetAssignedColumnInfo(&spd, pSchema, pMeterMetaInfo->pMeterMeta->numOfColumns);

  while ((readLen = getline(&line, &n, fp)) != -1) {
    // remove the line terminator, which may be "\n" or "\r\n"
    while (readLen > 0 && ('\r' == line[readLen - 1] || '\n' == line[readLen - 1])) {
      line[--readLen] = 0;
    }

    if (readLen == 0) continue;

    char *lineptr = line;
    strtolower(line, line);

    if ((uint32_t)numOfRows >= maxRows ||
        pTableDataBlock->size + pMeterMeta->rowSize >= pTableDataBlock->nAllocSize) {
      uint32_t tSize = tscAllocateMemIfNeed(pTableDataBlock, pMeterMeta->rowSize);
      if (0 == tSize) {
        result = -TSDB_CODE_CLI_OUT_OF_MEMORY;
        goto _cleanup;
      }

      maxRows += tSize;
    }

    len = tsParseOneRowData(&lineptr, pTableDataBlock, pSchema, &spd, pCmd->payload, pMeterMeta->precision, &code,
                            tmpTokenBuf);

    // bound parameters are not supported when loading data from a file
    if (len <= 0 || pTableDataBlock->numOfParams > 0) {
      pSql->res.code = code;
      result = -code;
      goto _cleanup;
    }

    pTableDataBlock->size += len;

    count++;
    if ((uint32_t)count >= maxRows) {
      if ((code = doPackSendDataBlock(pSql, count, pTableDataBlock)) != TSDB_CODE_SUCCESS) {
        result = -code;
        goto _cleanup;
      }

      // continue with the block at index 0 and start filling it right behind the block header
      pTableDataBlock = pCmd->pDataBlocks->pData[0];
      pTableDataBlock->size = sizeof(SShellSubmitBlock);
      pTableDataBlock->rowSize = pMeterMeta->rowSize;

      numOfRows += pSql->res.numOfRows;
      pSql->res.numOfRows = 0;
      count = 0;
    }
  }

  // submit the remaining rows
  if (count > 0) {
    if ((code = doPackSendDataBlock(pSql, count, pTableDataBlock)) != TSDB_CODE_SUCCESS) {
      result = -code;
      goto _cleanup;
    }

    numOfRows += pSql->res.numOfRows;
    pSql->res.numOfRows = 0;
  }

  result = numOfRows;

_cleanup:
  // getline allocates the buffer, it must be released even if reading failed
  free(line);
  return result;
}

/* multi-vnodes insertion in sync query model
 *
 * modify history
 * 2019.05.10 lihui
 * Remove the code for importing records from files
 */
S
slguan 已提交
1375 1376
void tscProcessMultiVnodesInsert(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1377 1378

  // not insert/import, return directly
H
hzcheng 已提交
1379 1380 1381 1382
  if (pCmd->command != TSDB_SQL_INSERT) {
    return;
  }

S
slguan 已提交
1383 1384 1385 1386 1387
  // SSqlCmd may have been released
  if (pCmd->pDataBlocks == NULL) {
    return;
  }

S
slguan 已提交
1388
  STableDataBlocks *pDataBlock = NULL;
S
slguan 已提交
1389
  SMeterMetaInfo *  pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
S
slguan 已提交
1390
  int32_t           code = TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1391 1392

  /* the first block has been sent to server in processSQL function */
H
hjxilinx 已提交
1393
  assert(pCmd->isInsertFromFile != -1 && pMeterMetaInfo->vnodeIndex >= 1 && pCmd->pDataBlocks != NULL);
H
hzcheng 已提交
1394

H
hjxilinx 已提交
1395
  if (pMeterMetaInfo->vnodeIndex < pCmd->pDataBlocks->nSize) {
S
slguan 已提交
1396
    SDataBlockList *pDataBlocks = pCmd->pDataBlocks;
H
hzcheng 已提交
1397

H
hjxilinx 已提交
1398
    for (int32_t i = pMeterMetaInfo->vnodeIndex; i < pDataBlocks->nSize; ++i) {
H
hzcheng 已提交
1399 1400 1401 1402 1403 1404
      pDataBlock = pDataBlocks->pData[i];
      if (pDataBlock == NULL) {
        continue;
      }

      if ((code = tscCopyDataBlockToPayload(pSql, pDataBlock)) != TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
1405
        tscTrace("%p build submit data block failed, vnodeIdx:%d, total:%d", pSql, pMeterMetaInfo->vnodeIndex, pDataBlocks->nSize);
H
hzcheng 已提交
1406 1407 1408 1409 1410 1411 1412 1413
        continue;
      }

      tscProcessSql(pSql);
    }
  }

  // all data have been submit to vnode, release data blocks
S
slguan 已提交
1414
  pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
H
hzcheng 已提交
1415 1416
}

S
slguan 已提交
1417
// multi-vnodes insertion in sync query model
S
slguan 已提交
1418 1419
void tscProcessMultiVnodesInsertForFile(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
1420 1421 1422 1423
  if (pCmd->command != TSDB_SQL_INSERT) {
    return;
  }

S
slguan 已提交
1424
  SMeterMetaInfo *  pInfo = tscGetMeterMetaInfo(pCmd, 0);
S
slguan 已提交
1425 1426
  STableDataBlocks *pDataBlock = NULL;
  int32_t           affected_rows = 0;
H
hzcheng 已提交
1427

S
slguan 已提交
1428 1429 1430
  assert(pCmd->isInsertFromFile == 1 && pCmd->pDataBlocks != NULL);
  SDataBlockList *pDataBlockList = pCmd->pDataBlocks;
  pCmd->pDataBlocks = NULL;
H
hzcheng 已提交
1431

S
slguan 已提交
1432
  char path[PATH_MAX] = {0};
H
hzcheng 已提交
1433

S
slguan 已提交
1434 1435
  for (int32_t i = 0; i < pDataBlockList->nSize; ++i) {
    pDataBlock = pDataBlockList->pData[i];
H
hzcheng 已提交
1436 1437 1438
    if (pDataBlock == NULL) {
      continue;
    }
S
slguan 已提交
1439 1440 1441 1442 1443
    
    if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_PAYLOAD_SIZE)) {
      tscError("%p failed to malloc when insert file", pSql);
      continue;
    }
H
hzcheng 已提交
1444 1445
    pCmd->count = 1;

S
slguan 已提交
1446 1447 1448
    strncpy(path, pDataBlock->filename, PATH_MAX);

    FILE *fp = fopen(path, "r");
H
hzcheng 已提交
1449
    if (fp == NULL) {
S
slguan 已提交
1450
      tscError("%p failed to open file %s to load data from file, reason:%s", pSql, path, strerror(errno));
H
hzcheng 已提交
1451 1452 1453
      continue;
    }

S
slguan 已提交
1454
    strncpy(pInfo->name, pDataBlock->meterId, TSDB_METER_ID_LEN);
S
slguan 已提交
1455 1456
    memset(pDataBlock->pData, 0, pDataBlock->nAllocSize);

S
slguan 已提交
1457
    int32_t ret = tscGetMeterMeta(pSql, pInfo->name, 0);
S
slguan 已提交
1458 1459 1460 1461
    if (ret != TSDB_CODE_SUCCESS) {
      tscError("%p get meter meta failed, abort", pSql);
      continue;
    }
L
[1292]  
lihui 已提交
1462 1463 1464 1465 1466 1467
    
    char*   tmpTokenBuf = calloc(1, 4096);  // used for deleting Escape character: \\, \', \"
    if (NULL == tmpTokenBuf) {
      tscError("%p calloc failed", pSql);
      continue;
    }
S
slguan 已提交
1468

L
[1292]  
lihui 已提交
1469 1470 1471
    int nrows = tscInsertDataFromFile(pSql, fp, tmpTokenBuf);
    free(tmpTokenBuf);
    
S
slguan 已提交
1472 1473
    pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);

H
hzcheng 已提交
1474 1475
    if (nrows < 0) {
      fclose(fp);
S
slguan 已提交
1476
      tscTrace("%p no records(%d) in file %s", pSql, nrows, path);
H
hzcheng 已提交
1477 1478 1479
      continue;
    }

S
slguan 已提交
1480
    fclose(fp);
H
hzcheng 已提交
1481 1482
    affected_rows += nrows;

S
slguan 已提交
1483
    tscTrace("%p Insert data %d records from file %s", pSql, nrows, path);
H
hzcheng 已提交
1484 1485 1486 1487 1488
  }

  pSql->res.numOfRows = affected_rows;

  // all data have been submit to vnode, release data blocks
S
slguan 已提交
1489 1490
  pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
  tscDestroyBlockArrayList(pDataBlockList);
H
hzcheng 已提交
1491
}