clientSml.c 65.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

wmmhello's avatar
wmmhello 已提交
16 17 18 19 20
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

21 22 23 24 25 26
#include "clientSml.h"

// Milliseconds per hour, per minute and per second. smlGetTimeValue() indexes
// this table with (fromPrecision - TSDB_TIME_PRECISION_HOURS).
int64_t smlToMilli[3] = {3600000LL, 60000LL, 1000LL};
// NOTE(review): presumably the factor that scales a milli/micro/nano timestamp
// to nanoseconds; no use is visible in this part of the file - confirm.
int64_t smlFactorNS[3] = {NANOSECOND_PER_MSEC, NANOSECOND_PER_USEC, 1};
// NOTE(review): presumably the number of milli/micro/nano units in one second;
// no use is visible in this part of the file - confirm.
int64_t smlFactorS[3] = {1000LL, 1000000LL, 1000000000LL};

X
Xiaoyu Wang 已提交
27
// Returns the value of the first used node whose key matches `key`, or NULL
// when no node matches.
//
// With fn == NULL two keys match when their lengths are equal and memcmp()
// over `len` bytes reports equality. With a comparator, fn(nodeKey, key) == 0
// decides the match and `len` is not consulted.
void *nodeListGet(NodeList *list, const void *key, int32_t len, _equal_fn_sml fn) {
  for (NodeList *node = list; node != NULL; node = node->next) {
    if (!node->data.used) {
      continue;
    }

    bool matched;
    if (fn != NULL) {
      matched = (fn(node->data.key, key) == 0);
    } else {
      matched = (node->data.keyLen == len && memcmp(node->data.key, key, len) == 0);
    }

    if (matched) {
      return node->data.value;
    }
  }
  return NULL;
}

X
Xiaoyu Wang 已提交
45
// Stores (key, value) in the list.
//
// The walk stops at the first unused node or at the end of the chain. If a
// used node met on the way already holds the key (same keyLen and either equal
// bytes or fn() == 0) nothing is stored and -1 is returned. Otherwise the pair
// goes into the unused node that stopped the walk, or into a freshly allocated
// node that becomes the new head of *list.
//
// Returns 0 on success, -1 on duplicate key or allocation failure. The key
// pointer is stored as-is, not copied.
int nodeListSet(NodeList **list, const void *key, int32_t len, void *value, _equal_fn_sml fn) {
  NodeList *slot = *list;
  for (; slot != NULL && slot->data.used; slot = slot->next) {
    if (slot->data.keyLen != len) {
      continue;
    }
    bool sameKey = (fn == NULL) ? (memcmp(slot->data.key, key, len) == 0) : (fn(slot->data.key, key) == 0);
    if (sameKey) {
      return -1;
    }
  }

  if (slot == NULL) {
    // Every existing node is in use: allocate one and push it to the front.
    slot = (NodeList *)taosMemoryCalloc(1, sizeof(NodeList));
    if (slot == NULL) {
      return -1;
    }
    slot->next = *list;
    *list = slot;
  }

  slot->data.key = key;
  slot->data.keyLen = len;
  slot->data.value = value;
  slot->data.used = true;
  return 0;
}

X
Xiaoyu Wang 已提交
81
// Counts the used nodes at the front of the list; counting stops at the first
// unused node (or at the end of the chain).
int nodeListSize(NodeList *list) {
  int used = 0;
  for (NodeList *node = list; node != NULL && node->data.used; node = node->next) {
    ++used;
  }
  return used;
}
wmmhello's avatar
wmmhello 已提交
92

93
// Reports whether `num` lies at or beyond the int64_t limits once those limits
// are converted to double, i.e. whether a direct cast to int64_t is unsafe.
//
// Returns true when num >= (double)INT64_MAX or num <= (double)INT64_MIN,
// false otherwise. Both bounds are inclusive, so the value equal to
// (double)INT64_MIN is reported as overflowing as well.
//
// Defined without `inline`: a bare `inline` definition with external linkage
// is only an "inline definition" in C99/C11 and does not emit a callable
// symbol, which breaks linking whenever the compiler decides not to inline.
bool smlDoubleToInt64OverFlow(double num) {
  return num >= (double)INT64_MAX || num <= (double)INT64_MIN;
}

98
// Formats "<msg1>:<msg2>" into pBuf->buf (truncating to fit) and returns
// TSDB_CODE_SML_INVALID_DATA.
//
// pBuf  message buffer; nothing is written when pBuf->buf is NULL or
//       pBuf->len is not positive.
// msg1  optional leading text (may be NULL).
// msg2  optional detail text (may be NULL); appended after ':' only when more
//       than two bytes remain in the buffer.
//
// The return value is always TSDB_CODE_SML_INVALID_DATA so callers can write
// `return smlBuildInvalidDataMsg(...)`.
int32_t smlBuildInvalidDataMsg(SSmlMsgBuf *pBuf, const char *msg1, const char *msg2) {
  if (pBuf->buf && pBuf->len > 0) {
    memset(pBuf->buf, 0, pBuf->len);
    // strncat() appends up to n characters AND a terminating NUL, so the limit
    // must leave one byte for the terminator.
    if (msg1) strncat(pBuf->buf, msg1, pBuf->len - 1);
    int32_t left = pBuf->len - strlen(pBuf->buf);
    if (left > 2 && msg2) {
      strncat(pBuf->buf, ":", left - 1);
      strncat(pBuf->buf, msg2, left - 2);
    }
  }
  return TSDB_CODE_SML_INVALID_DATA;
}

111 112 113 114 115
int64_t smlGetTimeValue(const char *value, int32_t len, uint8_t fromPrecision, uint8_t toPrecision) {
  char   *endPtr = NULL;
  int64_t tsInt64 = taosStr2Int64(value, &endPtr, 10);
  if (unlikely(value + len != endPtr)) {
    return -1;
116
  }
wmmhello's avatar
wmmhello 已提交
117

X
Xiaoyu Wang 已提交
118
  if (unlikely(fromPrecision >= TSDB_TIME_PRECISION_HOURS)) {
119
    int64_t unit = smlToMilli[fromPrecision - TSDB_TIME_PRECISION_HOURS];
D
dapan1121 已提交
120
    if (tsInt64 != 0 && unit > INT64_MAX / tsInt64) {
121 122 123 124
      return -1;
    }
    tsInt64 *= unit;
    fromPrecision = TSDB_TIME_PRECISION_MILLI;
wmmhello's avatar
wmmhello 已提交
125
  }
wmmhello's avatar
wmmhello 已提交
126

127
  return convertTimePrecision(tsInt64, fromPrecision, toPrecision);
wmmhello's avatar
wmmhello 已提交
128 129
}

130 131 132 133 134 135 136
// Maps the digit count of a timestamp to a precision: second precision for
// TSDB_TIME_PRECISION_SEC_DIGITS, millisecond precision for
// TSDB_TIME_PRECISION_MILLI_DIGITS, and -1 for any other length.
int8_t smlGetTsTypeByLen(int32_t len) {
  if (len == TSDB_TIME_PRECISION_SEC_DIGITS) return TSDB_TIME_PRECISION_SECONDS;
  if (len == TSDB_TIME_PRECISION_MILLI_DIGITS) return TSDB_TIME_PRECISION_MILLI;
  return -1;
}

X
Xiaoyu Wang 已提交
140
SSmlTableInfo *smlBuildTableInfo(int numRows, const char *measure, int32_t measureLen) {
141 142 143
  SSmlTableInfo *tag = (SSmlTableInfo *)taosMemoryCalloc(sizeof(SSmlTableInfo), 1);
  if (!tag) {
    return NULL;
144 145
  }

146 147
  tag->sTableName = measure;
  tag->sTableNameLen = measureLen;
148

149 150 151 152
  tag->cols = taosArrayInit(numRows, POINTER_BYTES);
  if (tag->cols == NULL) {
    uError("SML:smlBuildTableInfo failed to allocate memory");
    goto cleanup;
153
  }
154

X
Xiaoyu Wang 已提交
155 156 157 158 159
  //  tag->tags = taosArrayInit(16, sizeof(SSmlKv));
  //  if (tag->tags == NULL) {
  //    uError("SML:smlBuildTableInfo failed to allocate memory");
  //    goto cleanup;
  //  }
160 161
  return tag;

X
Xiaoyu Wang 已提交
162
cleanup:
163 164
  taosMemoryFree(tag);
  return NULL;
165 166
}

167
// If the tag list contains a tag whose key equals the configured
// tsSmlChildTableName, copies that tag's value into childTableName and removes
// the tag from the list. Only the first such tag is handled.
//
// tags            array of SSmlKv; may be shrunk by one element.
// childTableName  destination buffer of at least TSDB_TABLE_NAME_LEN bytes
//                 (it is cleared over that many bytes before the copy).
//
// Always returns TSDB_CODE_SUCCESS. Does nothing when tsSmlChildTableName is
// empty.
static int32_t smlParseTableName(SArray *tags, char *childTableName) {
  size_t childTableNameLen = strlen(tsSmlChildTableName);
  if (childTableNameLen == 0) return TSDB_CODE_SUCCESS;

  for (int i = 0; i < taosArrayGetSize(tags); i++) {
    SSmlKv *tag = (SSmlKv *)taosArrayGet(tags, i);
    // handle child table name
    if (childTableNameLen == tag->keyLen && strncmp(tag->key, tsSmlChildTableName, tag->keyLen) == 0) {
      memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
      // Copy at most TSDB_TABLE_NAME_LEN - 1 bytes so the zeroed last byte
      // always terminates the name, even for over-long tag values.
      strncpy(childTableName, tag->value,
              (tag->length < TSDB_TABLE_NAME_LEN ? tag->length : TSDB_TABLE_NAME_LEN - 1));
      taosArrayRemove(tags, i);
      break;
    }
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
185
// Fills oneTable->childTableName.
//
// First lets smlParseTableName() take the name from the tags. If the name is
// still empty afterwards, a name is generated by buildChildTableName() from a
// temporary copy of the tags and the super table name.
//
// Always returns TSDB_CODE_SUCCESS.
int32_t smlSetCTableName(SSmlTableInfo *oneTable) {
  smlParseTableName(oneTable->tags, oneTable->childTableName);

  if (oneTable->childTableName[0] != '\0') {
    return TSDB_CODE_SUCCESS;
  }

  SArray       *tagsCopy = taosArrayDup(oneTable->tags, NULL);
  RandTableName nameSpec = {tagsCopy, oneTable->sTableName, (uint8_t)oneTable->sTableNameLen,
                            oneTable->childTableName};
  buildChildTableName(&nameSpec);
  taosArrayDestroy(tagsCopy);
  return TSDB_CODE_SUCCESS;
}
197

198 199 200 201 202 203 204 205 206 207 208 209 210 211
// Assigns tinfo->uid for the (measurement, child table) pair.
//
// The lookup key in info->tableUids is "<measure>\0<childTableName>". When the
// key is already known its stored uid is reused; otherwise the next value of
// info->uid is taken (post-incrementing the counter) and recorded in the hash.
void getTableUid(SSmlHandle *info, SSmlLineInfo *currElement, SSmlTableInfo *tinfo) {
  char         key[TSDB_TABLE_NAME_LEN * 2 + 1] = {0};
  const size_t childLen = strlen(tinfo->childTableName);

  // The zero-initialised byte between the two parts acts as the separator.
  memcpy(key, currElement->measure, currElement->measureLen);
  memcpy(key + currElement->measureLen + 1, tinfo->childTableName, childLen);
  const size_t keyLen = currElement->measureLen + 1 + childLen;

  uint64_t *known = (uint64_t *)taosHashGet(info->tableUids, key, keyLen);
  if (known != NULL) {
    tinfo->uid = *known;
    return;
  }

  tinfo->uid = info->uid++;
  taosHashPut(info->tableUids, key, keyLen, &tinfo->uid, sizeof(uint64_t));
}

212 213 214 215
SSmlSTableMeta *smlBuildSTableMeta(bool isDataFormat) {
  SSmlSTableMeta *meta = (SSmlSTableMeta *)taosMemoryCalloc(sizeof(SSmlSTableMeta), 1);
  if (!meta) {
    return NULL;
216 217
  }

X
Xiaoyu Wang 已提交
218
  if (unlikely(!isDataFormat)) {
219 220 221 222 223
    meta->tagHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
    if (meta->tagHash == NULL) {
      uError("SML:smlBuildSTableMeta failed to allocate memory");
      goto cleanup;
    }
wmmhello's avatar
wmmhello 已提交
224

225 226 227 228 229
    meta->colHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
    if (meta->colHash == NULL) {
      uError("SML:smlBuildSTableMeta failed to allocate memory");
      goto cleanup;
    }
wmmhello's avatar
wmmhello 已提交
230 231
  }

232 233 234 235
  meta->tags = taosArrayInit(32, sizeof(SSmlKv));
  if (meta->tags == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
236 237
  }

238 239 240 241 242 243
  meta->cols = taosArrayInit(32, sizeof(SSmlKv));
  if (meta->cols == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }
  return meta;
244

X
Xiaoyu Wang 已提交
245
cleanup:
246 247
  taosMemoryFree(meta);
  return NULL;
wmmhello's avatar
wmmhello 已提交
248 249
}

X
Xiaoyu Wang 已提交
250 251 252 253 254 255 256
// uint16_t smlCalTypeSum(char* endptr, int32_t left){
//   uint16_t sum = 0;
//   for(int i = 0; i < left; i++){
//     sum += endptr[i];
//   }
//   return sum;
// }
wmmhello's avatar
wmmhello 已提交
257

X
Xiaoyu Wang 已提交
258 259 260
// Helper macros for smlParseNumber().
//
// They are not self-contained: each one expands to bare statements (no
// do { } while (0) wrapper) and refers to the names `kvVal`, `msg`, `pVal`,
// `result` and `endptr` of the function that expands it. The failure paths
// execute `return false;` in that function. Call sites in smlParseNumber()
// expand them inside braces without a trailing semicolon, so their shape must
// not be changed independently of those call sites.
//
// The IS_VALID_* range checks are project macros defined elsewhere.

// Records an "invalid data" message for pVal and returns false.
#define RETURN_FALSE                                 \
  smlBuildInvalidDataMsg(msg, "invalid data", pVal); \
  return false;

// Stores `result` as a DOUBLE.
#define SET_DOUBLE                     \
  kvVal->type = TSDB_DATA_TYPE_DOUBLE; \
  kvVal->d = result;

// Stores `result` as a FLOAT after a range check.
#define SET_FLOAT                                                                              \
  if (!IS_VALID_FLOAT(result)) {                                                               \
    smlBuildInvalidDataMsg(msg, "float out of range[-3.402823466e+38,3.402823466e+38]", pVal); \
    return false;                                                                              \
  }                                                                                            \
  kvVal->type = TSDB_DATA_TYPE_FLOAT;                                                          \
  kvVal->f = (float)result;

// Re-parses pVal as a base-10 signed 64-bit integer (the double `result` is
// not used) and stores it as a BIGINT; fails when errno reports ERANGE.
// Declares a local named `tmp` and overwrites `endptr`.
#define SET_BIGINT                                                                                       \
  errno = 0;                                                                                             \
  int64_t tmp = taosStr2Int64(pVal, &endptr, 10);                                                        \
  if (errno == ERANGE) {                                                                                 \
    smlBuildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", pVal); \
    return false;                                                                                        \
  }                                                                                                      \
  kvVal->type = TSDB_DATA_TYPE_BIGINT;                                                                   \
  kvVal->i = tmp;

// Stores `result` as an INT after a range check.
#define SET_INT                                                                    \
  if (!IS_VALID_INT(result)) {                                                     \
    smlBuildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", pVal); \
    return false;                                                                  \
  }                                                                                \
  kvVal->type = TSDB_DATA_TYPE_INT;                                                \
  kvVal->i = result;

// Stores `result` as a SMALLINT after a range check.
// NOTE(review): the message text reads "our of range"; it is runtime output
// and is left unchanged here.
#define SET_SMALL_INT                                                          \
  if (!IS_VALID_SMALLINT(result)) {                                            \
    smlBuildInvalidDataMsg(msg, "small int our of range[-32768,32767]", pVal); \
    return false;                                                              \
  }                                                                            \
  kvVal->type = TSDB_DATA_TYPE_SMALLINT;                                       \
  kvVal->i = result;

// Re-parses pVal as a base-10 unsigned 64-bit integer and stores it as a
// UBIGINT; fails when errno reports ERANGE or when the double `result` is
// negative. Declares a local named `tmp` and overwrites `endptr`.
#define SET_UBIGINT                                                                             \
  errno = 0;                                                                                    \
  uint64_t tmp = taosStr2UInt64(pVal, &endptr, 10);                                             \
  if (errno == ERANGE || result < 0) {                                                          \
    smlBuildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", pVal); \
    return false;                                                                               \
  }                                                                                             \
  kvVal->type = TSDB_DATA_TYPE_UBIGINT;                                                         \
  kvVal->u = tmp;

// Stores `result` as a UINT after a range check.
#define SET_UINT                                                                  \
  if (!IS_VALID_UINT(result)) {                                                   \
    smlBuildInvalidDataMsg(msg, "unsigned int out of range[0,4294967295]", pVal); \
    return false;                                                                 \
  }                                                                               \
  kvVal->type = TSDB_DATA_TYPE_UINT;                                              \
  kvVal->u = result;

// Stores `result` as a USMALLINT after a range check.
// NOTE(review): the message text reads "out of rang"; it is runtime output
// and is left unchanged here.
#define SET_USMALL_INT                                                            \
  if (!IS_VALID_USMALLINT(result)) {                                              \
    smlBuildInvalidDataMsg(msg, "unsigned small int out of rang[0,65535]", pVal); \
    return false;                                                                 \
  }                                                                               \
  kvVal->type = TSDB_DATA_TYPE_USMALLINT;                                         \
  kvVal->u = result;

// Stores `result` as a TINYINT after a range check.
#define SET_TINYINT                                                       \
  if (!IS_VALID_TINYINT(result)) {                                        \
    smlBuildInvalidDataMsg(msg, "tiny int out of range[-128,127]", pVal); \
    return false;                                                         \
  }                                                                       \
  kvVal->type = TSDB_DATA_TYPE_TINYINT;                                   \
  kvVal->i = result;

// Stores `result` as a UTINYINT after a range check.
#define SET_UTINYINT                                                            \
  if (!IS_VALID_UTINYINT(result)) {                                             \
    smlBuildInvalidDataMsg(msg, "unsigned tiny int out of range[0,255]", pVal); \
    return false;                                                               \
  }                                                                             \
  kvVal->type = TSDB_DATA_TYPE_UTINYINT;                                        \
  kvVal->u = result;

342
// Parses kvVal->value as a number with an optional type suffix and stores the
// typed result in kvVal (type plus the d/f/i/u member).
//
// The numeric prefix is read with taosStr2Double(); the characters left over
// up to kvVal->length select the type:
//   (none)            double
//   f64 / f32         double / float
//   i, i64, i32, i16, i8   signed integers (bare "i" means 64-bit)
//   u, u64, u32, u16, u8   unsigned integers (bare "u" means 64-bit)
// The leading suffix letter is accepted in either case.
//
// Returns true on success. On a malformed value or a range violation a message
// is written to `msg` and false is returned (via the RETURN_FALSE / SET_*
// macros, which contain the `return false` statements).
bool smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg) {
  const char *pVal = kvVal->value;
  int32_t     valueLen = kvVal->length;
  char       *endptr = NULL;
  double      result = taosStr2Double(pVal, &endptr);
  if (endptr == pVal) {
    RETURN_FALSE
  }

  const int32_t suffixLen = valueLen - (endptr - pVal);
  if (suffixLen == 0) {
    SET_DOUBLE
    return true;
  }
  if (suffixLen < 1 || suffixLen > 3) {
    RETURN_FALSE
  }

  // Classify the suffix letter once; the SET_* macros may move endptr later.
  const char lead = endptr[0];
  const bool isFloatTag = (lead == 'f' || lead == 'F');
  const bool isSignedTag = (lead == 'i' || lead == 'I');
  const bool isUnsignedTag = (lead == 'u' || lead == 'U');

  if (suffixLen == 1) {
    if (isSignedTag) {
      SET_BIGINT
    } else if (isUnsignedTag) {
      SET_UBIGINT
    } else {
      RETURN_FALSE
    }
    return true;
  }

  if (suffixLen == 2) {
    if (isSignedTag && endptr[1] == '8') {
      SET_TINYINT
    } else if (isUnsignedTag && endptr[1] == '8') {
      SET_UTINYINT
    } else {
      RETURN_FALSE
    }
    return true;
  }

  // suffixLen == 3: a letter followed by a two-digit width.
  const bool is64 = (endptr[1] == '6' && endptr[2] == '4');
  const bool is32 = (endptr[1] == '3' && endptr[2] == '2');
  const bool is16 = (endptr[1] == '1' && endptr[2] == '6');

  if (isFloatTag) {
    if (is64) {
      SET_DOUBLE
    } else if (is32) {
      SET_FLOAT
    } else {
      RETURN_FALSE
    }
  } else if (isSignedTag) {
    if (is64) {
      SET_BIGINT
    } else if (is32) {
      SET_INT
    } else if (is16) {
      SET_SMALL_INT
    } else {
      RETURN_FALSE
    }
  } else if (isUnsignedTag) {
    if (is64) {
      SET_UBIGINT
    } else if (is32) {
      SET_UINT
    } else if (is16) {
      SET_USMALL_INT
    } else {
      RETURN_FALSE
    }
  } else {
    RETURN_FALSE
  }
  return true;
}

// True when the `suffixLen` characters at `suffix` equal `tag`, ignoring case.
static bool smlOldSuffixIs(const char *suffix, int32_t suffixLen, const char *tag) {
  return suffixLen == (int32_t)strlen(tag) && strncasecmp(suffix, tag, suffixLen) == 0;
}

// Earlier variant of smlParseNumber(): parses kvVal->value as a number with an
// optional type suffix and stores the typed result in kvVal.
//
// Differences visible in the code: the multi-character suffixes are compared
// case-insensitively but the bare one-letter suffixes must be lowercase 'i' or
// 'u', and 64-bit integers are taken from the parsed double unless it is out
// of range, in which case the text is re-parsed as an integer.
//
// Returns true on success; on failure writes a message to `msg` and returns
// false.
bool smlParseNumberOld(SSmlKv *kvVal, SSmlMsgBuf *msg) {
  const char *text = kvVal->value;
  int32_t     textLen = kvVal->length;
  char       *suffix = NULL;
  double      number = taosStr2Double(text, &suffix);
  if (suffix == text) {
    smlBuildInvalidDataMsg(msg, "invalid data", text);
    return false;
  }

  int32_t suffixLen = textLen - (suffix - text);

  if (suffixLen == 0 || smlOldSuffixIs(suffix, suffixLen, "f64")) {
    kvVal->type = TSDB_DATA_TYPE_DOUBLE;
    kvVal->d = number;
    return true;
  }

  if (smlOldSuffixIs(suffix, suffixLen, "f32")) {
    if (!IS_VALID_FLOAT(number)) {
      smlBuildInvalidDataMsg(msg, "float out of range[-3.402823466e+38,3.402823466e+38]", text);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_FLOAT;
    kvVal->f = (float)number;
    return true;
  }

  if ((suffixLen == 1 && *suffix == 'i') || smlOldSuffixIs(suffix, suffixLen, "i64")) {
    int64_t asInt;
    if (smlDoubleToInt64OverFlow(number)) {
      // The double cannot be cast safely; parse the digits as an integer.
      errno = 0;
      asInt = taosStr2Int64(text, &suffix, 10);
      if (errno == ERANGE) {
        smlBuildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", text);
        return false;
      }
    } else {
      asInt = (int64_t)number;
    }
    kvVal->type = TSDB_DATA_TYPE_BIGINT;
    kvVal->i = asInt;
    return true;
  }

  if ((suffixLen == 1 && *suffix == 'u') || smlOldSuffixIs(suffix, suffixLen, "u64")) {
    if (number >= (double)UINT64_MAX || number < 0) {
      errno = 0;
      uint64_t asUint = taosStr2UInt64(text, &suffix, 10);
      if (errno == ERANGE || number < 0) {
        smlBuildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", text);
        return false;
      }
      kvVal->type = TSDB_DATA_TYPE_UBIGINT;
      kvVal->u = asUint;
      return true;
    }
    kvVal->type = TSDB_DATA_TYPE_UBIGINT;
    kvVal->u = number;
    return true;
  }

  if (smlOldSuffixIs(suffix, suffixLen, "i32")) {
    if (!IS_VALID_INT(number)) {
      smlBuildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", text);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_INT;
    kvVal->i = number;
    return true;
  }

  if (smlOldSuffixIs(suffix, suffixLen, "u32")) {
    if (!IS_VALID_UINT(number)) {
      smlBuildInvalidDataMsg(msg, "unsigned int out of range[0,4294967295]", text);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_UINT;
    kvVal->u = number;
    return true;
  }

  if (smlOldSuffixIs(suffix, suffixLen, "i16")) {
    if (!IS_VALID_SMALLINT(number)) {
      smlBuildInvalidDataMsg(msg, "small int our of range[-32768,32767]", text);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_SMALLINT;
    kvVal->i = number;
    return true;
  }

  if (smlOldSuffixIs(suffix, suffixLen, "u16")) {
    if (!IS_VALID_USMALLINT(number)) {
      smlBuildInvalidDataMsg(msg, "unsigned small int out of rang[0,65535]", text);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_USMALLINT;
    kvVal->u = number;
    return true;
  }

  if (smlOldSuffixIs(suffix, suffixLen, "i8")) {
    if (!IS_VALID_TINYINT(number)) {
      smlBuildInvalidDataMsg(msg, "tiny int out of range[-128,127]", text);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_TINYINT;
    kvVal->i = number;
    return true;
  }

  if (smlOldSuffixIs(suffix, suffixLen, "u8")) {
    if (!IS_VALID_UTINYINT(number)) {
      smlBuildInvalidDataMsg(msg, "unsigned tiny int out of range[0,255]", text);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_UTINYINT;
    kvVal->u = number;
    return true;
  }

  smlBuildInvalidDataMsg(msg, "invalid data", text);
  return false;
}

X
Xiaoyu Wang 已提交
514
// Fetches the super table meta for `measure` (measureLen bytes, not required
// to be NUL-terminated) in the request's database via catalogGetSTableMeta().
//
// Returns the meta on success, or NULL when the catalog call does not return
// TSDB_CODE_SUCCESS.
// NOTE(review): measureLen is copied into tname without a bound check here;
// presumably validated by the caller - confirm.
STableMeta *smlGetMeta(SSmlHandle *info, const void *measure, int32_t measureLen) {
  SName tableName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
  tstrncpy(tableName.dbname, info->pRequest->pDb, sizeof(tableName.dbname));

  SRequestConnInfo conn = {0};
  conn.pTrans = info->taos->pAppInfo->pTransporter;
  conn.requestId = info->pRequest->requestId;
  conn.requestObjRefId = info->pRequest->self;
  conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);

  memset(tableName.tname, 0, TSDB_TABLE_NAME_LEN);
  memcpy(tableName.tname, measure, measureLen);

  STableMeta *meta = NULL;
  if (catalogGetSTableMeta(info->pCatalog, &conn, &tableName, &meta) != TSDB_CODE_SUCCESS) {
    return NULL;
  }
  return meta;
}
wmmhello's avatar
wmmhello 已提交
534

535 536
// Returns the next value of a function-local counter, advanced with
// atomic_add_fetch_64(); the value 0 is never handed out.
static int64_t smlGenId() {
  static volatile int64_t linesSmlHandleId = 0;

  for (;;) {
    int64_t next = atomic_add_fetch_64(&linesSmlHandleId, 1);
    if (next != 0) {
      return next;
    }
  }
}

546 547 548 549 550
// Decides which schema change (if any) is needed so that `kv` fits the schema.
//
// colField  existing schema fields, indexed by the uint16_t stored in colHash.
// colHash   field name -> index; may be NULL, in which case the key counts as
//           unknown.
// action    written only when a change is needed: ADD_TAG / ADD_COLUMN for an
//           unknown key, CHANGE_TAG_SIZE / CHANGE_COLUMN_SIZE when an existing
//           VARCHAR or NCHAR field is too short for kv->length. Left untouched
//           otherwise.
//
// Returns 0 on success, or TSDB_CODE_SML_INVALID_DATA when the key exists with
// a different data type.
static int32_t smlGenerateSchemaAction(SSchema *colField, SHashObj *colHash, SSmlKv *kv, bool isTag,
                                       ESchemaAction *action, SSmlHandle *info) {
  uint16_t *index = NULL;
  if (colHash != NULL) {
    index = (uint16_t *)taosHashGet(colHash, kv->key, kv->keyLen);
  }

  if (index == NULL) {
    *action = isTag ? SCHEMA_ACTION_ADD_TAG : SCHEMA_ACTION_ADD_COLUMN;
    return 0;
  }

  SSchema *existing = &colField[*index];
  if (existing->type != kv->type) {
    uError("SML:0x%" PRIx64 " point type and db type mismatch. db type: %d, point type: %d, key: %s", info->id, existing->type, kv->type, kv->key);
    return TSDB_CODE_SML_INVALID_DATA;
  }

  const bool varcharTooShort =
      existing->type == TSDB_DATA_TYPE_VARCHAR && (existing->bytes - VARSTR_HEADER_SIZE) < kv->length;
  const bool ncharTooShort = existing->type == TSDB_DATA_TYPE_NCHAR &&
                             ((existing->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE < kv->length);
  if (varcharTooShort || ncharTooShort) {
    *action = isTag ? SCHEMA_ACTION_CHANGE_TAG_SIZE : SCHEMA_ACTION_CHANGE_COLUMN_SIZE;
  }
  return 0;
}

575
#define BOUNDARY 1024
576 577
static int32_t smlFindNearestPowerOf2(int32_t length, uint8_t type) {
  int32_t result = 1;
578 579 580 581 582 583
  if (length >= BOUNDARY){
    result = length;
  }else{
    while (result <= length) {
      result *= 2;
    }
584
  }
585 586 587 588
  if (type == TSDB_DATA_TYPE_BINARY && result > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) {
    result = TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE;
  } else if (type == TSDB_DATA_TYPE_NCHAR && result > (TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
    result = (TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
589
  }
590 591 592 593 594

  if (type == TSDB_DATA_TYPE_NCHAR) {
    result = result * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
  } else if (type == TSDB_DATA_TYPE_BINARY) {
    result = result + VARSTR_HEADER_SIZE;
595
  }
596
  return result;
597 598
}

599 600 601 602 603 604 605 606 607 608
// Runs smlGenerateSchemaAction() over the entries of `cols`, stopping at the
// first failure. For columns (isTag == false) the first entry is skipped.
//
// `action` receives whatever the individual checks write; the last write wins.
// Returns TSDB_CODE_SUCCESS, or the first non-success code encountered.
static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols,
                                      ESchemaAction *action, bool isTag) {
  for (int j = isTag ? 0 : 1; j < taosArrayGetSize(cols); ++j) {
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, j);
    int32_t code = smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, action, info);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }
  return TSDB_CODE_SUCCESS;
}

613 614 615 616 617
// Checks that every key in `cols` names a field of `schema`.
//
// schema / length  the schema fields to check against.
// cols             array of SSmlKv; for columns (isTag == false) the first
//                  entry is skipped.
//
// Returns 0 when all keys are present, -1 as soon as one is missing. The
// temporary name hash built for the check is released before returning.
static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool isTag) {
  SHashObj *names = taosHashInit(length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  for (int32_t i = 0; i < length; i++) {
    taosHashPut(names, schema[i].name, strlen(schema[i].name), &i, SHORT_BYTES);
  }

  int32_t status = 0;
  for (int32_t i = isTag ? 0 : 1; i < taosArrayGetSize(cols); i++) {
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
    if (taosHashGet(names, kv->key, kv->keyLen) == NULL) {
      status = -1;
      break;
    }
  }

  taosHashCleanup(names);
  return status;
}
635

636 637 638 639 640 641
// Byte size to declare for a field of the given type: the tDataTypes[] entry
// for every type except BINARY and NCHAR, which are sized from `length` by
// smlFindNearestPowerOf2().
static int32_t getBytes(uint8_t type, int32_t length) {
  if (type != TSDB_DATA_TYPE_BINARY && type != TSDB_DATA_TYPE_NCHAR) {
    return tDataTypes[type].bytes;
  }
  return smlFindNearestPowerOf2(length, type);
}

644 645 646 647 648
// Applies the schema changes required by `cols` to the field list `results`.
//
// For every entry of `cols` the needed action is determined with
// smlGenerateSchemaAction(): unknown keys are appended to `results` as new
// SField entries, and existing variable-length fields that are too short get
// their `bytes` enlarged in place. For tags the schema index is shifted down
// by numOfCols to address `results`.
//
// Returns TSDB_CODE_SUCCESS, the error from smlGenerateSchemaAction(),
// TSDB_CODE_SML_INVALID_DATA when a field to resize is not in schemaHash, or
// an invalid tags/row length code when the summed field sizes exceed the
// limit.
static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols,
                                  SArray *results, int32_t numOfCols, bool isTag) {
  for (int j = 0; j < taosArrayGetSize(cols); ++j) {
    SSmlKv       *kv = (SSmlKv *)taosArrayGet(cols, j);
    ESchemaAction action = SCHEMA_ACTION_NULL;
    int           code = smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, &action, info);
    if (code != 0) {
      return code;
    }

    const bool addsField = (action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_ADD_TAG);
    const bool resizesField =
        (action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE || action == SCHEMA_ACTION_CHANGE_TAG_SIZE);

    if (addsField) {
      SField added = {0};
      added.type = kv->type;
      added.bytes = getBytes(kv->type, kv->length);
      memcpy(added.name, kv->key, kv->keyLen);
      taosArrayPush(results, &added);
      continue;
    }
    if (!resizesField) {
      continue;
    }

    uint16_t *index = (uint16_t *)taosHashGet(schemaHash, kv->key, kv->keyLen);
    if (index == NULL) {
      uError("smlBuildFieldsList get error, key:%s", kv->key);
      return TSDB_CODE_SML_INVALID_DATA;
    }
    uint16_t pos = *index;
    if (isTag) pos -= numOfCols;
    SField *existing = (SField *)taosArrayGet(results, pos);
    existing->bytes = getBytes(kv->type, kv->length);
  }

  // The combined width of all fields must stay within the tag / row limit.
  int32_t total = 0;
  for (int j = 0; j < taosArrayGetSize(results); ++j) {
    SField *field = taosArrayGet(results, j);
    total += field->bytes;
  }

  const int32_t limit = isTag ? TSDB_MAX_TAGS_LEN : TSDB_MAX_BYTES_PER_ROW;
  if (total <= limit) {
    return TSDB_CODE_SUCCESS;
  }
  return isTag ? TSDB_CODE_PAR_INVALID_TAGS_LENGTH : TSDB_CODE_PAR_INVALID_ROW_LENGTH;
}

685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701
/*
 * Build a TDMT_MND_CREATE_STB message for the super table `pName` and run it as a synchronous
 * request through launchQueryImpl().
 *
 * pColumns and pTags are attached to the request before anything can fail, so that
 * tFreeSMCreateStbReq() at the end sees them on every exit path; callers in this file do not
 * destroy the arrays after calling this function.
 *
 * info        schemaless handle, provides the connection and the catalog
 * pName       name of the super table
 * pColumns    SField array with the columns of the table
 * pTags       SField array with the tags; a default tag named tsSmlTagName is appended when empty
 * pTableMeta  current meta of the table, read for every action except SCHEMA_ACTION_CREATE_STABLE
 * action      selects colVer / tagVer / suid / source of the request
 *
 * Returns TSDB_CODE_SUCCESS, or the code of the step that failed / of the finished request.
 */
static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns, SArray *pTags, STableMeta *pTableMeta,
                              ESchemaAction action) {
  SRequestObj   *pRequest = NULL;
  SMCreateStbReq pReq = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  SCmdMsgInfo    pCmdMsg = {0};

  // put front for free
  pReq.numOfColumns = taosArrayGetSize(pColumns);
  pReq.pColumns = pColumns;
  pReq.numOfTags = taosArrayGetSize(pTags);
  pReq.pTags = pTags;

  code = buildRequest(info->taos->id, "", 0, NULL, false, &pRequest, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  if (action == SCHEMA_ACTION_CREATE_STABLE) {
    pReq.colVer = 1;
    pReq.tagVer = 1;
    pReq.suid = 0;
    pReq.source = TD_REQ_FROM_APP;
  } else if (action == SCHEMA_ACTION_ADD_TAG || action == SCHEMA_ACTION_CHANGE_TAG_SIZE) {
    pReq.colVer = pTableMeta->sversion;
    pReq.tagVer = pTableMeta->tversion + 1;
    pReq.suid = pTableMeta->uid;
    pReq.source = TD_REQ_FROM_TAOX;
  } else if (action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE) {
    pReq.colVer = pTableMeta->sversion + 1;
    pReq.tagVer = pTableMeta->tversion;
    pReq.suid = pTableMeta->uid;
    pReq.source = TD_REQ_FROM_TAOX;
  }

  // no tag supplied: add the default one
  if (pReq.numOfTags == 0) {
    SField field = {0};
    field.type = TSDB_DATA_TYPE_NCHAR;
    field.bytes = TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
    // bounded copy, the configured tag name must never run past field.name
    tstrncpy(field.name, tsSmlTagName, sizeof(field.name));
    if (taosArrayPush(pReq.pTags, &field) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto end;
    }
    // only announce the tag once it is really stored in the array
    pReq.numOfTags = 1;
  }

  pReq.commentLen = -1;
  pReq.igExists = true;
  tNameExtractFullName(pName, pReq.name);

  pCmdMsg.epSet = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_CREATE_STB;
  // first call only computes the serialized length
  pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  if (NULL == pCmdMsg.pMsg) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);

  SQuery pQuery;
  memset(&pQuery, 0, sizeof(pQuery));
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);

  // the cached meta is outdated once the schema change went through
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    catalogRemoveTableMeta(info->pCatalog, pName);
  }
  code = pRequest->code;
  taosMemoryFree(pCmdMsg.pMsg);

end:
  destroyRequest(pRequest);
  tFreeSMCreateStbReq(&pReq);
  return code;
}
771

772
/*
 * Bring the super tables in the database in line with the schema collected in info->superTables.
 *
 * For every entry of info->superTables:
 *   - table does not exist: build the field lists and create it via smlSendMetaMsg();
 *   - table exists: compare tags first, then columns, and send an alter request for each part
 *     that smlProcessSchemaAction() reports as changed; afterwards verify with smlCheckMeta().
 * The freshly loaded table meta is stored in the entry's tableMeta member.
 *
 * Nothing is done when info->dataFormat is set and info->needModifySchema is not.
 *
 * Returns 0 on success, otherwise the code of the first failing step.
 */
static int32_t smlModifyDBSchemas(SSmlHandle *info) {
  uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas start, format:%d, needModifySchema:%d", info->id, info->dataFormat,
         info->needModifySchema);
  if (info->dataFormat && !info->needModifySchema) {
    return TSDB_CODE_SUCCESS;
  }
  int32_t          code = 0;
  SHashObj        *hashTmp = NULL;
  STableMeta      *pTableMeta = NULL;
  SSmlSTableMeta **tmp = NULL;

  SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
  tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));

  SRequestConnInfo conn = {0};
  conn.pTrans = info->taos->pAppInfo->pTransporter;
  conn.requestId = info->pRequest->requestId;
  conn.requestObjRefId = info->pRequest->self;
  conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);

  tmp = (SSmlSTableMeta **)taosHashIterate(info->superTables, NULL);
  while (tmp) {
    SSmlSTableMeta *sTableData = *tmp;
    bool            needCheckMeta = false;  // for multi thread

    // the hash key is the measurement; work on a copy because the slash processing edits it
    size_t superTableLen = 0;
    void  *superTable = taosHashGetKey(tmp, &superTableLen);
    char  *measure = taosMemoryMalloc(superTableLen);
    if (measure == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto end;
    }
    memcpy(measure, superTable, superTableLen);
    PROCESS_SLASH_IN_MEASUREMENT(measure, superTableLen);
    // keep room for the terminating zero in pName.tname
    if (superTableLen >= TSDB_TABLE_NAME_LEN) {
      uError("SML:0x%" PRIx64 " super table name too long, len:%d", info->id, (int32_t)superTableLen);
      taosMemoryFree(measure);
      code = TSDB_CODE_SML_INVALID_DATA;
      goto end;
    }
    memset(pName.tname, 0, TSDB_TABLE_NAME_LEN);
    memcpy(pName.tname, measure, superTableLen);
    taosMemoryFree(measure);

    code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);

    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_STB_NOT_EXIST) {
      uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas create table:%s", info->id, pName.tname);
      SArray *pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols), sizeof(SField));
      SArray *pTags = taosArrayInit(taosArrayGetSize(sTableData->tags), sizeof(SField));
      code = smlBuildFieldsList(info, NULL, NULL, sTableData->tags, pTags, 0, true);
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " smlBuildFieldsList tag1 failed. %s", info->id, pName.tname);
        taosArrayDestroy(pColumns);
        taosArrayDestroy(pTags);
        goto end;
      }
      code = smlBuildFieldsList(info, NULL, NULL, sTableData->cols, pColumns, 0, false);
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " smlBuildFieldsList col1 failed. %s", info->id, pName.tname);
        taosArrayDestroy(pColumns);
        taosArrayDestroy(pTags);
        goto end;
      }
      // pColumns / pTags are handed over to smlSendMetaMsg from here on
      code = smlSendMetaMsg(info, &pName, pColumns, pTags, NULL, SCHEMA_ACTION_CREATE_STABLE);
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
        goto end;
      }
      info->cost.numOfCreateSTables++;
      taosMemoryFreeClear(pTableMeta);

      code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " catalogGetSTableMeta failed. super table name %s", info->id, pName.tname);
        goto end;
      }
    } else if (code == TSDB_CODE_SUCCESS) {
      // step 1: tags. hashTmp maps tag name -> index in pTableMeta->schema
      hashTmp = taosHashInit(pTableMeta->tableInfo.numOfTags, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true,
                             HASH_NO_LOCK);
      for (uint16_t i = pTableMeta->tableInfo.numOfColumns;
           i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
        taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
      }

      ESchemaAction action = SCHEMA_ACTION_NULL;
      code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->tags, &action, true);
      if (code != TSDB_CODE_SUCCESS) {
        goto end;
      }
      if (action != SCHEMA_ACTION_NULL) {
        uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas change table tag, table:%s, action:%d", info->id, pName.tname,
               action);
        SArray *pColumns =
            taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
        SArray *pTags =
            taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField));

        // start from the existing schema, split into columns and tags
        for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
          SField field = {0};
          field.type = pTableMeta->schema[i].type;
          field.bytes = pTableMeta->schema[i].bytes;
          strcpy(field.name, pTableMeta->schema[i].name);
          if (i < pTableMeta->tableInfo.numOfColumns) {
            taosArrayPush(pColumns, &field);
          } else {
            taosArrayPush(pTags, &field);
          }
        }
        code = smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->tags, pTags,
                                  pTableMeta->tableInfo.numOfColumns, true);
        if (code != TSDB_CODE_SUCCESS) {
          uError("SML:0x%" PRIx64 " smlBuildFieldsList tag2 failed. %s", info->id, pName.tname);
          taosArrayDestroy(pColumns);
          taosArrayDestroy(pTags);
          goto end;
        }

        if (taosArrayGetSize(pTags) + pTableMeta->tableInfo.numOfColumns > TSDB_MAX_COLUMNS) {
          uError("SML:0x%" PRIx64 " too many columns than 4096", info->id);
          code = TSDB_CODE_PAR_TOO_MANY_COLUMNS;
          taosArrayDestroy(pColumns);
          taosArrayDestroy(pTags);
          goto end;
        }
        if (taosArrayGetSize(pTags) > TSDB_MAX_TAGS) {
          uError("SML:0x%" PRIx64 " too many tags than 128", info->id);
          code = TSDB_CODE_PAR_INVALID_TAGS_NUM;
          taosArrayDestroy(pColumns);
          taosArrayDestroy(pTags);
          goto end;
        }

        code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action);
        if (code != TSDB_CODE_SUCCESS) {
          uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
          goto end;
        }

        // reload the meta so the column step below works on the altered schema
        info->cost.numOfAlterTagSTables++;
        taosMemoryFreeClear(pTableMeta);
        code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
        }
        code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
        }
      }

      // step 2: columns. hashTmp now maps column name -> index in pTableMeta->schema
      taosHashClear(hashTmp);
      for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) {
        taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
      }
      action = SCHEMA_ACTION_NULL;
      code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->cols, &action, false);
      if (code != TSDB_CODE_SUCCESS) {
        goto end;
      }
      if (action != SCHEMA_ACTION_NULL) {
        uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas change table col, table:%s, action:%d", info->id, pName.tname,
               action);
        SArray *pColumns =
            taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
        SArray *pTags =
            taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField));

        for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
          SField field = {0};
          field.type = pTableMeta->schema[i].type;
          field.bytes = pTableMeta->schema[i].bytes;
          strcpy(field.name, pTableMeta->schema[i].name);
          if (i < pTableMeta->tableInfo.numOfColumns) {
            taosArrayPush(pColumns, &field);
          } else {
            taosArrayPush(pTags, &field);
          }
        }

        code = smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->cols, pColumns,
                                  pTableMeta->tableInfo.numOfColumns, false);
        if (code != TSDB_CODE_SUCCESS) {
          uError("SML:0x%" PRIx64 " smlBuildFieldsList col2 failed. %s", info->id, pName.tname);
          taosArrayDestroy(pColumns);
          taosArrayDestroy(pTags);
          goto end;
        }

        if (taosArrayGetSize(pColumns) + pTableMeta->tableInfo.numOfTags > TSDB_MAX_COLUMNS) {
          uError("SML:0x%" PRIx64 " too many columns than 4096", info->id);
          code = TSDB_CODE_PAR_TOO_MANY_COLUMNS;
          taosArrayDestroy(pColumns);
          taosArrayDestroy(pTags);
          goto end;
        }

        code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action);
        if (code != TSDB_CODE_SUCCESS) {
          uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
          goto end;
        }

        info->cost.numOfAlterColSTables++;
        taosMemoryFreeClear(pTableMeta);
        code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
        }
        code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
        if (code != TSDB_CODE_SUCCESS) {
          uError("SML:0x%" PRIx64 " catalogGetSTableMeta failed. super table name %s", info->id, pName.tname);
          goto end;
        }
      }

      needCheckMeta = true;
      taosHashCleanup(hashTmp);
      hashTmp = NULL;
    } else {
      uError("SML:0x%" PRIx64 " load table meta error: %s", info->id, tstrerror(code));
      goto end;
    }

    if (needCheckMeta) {
      code = smlCheckMeta(&(pTableMeta->schema[pTableMeta->tableInfo.numOfColumns]), pTableMeta->tableInfo.numOfTags,
                          sTableData->tags, true);
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " check tag failed. super table name %s", info->id, pName.tname);
        goto end;
      }
      code = smlCheckMeta(&(pTableMeta->schema[0]), pTableMeta->tableInfo.numOfColumns, sTableData->cols, false);
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " check cols failed. super table name %s", info->id, pName.tname);
        goto end;
      }
    }

    sTableData->tableMeta = pTableMeta;
    uDebug("SML:0x%" PRIx64 "modify schema uid:%" PRIu64 ", sversion:%d, tversion:%d", info->id, pTableMeta->uid,
           pTableMeta->sversion, pTableMeta->tversion);
    // the meta now belongs to sTableData; forget the local pointer so that neither the next
    // iteration nor the error path below can free it a second time
    pTableMeta = NULL;
    tmp = (SSmlSTableMeta **)taosHashIterate(info->superTables, tmp);
  }
  uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas end success, format:%d, needModifySchema:%d", info->id, info->dataFormat,
         info->needModifySchema);

  return 0;

end:
  // the loop was left in the middle of an iteration
  taosHashCancelIterate(info->superTables, tmp);
  taosHashCleanup(hashTmp);
  taosMemoryFreeClear(pTableMeta);
  catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1);
  uError("SML:0x%" PRIx64 " smlModifyDBSchemas end failed:%d:%s, format:%d, needModifySchema:%d", info->id, code,
         tstrerror(code), info->dataFormat, info->needModifySchema);

  return code;
}

1018 1019 1020 1021 1022 1023 1024
/*
static int32_t smlCheckDupUnit(SHashObj *dumplicateKey, SArray *tags, SSmlMsgBuf *msg){
  for(int i = 0; i < taosArrayGetSize(tags); i++) {
    SSmlKv *tag = taosArrayGet(tags, i);
    if (smlCheckDuplicateKey(tag->key, tag->keyLen, dumplicateKey)) {
      smlBuildInvalidDataMsg(msg, "dumplicate key", tag->key);
      return TSDB_CODE_TSC_DUP_NAMES;
1025
    }
wmmhello's avatar
wmmhello 已提交
1026
  }
1027 1028 1029
  return TSDB_CODE_SUCCESS;
}

1030 1031 1032 1033 1034
static int32_t smlJudgeDupColName(SArray *cols, SArray *tags, SSmlMsgBuf *msg) {
  SHashObj *dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  int ret = smlCheckDupUnit(dumplicateKey, cols, msg);
  if(ret != TSDB_CODE_SUCCESS){
    goto end;
1035
  }
1036 1037 1038
  ret = smlCheckDupUnit(dumplicateKey, tags, msg);
  if(ret != TSDB_CODE_SUCCESS){
    goto end;
1039
  }
1040

1041 1042 1043
  end:
  taosHashCleanup(dumplicateKey);
  return ret;
wmmhello's avatar
wmmhello 已提交
1044
}
1045
*/
wmmhello's avatar
wmmhello 已提交
1046

X
Xiaoyu Wang 已提交
1047
/*
 * Append the key/values of `cols` to a meta description.
 *
 * metaHash maps a key to the int16_t position of its SSmlKv inside metaArray. A kv is only
 * appended to metaArray when the put into metaHash succeeded, therefore the stored position is
 * taken from the current size of metaArray (same scheme as smlUpdateMeta) and not from the loop
 * counter: the two differ as soon as one put fails.
 *
 * The function has no return value; callers in this file inspect terrno after the call.
 */
static void smlInsertMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols) {
  for (size_t i = 0; i < taosArrayGetSize(cols); ++i) {
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
    size_t  pos = taosArrayGetSize(metaArray);
    if (pos > INT16_MAX) {  // position would not fit into the int16_t stored in the hash
      uError("too many cols or tags");
      return;
    }
    int16_t index = (int16_t)pos;
    int     ret = taosHashPut(metaHash, kv->key, kv->keyLen, &index, SHORT_BYTES);
    if (ret == 0) {
      taosArrayPush(metaArray, kv);
    }
  }
}
wmmhello's avatar
wmmhello 已提交
1056

1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072
/*
 * Release a super table meta description: both hashes, both arrays, the attached table meta and
 * the structure itself. A NULL pointer is accepted and ignored, because the value stored in
 * info->superTables is not checked when it is created.
 */
static void smlDestroySTableMeta(SSmlSTableMeta *meta) {
  if (meta == NULL) {
    return;
  }
  taosHashCleanup(meta->tagHash);
  taosHashCleanup(meta->colHash);
  taosArrayDestroy(meta->tags);
  taosArrayDestroy(meta->cols);
  taosMemoryFree(meta->tableMeta);
  taosMemoryFree(meta);
}

/*
 * Merge the key/values of one line into an existing meta description.
 *
 * metaHash maps a key to the int16_t position of its SSmlKv inside metaArray.
 *   - unknown key: it is appended (hash entry first, array entry only if the put succeeded);
 *   - known tag:   only the stored length is raised when the new value is longer;
 *   - known column: the type must match the stored one, and for variable length types the
 *                   stored length is raised when the new value is longer.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_SML_NOT_SAME_TYPE on a column type conflict, or
 * TSDB_CODE_SML_INVALID_DATA when metaArray already holds more than INT16_MAX entries.
 */
static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols, bool isTag, SSmlMsgBuf *msg) {
  for (int i = 0; i < taosArrayGetSize(cols); ++i) {
    SSmlKv  *cur = (SSmlKv *)taosArrayGet(cols, i);
    int16_t *pos = (int16_t *)taosHashGet(metaHash, cur->key, cur->keyLen);

    if (pos == NULL) {
      // first time this key shows up
      size_t count = taosArrayGetSize(metaArray);
      if (count > INT16_MAX) {
        smlBuildInvalidDataMsg(msg, "too many cols or tags", cur->key);
        uError("too many cols or tags");
        return TSDB_CODE_SML_INVALID_DATA;
      }
      int16_t slot = count;
      if (taosHashPut(metaHash, cur->key, cur->keyLen, &slot, SHORT_BYTES) == 0) {
        taosArrayPush(metaArray, cur);
      }
      continue;
    }

    SSmlKv *known = (SSmlKv *)taosArrayGet(metaArray, *pos);
    if (!isTag && cur->type != known->type) {
      smlBuildInvalidDataMsg(msg, "the type is not the same like before", cur->key);
      return TSDB_CODE_SML_NOT_SAME_TYPE;
    }

    // tags always track the longest value, columns only when the type has a variable length
    if ((isTag || IS_VAR_DATA_TYPE(cur->type)) && cur->length > known->length) {
      known->length = cur->length;
    }
  }

  return TSDB_CODE_SUCCESS;
}
1104

1105
/*
 * Release one child table description.
 *
 * tag->cols holds one SHashObj pointer per row; each hash is cleaned up before the array itself
 * is destroyed. The tag key/values are released through freeSSmlKv. tag->key is not freed here
 * (the former free is disabled). `info` is not used.
 */
void smlDestroyTableInfo(SSmlHandle *info, SSmlTableInfo *tag) {
  size_t rows = taosArrayGetSize(tag->cols);
  for (size_t idx = 0; idx < rows; ++idx) {
    taosHashCleanup((SHashObj *)taosArrayGetP(tag->cols, idx));
  }

  taosArrayDestroy(tag->cols);
  taosArrayDestroyEx(tag->tags, freeSSmlKv);
  taosMemoryFree(tag);
}
1120

X
Xiaoyu Wang 已提交
1121
/*
 * Free the value buffer of every NCHAR entry in an array of SColVal and reset the pointer.
 * Entries of any other type are left untouched; the array itself is not destroyed.
 */
void clearColValArray(SArray *pCols) {
  int32_t total = taosArrayGetSize(pCols);
  for (int32_t idx = 0; idx < total; idx++) {
    SColVal *item = taosArrayGet(pCols, idx);
    if (item->type != TSDB_DATA_TYPE_NCHAR) {
      continue;
    }
    taosMemoryFreeClear(item->value.pData);
  }
}

wmmhello's avatar
wmmhello 已提交
1131 1132 1133 1134 1135 1136
/*
 * Element destructor for arrays of SSmlKv: frees key and/or value, each only when the matching
 * keyEscaped / valueEscaped flag is set. The SSmlKv itself is not freed.
 */
void freeSSmlKv(void *data) {
  SSmlKv *item = (SSmlKv *)data;

  if (item->keyEscaped) {
    taosMemoryFree((void *)(item->key));
  }
  if (item->valueEscaped) {
    taosMemoryFree((void *)(item->value));
  }
}

wmmhello's avatar
wmmhello 已提交
1137
/*
 * Tear down a schemaless handle and everything attached to it: the query, the entries and the
 * containers of the child/super table hashes, the vgroup and uid hashes, the cJSON objects kept
 * in the two json arrays, the cached tag key/values, the parsed lines (only kept when dataFormat
 * is off), the json root and finally the handle itself. A NULL handle is ignored.
 */
void smlDestroyInfo(SSmlHandle *info) {
  if (info == NULL) {
    return;
  }
  qDestroyQuery(info->pQuery);

  // entries of info->childTables
  for (SSmlTableInfo **child = (SSmlTableInfo **)taosHashIterate(info->childTables, NULL); child != NULL;
       child = (SSmlTableInfo **)taosHashIterate(info->childTables, child)) {
    smlDestroyTableInfo(info, *child);
  }

  // entries of info->superTables
  for (SSmlSTableMeta **super = (SSmlSTableMeta **)taosHashIterate(info->superTables, NULL); super != NULL;
       super = (SSmlSTableMeta **)taosHashIterate(info->superTables, super)) {
    smlDestroySTableMeta(*super);
  }

  // the hash containers themselves
  taosHashCleanup(info->pVgHash);
  taosHashCleanup(info->childTables);
  taosHashCleanup(info->superTables);
  taosHashCleanup(info->tableUids);

  for (int k = 0; k < taosArrayGetSize(info->tagJsonArray); k++) {
    cJSON_Delete((cJSON *)taosArrayGetP(info->tagJsonArray, k));
  }
  taosArrayDestroy(info->tagJsonArray);

  for (int k = 0; k < taosArrayGetSize(info->valueJsonArray); k++) {
    cJSON_Delete((cJSON *)taosArrayGetP(info->valueJsonArray, k));
  }
  taosArrayDestroy(info->valueJsonArray);

  taosArrayDestroyEx(info->preLineTagKV, freeSSmlKv);

  if (!info->dataFormat) {
    for (int k = 0; k < info->lineNum; k++) {
      SSmlLineInfo *line = info->lines + k;
      taosArrayDestroyEx(line->colArray, freeSSmlKv);
      if (info->parseJsonByLib) {
        taosMemoryFree(line->tags);
      }
      if (line->measureTagsLen != 0) {
        taosMemoryFree(line->measureTag);
      }
    }
    taosMemoryFree(info->lines);
  }

  cJSON_Delete(info->root);
  taosMemoryFreeClear(info);
}
wmmhello's avatar
wmmhello 已提交
1189

wmmhello's avatar
wmmhello 已提交
1190
/*
 * Allocate and initialise a schemaless handle.
 *
 * When `taos` is not NULL the connection object is acquired and the catalog handle of its
 * cluster is looked up. All hashes, arrays and the query object of the handle are created here;
 * dataFormat starts as true.
 *
 * Returns the new handle, or NULL when the allocation of the handle or of any of its members
 * fails, the connection cannot be acquired, or the catalog lookup fails. On failure everything
 * created so far is released through smlDestroyInfo().
 */
SSmlHandle *smlBuildSmlInfo(TAOS *taos) {
  int32_t     code = TSDB_CODE_SUCCESS;
  SSmlHandle *info = (SSmlHandle *)taosMemoryCalloc(1, sizeof(SSmlHandle));
  if (NULL == info) {
    return NULL;
  }

  // assigned first so that the error log below carries the real id
  info->id = smlGenId();

  if (taos != NULL) {
    info->taos = acquireTscObj(*(int64_t *)taos);
    if (info->taos == NULL) {
      goto cleanup;
    }
    code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog);
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " get catalog error %d", info->id, code);
      goto cleanup;
    }
  }

  info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  info->childTables = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  info->tableUids = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  info->superTables = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);

  info->pQuery = smlInitHandle();
  info->dataFormat = true;

  info->tagJsonArray = taosArrayInit(8, POINTER_BYTES);
  info->valueJsonArray = taosArrayInit(8, POINTER_BYTES);
  info->preLineTagKV = taosArrayInit(8, sizeof(SSmlKv));

  // every member created above is required later on, so none of them may be missing
  if (NULL == info->pVgHash || NULL == info->childTables || NULL == info->superTables || NULL == info->tableUids ||
      NULL == info->pQuery || NULL == info->tagJsonArray || NULL == info->valueJsonArray ||
      NULL == info->preLineTagKV) {
    uError("create SSmlHandle failed");
    goto cleanup;
  }

  return info;

cleanup:
  smlDestroyInfo(info);
  return NULL;
}

/*
 * Index the key/values of one row by key and append the resulting hash to colsArray.
 *
 * The hash stores, per key, a pointer to the SSmlKv inside `cols`. On success the hash is owned
 * by colsArray (smlDestroyTableInfo cleans up every hash stored in it).
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_DUP_KEY when a key occurs twice in the row, or
 * TSDB_CODE_OUT_OF_MEMORY when the hash cannot be created or cannot be appended to colsArray.
 */
static int32_t smlPushCols(SArray *colsArray, SArray *cols) {
  SHashObj *kvHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (!kvHash) {
    uError("SML:smlDealCols failed to allocate memory");
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
    // terrno is reset so that a value left over from an earlier call is not mistaken for a duplicate
    terrno = 0;
    taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES);
    if (terrno == TSDB_CODE_DUP_KEY) {
      taosHashCleanup(kvHash);
      return terrno;
    }
  }

  // without this check a failed push would leak kvHash and silently drop the row
  if (taosArrayPush(colsArray, &kvHash) == NULL) {
    uError("SML:smlDealCols failed to allocate memory");
    taosHashCleanup(kvHash);
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  return TSDB_CODE_SUCCESS;
}
1252

1253
/*
 * Second pass over the parsed lines, only needed when info->dataFormat is off.
 *
 * For every line: look up its child table, check the tag / column limits, attach the columns of
 * the line to the child table (smlPushCols) and merge the tags and columns of the line into the
 * super table description kept in info->superTables, creating that description for the first
 * line of a measurement.
 *
 * Returns TSDB_CODE_SUCCESS or the code of the first failing step.
 */
static int32_t smlParseLineBottom(SSmlHandle *info) {
  uDebug("SML:0x%" PRIx64 " smlParseLineBottom start, format:%d, linenum:%d", info->id, info->dataFormat,
         info->lineNum);
  if (info->dataFormat) return TSDB_CODE_SUCCESS;

  for (int32_t i = 0; i < info->lineNum; i++) {
    SSmlLineInfo   *elements = info->lines + i;
    SSmlTableInfo  *tinfo = NULL;
    SSmlTableInfo **tmp = NULL;

    if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
      tmp = (SSmlTableInfo **)taosHashGet(info->childTables, elements->measure, elements->measureTagsLen);
    } else {
      // every other protocol (telnet included) uses measureTag with measureLen + tagsLen as key
      tmp = (SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag,
                                          elements->measureLen + elements->tagsLen);
    }
    if (tmp) tinfo = *tmp;

    if (tinfo == NULL) {
      uError("SML:0x%" PRIx64 "get oneTable failed, line num:%d", info->id, i);
      smlBuildInvalidDataMsg(&info->msgBuf, "get oneTable failed", elements->measure);
      return TSDB_CODE_SML_INVALID_DATA;
    }

    if (taosArrayGetSize(tinfo->tags) > TSDB_MAX_TAGS) {
      smlBuildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL);
      return TSDB_CODE_PAR_INVALID_TAGS_NUM;
    }

    if (taosArrayGetSize(elements->colArray) + taosArrayGetSize(tinfo->tags) > TSDB_MAX_COLUMNS) {
      smlBuildInvalidDataMsg(&info->msgBuf, "too many columns than 4096", NULL);
      return TSDB_CODE_PAR_TOO_MANY_COLUMNS;
    }

    int ret = smlPushCols(tinfo->cols, elements->colArray);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }

    SSmlSTableMeta **tableMeta =
        (SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen);
    if (tableMeta) {  // update meta
      uDebug("SML:0x%" PRIx64 " smlParseLineBottom update meta, format:%d, linenum:%d", info->id, info->dataFormat,
             info->lineNum);
      ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, elements->colArray, false, &info->msgBuf);
      if (ret == TSDB_CODE_SUCCESS) {
        ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, tinfo->tags, true, &info->msgBuf);
      }
      if (ret != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
        return ret;
      }
    } else {
      uDebug("SML:0x%" PRIx64 " smlParseLineBottom add meta, format:%d, linenum:%d", info->id, info->dataFormat,
             info->lineNum);
      SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat);
      if (meta == NULL) {
        uError("SML:0x%" PRIx64 " smlBuildSTableMeta failed", info->id);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      terrno = 0;
      if (taosHashPut(info->superTables, elements->measure, elements->measureLen, &meta, POINTER_BYTES) != 0) {
        // the hash did not take the meta, so nobody else would ever release it
        int32_t putCode = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
        uError("SML:0x%" PRIx64 " put super table meta failed", info->id);
        smlDestroySTableMeta(meta);
        return putCode;
      }
      // smlInsertMeta has no return value; a duplicated tag key is reported through terrno
      terrno = 0;
      smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags);
      if (terrno == TSDB_CODE_DUP_KEY) {
        return terrno;
      }
      smlInsertMeta(meta->colHash, meta->cols, elements->colArray);
    }
  }
  uDebug("SML:0x%" PRIx64 " smlParseLineBottom end, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum);

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1332
/**
 * Bind the parsed rows of every child table to the insert statement and launch the request.
 *
 * For each entry of info->childTables this registers the table name in the request,
 * resolves its vgroup through the catalog, binds its rows with smlBindData(), and
 * finally builds the output with smlBuildOutput() and calls launchQueryImpl().
 *
 * Every early exit from the iteration loop cancels the hash iterator first, so the
 * iterator position is not left held when the function bails out.
 *
 * @param info schemaless handle carrying the request, the parsed tables and the query
 * @return the first error code encountered, otherwise info->pRequest->code after launch
 */
static int32_t smlInsertData(SSmlHandle *info) {
  int32_t code = TSDB_CODE_SUCCESS;
  uDebug("SML:0x%" PRIx64 " smlInsertData start, format:%d", info->id, info->dataFormat);

  if (info->pRequest->dbList == NULL) {
    info->pRequest->dbList = taosArrayInit(1, TSDB_DB_FNAME_LEN);
    if (info->pRequest->dbList == NULL) {
      uError("SML:0x%" PRIx64 " smlInsertData failed to allocate dbList", info->id);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }
  char *data = (char *)taosArrayReserve(info->pRequest->dbList, 1);
  if (data == NULL) {
    uError("SML:0x%" PRIx64 " smlInsertData failed to reserve dbList entry", info->id);
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
  tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));
  tNameGetFullDbName(&pName, data);

  SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, NULL);
  while (oneTable) {
    SSmlTableInfo *tableData = *oneTable;

    // the super table name is recorded in the request's table list first
    tstrncpy(pName.tname, tableData->sTableName, tableData->sTableNameLen + 1);

    if (info->pRequest->tableList == NULL) {
      info->pRequest->tableList = taosArrayInit(1, sizeof(SName));
      if (info->pRequest->tableList == NULL) {
        uError("SML:0x%" PRIx64 " smlInsertData failed to allocate tableList", info->id);
        taosHashCancelIterate(info->childTables, oneTable);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }
    taosArrayPush(info->pRequest->tableList, &pName);

    // the vgroup lookup below is done with the child table name
    tstrncpy(pName.tname, tableData->childTableName, strlen(tableData->childTableName) + 1);

    SRequestConnInfo conn = {0};
    conn.pTrans = info->taos->pAppInfo->pTransporter;
    conn.requestId = info->pRequest->requestId;
    conn.requestObjRefId = info->pRequest->self;
    conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);

    SVgroupInfo vg;
    code = catalogGetTableHashVgroup(info->pCatalog, &conn, &pName, &vg);
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " catalogGetTableHashVgroup failed. table name: %s", info->id, tableData->childTableName);
      taosHashCancelIterate(info->childTables, oneTable);
      return code;
    }
    taosHashPut(info->pVgHash, (const char *)&vg.vgId, sizeof(vg.vgId), (char *)&vg, sizeof(vg));

    SSmlSTableMeta **pMeta =
        (SSmlSTableMeta **)taosHashGet(info->superTables, tableData->sTableName, tableData->sTableNameLen);
    if (unlikely(NULL == pMeta || NULL == (*pMeta)->tableMeta)) {
      uError("SML:0x%" PRIx64 " NULL == pMeta. table name: %s", info->id, tableData->childTableName);
      taosHashCancelIterate(info->childTables, oneTable);
      return TSDB_CODE_SML_INTERNAL_ERROR;
    }

    // use tablemeta of stable to save vgid and uid of child table
    (*pMeta)->tableMeta->vgId = vg.vgId;
    (*pMeta)->tableMeta->uid = tableData->uid;  // one table merge data block together according uid
    uDebug("SML:0x%" PRIx64 " smlInsertData table:%s, uid:%" PRIu64 ", format:%d", info->id, pName.tname,
           tableData->uid, info->dataFormat);

    // work on a private copy of the measurement name: the macro below rewrites it in place
    int   measureLen = tableData->sTableNameLen;
    char *measure = (char *)taosMemoryMalloc(tableData->sTableNameLen);
    if (measure == NULL) {
      uError("SML:0x%" PRIx64 " smlInsertData failed to allocate measure", info->id);
      taosHashCancelIterate(info->childTables, oneTable);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    memcpy(measure, tableData->sTableName, tableData->sTableNameLen);
    PROCESS_SLASH_IN_MEASUREMENT(measure, measureLen);

    code = smlBindData(info->pQuery, info->dataFormat, tableData->tags, (*pMeta)->cols, tableData->cols,
                       (*pMeta)->tableMeta, tableData->childTableName, measure, measureLen,
                       info->ttl, info->msgBuf.buf, info->msgBuf.len);
    taosMemoryFree(measure);
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlBindData failed", info->id);
      taosHashCancelIterate(info->childTables, oneTable);
      return code;
    }
    oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, oneTable);
  }

  code = smlBuildOutput(info->pQuery, info->pVgHash);
  if (code != TSDB_CODE_SUCCESS) {
    uError("SML:0x%" PRIx64 " smlBuildOutput failed", info->id);
    return code;
  }
  info->cost.insertRpcTime = taosGetTimestampUs();

  SAppClusterSummary *pActivity = &info->taos->pAppInfo->summary;
  atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);

  launchQueryImpl(info->pRequest, info->pQuery, true, NULL);
  uDebug("SML:0x%" PRIx64 " smlInsertData end, format:%d, code:%d,%s", info->id, info->dataFormat, info->pRequest->code,
         tstrerror(info->pRequest->code));

  return info->pRequest->code;
}

X
Xiaoyu Wang 已提交
1416
/**
 * Write one debug log line summarizing the request: result code, the table and
 * alter counters kept in info->cost, and the time spent in each phase.
 *
 * @param info schemaless handle whose cost record is reported
 */
static void smlPrintStatisticInfo(SSmlHandle *info) {
  // each phase cost is the gap between two timestamps recorded in info->cost
  const int64_t parseCost = info->cost.schemaTime - info->cost.parseTime;
  const int64_t schemaCost = info->cost.insertBindTime - info->cost.schemaTime;
  const int64_t bindCost = info->cost.insertRpcTime - info->cost.insertBindTime;
  const int64_t rpcCost = info->cost.endTime - info->cost.insertRpcTime;
  const int64_t totalCost = info->cost.endTime - info->cost.parseTime;

  uDebug(
      "SML:0x%" PRIx64
      " smlInsertLines result, code:%d, msg:%s, lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d,alter stable tag num:%d,alter stable col num:%d \
        parse cost:%" PRId64 ",schema cost:%" PRId64 ",bind cost:%" PRId64 ",rpc cost:%" PRId64 ",total cost:%" PRId64,
      info->id, info->cost.code, tstrerror(info->cost.code), info->cost.lineNum, info->cost.numOfSTables,
      info->cost.numOfCTables, info->cost.numOfCreateSTables, info->cost.numOfAlterTagSTables,
      info->cost.numOfAlterColSTables, parseCost, schemaCost, bindCost, rpcCost, totalCost);
}

X
Xiaoyu Wang 已提交
1429
/**
 * Reset the parsing state held in info so that the input can be parsed again.
 *
 * Destroys every entry of info->childTables and info->superTables, clears those
 * hashes and info->tableUids, re-allocates info->lines when info->dataFormat is
 * false, zeroes the cached previous line and replaces the statement's table block hash.
 *
 * @param info schemaless handle to reset
 * @return TSDB_CODE_SUCCESS on success;
 *         TSDB_CODE_SML_INVALID_DATA if info->lines is unexpectedly still set;
 *         TSDB_CODE_OUT_OF_MEMORY if a replacement buffer or hash cannot be allocated
 */
int32_t smlClearForRerun(SSmlHandle *info) {
  info->reRun = false;

  // clear info->childTables
  SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, NULL);
  while (oneTable) {
    smlDestroyTableInfo(info, *oneTable);
    oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, oneTable);
  }

  // clear info->superTables
  SSmlSTableMeta **oneSTable = (SSmlSTableMeta **)taosHashIterate(info->superTables, NULL);
  while (oneSTable) {
    smlDestroySTableMeta(*oneSTable);
    oneSTable = (SSmlSTableMeta **)taosHashIterate(info->superTables, oneSTable);
  }

  taosHashClear(info->childTables);
  taosHashClear(info->superTables);
  taosHashClear(info->tableUids);

  if (!info->dataFormat) {
    if (unlikely(info->lines != NULL)) {
      uError("SML:0x%" PRIx64 " info->lines != NULL", info->id);
      return TSDB_CODE_SML_INVALID_DATA;
    }
    info->lines = (SSmlLineInfo *)taosMemoryCalloc(info->lineNum, sizeof(SSmlLineInfo));
    // a zero-sized request may legitimately yield NULL, so only a sized failure is an error
    if (info->lines == NULL && info->lineNum > 0) {
      uError("SML:0x%" PRIx64 " smlClearForRerun failed to allocate lines", info->id);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  memset(&info->preLine, 0, sizeof(SSmlLineInfo));
  info->currSTableMeta = NULL;
  info->currTableDataCtx = NULL;

  SVnodeModifyOpStmt *stmt = (SVnodeModifyOpStmt *)(info->pQuery->pRoot);
  stmt->freeHashFunc(stmt->pTableBlockHashObj);
  stmt->pTableBlockHashObj = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
  if (stmt->pTableBlockHashObj == NULL) {
    uError("SML:0x%" PRIx64 " smlClearForRerun failed to allocate table block hash", info->id);
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1467
/**
 * Parse the input of one schemaless request.
 *
 * JSON input is handed to smlParseJSON() as a single string. For the line and
 * telnet protocols the input is consumed line by line, either from the lines
 * array or by splitting the raw buffer [rawLine, rawLineEnd) on '\n'. Raw lines
 * starting with '#' are skipped for the line protocol and do not count towards
 * numLines. When a parser sets info->reRun, the state is reset through
 * smlClearForRerun() and parsing restarts from the first line.
 *
 * Lines are logged with an explicit length ("%.*s"), so the caller's raw buffer
 * is never modified and nothing at or past rawLineEnd is touched for logging.
 *
 * @param info       schemaless handle
 * @param lines      array of NUL-terminated lines, or NULL when raw input is used
 * @param rawLine    start of the raw buffer, used when lines is NULL
 * @param rawLineEnd one past the last byte of the raw buffer
 * @param numLines   number of lines to parse
 * @return TSDB_CODE_SUCCESS or the error code of the failing step
 */
static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) {
  uDebug("SML:0x%" PRIx64 " smlParseLine start", info->id);
  int32_t code = TSDB_CODE_SUCCESS;
  if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
    if (lines) {
      code = smlParseJSON(info, *lines);
    } else if (rawLine) {
      code = smlParseJSON(info, rawLine);
    }
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlParseJSON failed:%s", info->id, lines ? *lines : rawLine);
    }
    return code;
  }

  char   *oldRaw = rawLine;  // remembered so that a re-run can restart from the first raw line
  int32_t i = 0;
  while (i < numLines) {
    char *tmp = NULL;
    int   len = 0;
    if (lines) {
      tmp = lines[i];
      if (tmp == NULL) {
        uError("SML:0x%" PRIx64 " smlParseLine failed. line %d is NULL", info->id, i);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      len = (int)strlen(tmp);
    } else if (rawLine) {
      tmp = rawLine;
      while (rawLine < rawLineEnd) {
        if (*(rawLine++) == '\n') {
          break;
        }
        len++;
      }
      // len > 0 keeps the first-byte check inside the buffer; a comment line always has len >= 1
      if (info->protocol == TSDB_SML_LINE_PROTOCOL && len > 0 && tmp[0] == '#') {  // this line is comment
        continue;
      }
    } else {
      uError("SML:0x%" PRIx64 " smlParseLine failed. no input lines", info->id);
      return TSDB_CODE_SML_INVALID_DATA;
    }

    uDebug("SML:0x%" PRIx64 " smlParseLine israw:%d, numLines:%d, protocol:%d, len:%d, sql:%.*s", info->id,
           info->isRawLine, numLines, info->protocol, len, len, tmp);

    if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
      if (info->dataFormat) {
        SSmlLineInfo element = {0};
        code = smlParseInfluxString(info, tmp, tmp + len, &element);
      } else {
        code = smlParseInfluxString(info, tmp, tmp + len, info->lines + i);
      }
    } else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
      if (info->dataFormat) {
        SSmlLineInfo element = {0};
        code = smlParseTelnetString(info, (char *)tmp, (char *)tmp + len, &element);
        if (element.measureTagsLen != 0) taosMemoryFree(element.measureTag);
      } else {
        code = smlParseTelnetString(info, (char *)tmp, (char *)tmp + len, info->lines + i);
      }
    } else {
      code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE;
    }
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlParseLine failed. line %d : %.*s", info->id, i, len, tmp);
      return code;
    }
    if (info->reRun) {
      uDebug("SML:0x%" PRIx64 " smlParseLine re run", info->id);
      i = 0;
      rawLine = oldRaw;
      code = smlClearForRerun(info);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
      continue;
    }
    i++;
  }
  uDebug("SML:0x%" PRIx64 " smlParseLine end", info->id);

  return code;
}

dengyihao's avatar
dengyihao 已提交
1556
/**
 * Run the schemaless pipeline for one request: parse the input, finish the
 * parsed lines, bring the database schemas in line (with retries) and insert
 * the data. The start time of each phase is recorded in info->cost.
 *
 * @param info       schemaless handle
 * @param lines      array of input lines, or NULL when raw input is used
 * @param rawLine    start of the raw input buffer, or NULL
 * @param rawLineEnd end of the raw input buffer
 * @param numLines   number of lines to parse
 * @return 0 on success, otherwise the error code of the failing phase
 */
static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) {
  int32_t retryNum = 0;

  info->cost.parseTime = taosGetTimestampUs();

  int32_t code = smlParseLine(info, lines, rawLine, rawLineEnd, numLines);
  if (code != 0) {
    uError("SML:0x%" PRIx64 " smlParseLine error : %s", info->id, tstrerror(code));
    return code;
  }

  code = smlParseLineBottom(info);
  if (code != 0) {
    uError("SML:0x%" PRIx64 " smlParseLineBottom error : %s", info->id, tstrerror(code));
    return code;
  }

  info->cost.lineNum = info->lineNum;
  info->cost.numOfSTables = taosHashGetSize(info->superTables);
  info->cost.numOfCTables = taosHashGetSize(info->childTables);

  info->cost.schemaTime = taosGetTimestampUs();

  for (;;) {
    code = smlModifyDBSchemas(info);

    // success and these error codes end the loop at once; anything else is retried
    const bool settled = code == 0 || code == TSDB_CODE_SML_INVALID_DATA || code == TSDB_CODE_PAR_TOO_MANY_COLUMNS ||
                         code == TSDB_CODE_PAR_INVALID_TAGS_NUM || code == TSDB_CODE_PAR_INVALID_TAGS_LENGTH ||
                         code == TSDB_CODE_PAR_INVALID_ROW_LENGTH || code == TSDB_CODE_MND_FIELD_VALUE_OVERFLOW;
    if (settled) {
      break;
    }

    taosMsleep(100);
    uInfo("SML:0x%" PRIx64 " smlModifyDBSchemas retry code:%s, times:%d", info->id, tstrerror(code), retryNum);

    // the retry budget scales with the number of super tables
    if (!(retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES)) {
      break;
    }
  }

  if (code != 0) {
    uError("SML:0x%" PRIx64 " smlModifyDBSchemas error : %s", info->id, tstrerror(code));
    return code;
  }

  info->cost.insertBindTime = taosGetTimestampUs();
  code = smlInsertData(info);
  if (code != 0) {
    uError("SML:0x%" PRIx64 " smlInsertData error : %s", info->id, tstrerror(code));
  }

  return code;
}

X
Xiaoyu Wang 已提交
1605 1606
/**
 * Common implementation behind all taos_schemaless_insert* entry points.
 *
 * Creates a request and a schemaless handle, validates the arguments, runs
 * smlProcess() and, for a fixed set of error codes, refreshes the metadata and
 * starts over with a fresh request and handle.
 *
 * @param taos       connection handle; NULL sets terrno and returns NULL
 * @param lines      array of input lines, or NULL when raw input is used
 * @param rawLine    start of the raw input buffer, or NULL when lines is used
 * @param rawLineEnd end of the raw input buffer
 * @param numLines   number of lines; replaced by 1 for the JSON protocol
 * @param protocol   one of the TSDB_SML_*_PROTOCOL values
 * @param precision  timestamp precision, range-checked for the line protocol only
 * @param ttl        stored in the handle as info->ttl
 * @param reqid      request id passed to createRequest()
 * @return the request as TAOS_RES with its code set, or NULL if no request could be created
 */
TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine, char *rawLineEnd, int numLines,
                                       int protocol, int precision, int32_t ttl, int64_t reqid) {
  if (NULL == taos) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

  int32_t      code = TSDB_CODE_SUCCESS;
  SRequestObj *request = NULL;
  SSmlHandle  *info = NULL;
  int          cnt = 0;

  // every `break` below leaves the loop for the shared cleanup after it
  for (;;) {
    request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid);
    if (request == NULL) {
      uError("SML:taos_schemaless_insert error request is null");
      return NULL;
    }

    info = smlBuildSmlInfo(taos);
    if (info == NULL) {
      request->code = TSDB_CODE_OUT_OF_MEMORY;
      uError("SML:taos_schemaless_insert error SSmlHandle is null");
      return (TAOS_RES *)request;
    }
    info->pRequest = request;
    info->isRawLine = rawLine != NULL;
    info->ttl = ttl;
    info->precision = precision;
    info->protocol = (TSDB_SML_PROTOCOL_TYPE)protocol;
    info->msgBuf.buf = info->pRequest->msgBuf;
    info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE;
    info->lineNum = numLines;

    SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};

    if (request->pDb == NULL) {
      request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
      smlBuildInvalidDataMsg(&msg, "Database not specified", NULL);
      break;
    }

    if (protocol < TSDB_SML_LINE_PROTOCOL || protocol > TSDB_SML_JSON_PROTOCOL) {
      request->code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE;
      smlBuildInvalidDataMsg(&msg, "protocol invalidate", NULL);
      break;
    }

    if (protocol == TSDB_SML_LINE_PROTOCOL &&
        (precision < TSDB_SML_TIMESTAMP_NOT_CONFIGURED || precision > TSDB_SML_TIMESTAMP_NANO_SECONDS)) {
      request->code = TSDB_CODE_SML_INVALID_PRECISION_TYPE;
      smlBuildInvalidDataMsg(&msg, "precision invalidate for line protocol", NULL);
      break;
    }

    if (protocol == TSDB_SML_JSON_PROTOCOL) {
      numLines = 1;
    } else if (numLines <= 0) {
      request->code = TSDB_CODE_SML_INVALID_DATA;
      smlBuildInvalidDataMsg(&msg, "line num is invalid", NULL);
      break;
    }

    code = smlProcess(info, lines, rawLine, rawLineEnd, numLines);
    request->code = code;
    info->cost.endTime = taosGetTimestampUs();
    info->cost.code = code;

    const bool retryable = code == TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER || code == TSDB_CODE_SDB_OBJ_CREATING ||
                           code == TSDB_CODE_PAR_VALUE_TOO_LONG || code == TSDB_CODE_MND_TRANS_CONFLICT ||
                           code == TSDB_CODE_PAR_TABLE_NOT_EXIST;
    if (!retryable) {
      smlPrintStatisticInfo(info);
      break;
    }

    if (cnt++ >= 10) {
      uInfo("SML:%" PRIx64 " retry:%d/10 end code:%d, msg:%s", info->id, cnt, code, tstrerror(code));
      break;
    }

    taosMsleep(100);
    refreshMeta(request->pTscObj, request);
    uInfo("SML:%" PRIx64 " retry:%d/10,ver is old retry or object is creating code:%d, msg:%s", info->id, cnt, code,
          tstrerror(code));

    // drop this attempt's handle and request; the next iteration builds new ones
    smlDestroyInfo(info);
    info = NULL;
    taos_free_result(request);
    request = NULL;
  }

  smlDestroyInfo(info);
  return (TAOS_RES *)request;
}
wmmhello's avatar
wmmhello 已提交
1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710

/**
 * taos_schemaless_insert() parse and insert data points into database according to
 * different protocol.
 *
 * @param $lines input array may contain multiple lines, each line indicates a data point.
 *               If protocol=2 is used input array should contain single JSON
 *               string(e.g. char *lines[] = {"$JSON_string"}). If need to insert
 *               multiple data points in JSON format, should include them in $JSON_string
 *               as a JSON array.
 * @param $numLines indicates how many data points in $lines.
 *                  If protocol = 2 is used this param will be ignored as $lines should
 *                  contain single JSON string.
 * @param $protocol indicates which protocol to use for parsing:
 *                  0 - influxDB line protocol
 *                  1 - OpenTSDB telnet line protocol
 *                  2 - OpenTSDB JSON format protocol
dengyihao's avatar
dengyihao 已提交
1711
 * @return TAOS_RES
wmmhello's avatar
wmmhello 已提交
1712 1713
 */

1714 1715
TAOS_RES *taos_schemaless_insert_ttl_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol, int precision,
                                                int32_t ttl, int64_t reqid) {
wmmhello's avatar
wmmhello 已提交
1716
  return taos_schemaless_insert_inner(taos, lines, NULL, NULL, numLines, protocol, precision, ttl, reqid);
dengyihao's avatar
dengyihao 已提交
1717 1718
}

1719 1720 1721
/* Array-of-lines insert using TSDB_DEFAULT_TABLE_TTL and request id 0. */
TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision) {
  const int32_t ttl = TSDB_DEFAULT_TABLE_TTL;
  const int64_t reqid = 0;
  return taos_schemaless_insert_ttl_with_reqid(taos, lines, numLines, protocol, precision, ttl, reqid);
}
wmmhello's avatar
wmmhello 已提交
1722

X
Xiaoyu Wang 已提交
1723 1724
/* Array-of-lines insert with a caller-supplied ttl and request id 0. */
TAOS_RES *taos_schemaless_insert_ttl(TAOS *taos, char *lines[], int numLines, int protocol, int precision,
                                     int32_t ttl) {
  const int64_t reqid = 0;
  return taos_schemaless_insert_ttl_with_reqid(taos, lines, numLines, protocol, precision, ttl, reqid);
}
wmmhello's avatar
wmmhello 已提交
1727

X
Xiaoyu Wang 已提交
1728 1729 1730 1731
/* Array-of-lines insert with a caller-supplied request id and TSDB_DEFAULT_TABLE_TTL. */
TAOS_RES *taos_schemaless_insert_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol, int precision,
                                            int64_t reqid) {
  const int32_t ttl = TSDB_DEFAULT_TABLE_TTL;
  return taos_schemaless_insert_ttl_with_reqid(taos, lines, numLines, protocol, precision, ttl, reqid);
}

1734
/**
 * Insert schemaless data given as one raw buffer whose lines are separated by '\n'.
 *
 * The buffer is scanned once to count its rows: a row ends at each '\n' and at the
 * last byte of the buffer. For the line protocol, rows whose first character is '#'
 * are comments and are not counted. The count is passed on as the number of lines
 * to taos_schemaless_insert_inner().
 *
 * @param taos      connection handle
 * @param lines     raw input buffer of len bytes
 * @param len       number of bytes in lines
 * @param totalRows receives the number of counted rows; may be NULL if the caller
 *                  does not need the count
 * @param protocol  one of the TSDB_SML_*_PROTOCOL values
 * @param precision forwarded to taos_schemaless_insert_inner()
 * @param ttl       forwarded to taos_schemaless_insert_inner()
 * @param reqid     forwarded to taos_schemaless_insert_inner()
 * @return TAOS_RES
 */
TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
                                                    int precision, int32_t ttl, int64_t reqid) {
  int32_t     rows = 0;
  const char *lineStart = lines;  // first character of the row currently being scanned
  for (int i = 0; i < len; i++) {
    if (lines[i] == '\n' || i == len - 1) {
      if (lineStart[0] != '#' || protocol != TSDB_SML_LINE_PROTOCOL) {  // ignore comment
        rows++;
      }
      lineStart = lines + i + 1;
    }
  }
  if (totalRows != NULL) {
    *totalRows = rows;
  }
  return taos_schemaless_insert_inner(taos, NULL, lines, lines + len, rows, protocol, precision, ttl, reqid);
}

X
Xiaoyu Wang 已提交
1751 1752 1753 1754
/* Raw-buffer insert with a caller-supplied request id and TSDB_DEFAULT_TABLE_TTL. */
TAOS_RES *taos_schemaless_insert_raw_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
                                                int precision, int64_t reqid) {
  const int32_t ttl = TSDB_DEFAULT_TABLE_TTL;
  return taos_schemaless_insert_raw_ttl_with_reqid(taos, lines, len, totalRows, protocol, precision, ttl, reqid);
}
X
Xiaoyu Wang 已提交
1756 1757
/* Raw-buffer insert with a caller-supplied ttl and request id 0. */
TAOS_RES *taos_schemaless_insert_raw_ttl(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
                                         int precision, int32_t ttl) {
  const int64_t reqid = 0;
  return taos_schemaless_insert_raw_ttl_with_reqid(taos, lines, len, totalRows, protocol, precision, ttl, reqid);
}
wmmhello's avatar
wmmhello 已提交
1760

X
Xiaoyu Wang 已提交
1761 1762 1763 1764
/* Raw-buffer insert using TSDB_DEFAULT_TABLE_TTL and request id 0. */
TAOS_RES *taos_schemaless_insert_raw(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
                                     int precision) {
  const int32_t ttl = TSDB_DEFAULT_TABLE_TTL;
  const int64_t reqid = 0;
  return taos_schemaless_insert_raw_ttl_with_reqid(taos, lines, len, totalRows, protocol, precision, ttl, reqid);
}